1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/tracing_service_impl.h"
18
19 #include "perfetto/base/build_config.h"
20 #include "perfetto/tracing/core/forward_decls.h"
21
22 #include <errno.h>
23 #include <limits.h>
24 #include <string.h>
25
26 #include <cinttypes>
27 #include <regex>
28 #include <unordered_set>
29
30 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
31 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
32 #include <sys/uio.h>
33 #include <sys/utsname.h>
34 #include <unistd.h>
35 #endif
36
37 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
38 PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
39 #include "src/android_internal/lazy_library_loader.h" // nogncheck
40 #include "src/android_internal/tracing_service_proxy.h" // nogncheck
41 #endif
42
43 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
44 PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
45 PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
46 #define PERFETTO_HAS_CHMOD
47 #include <sys/stat.h>
48 #endif
49
50 #include <algorithm>
51
52 #include "perfetto/base/build_config.h"
53 #include "perfetto/base/status.h"
54 #include "perfetto/base/task_runner.h"
55 #include "perfetto/ext/base/android_utils.h"
56 #include "perfetto/ext/base/file_utils.h"
57 #include "perfetto/ext/base/metatrace.h"
58 #include "perfetto/ext/base/string_utils.h"
59 #include "perfetto/ext/base/temp_file.h"
60 #include "perfetto/ext/base/utils.h"
61 #include "perfetto/ext/base/uuid.h"
62 #include "perfetto/ext/base/version.h"
63 #include "perfetto/ext/base/watchdog.h"
64 #include "perfetto/ext/tracing/core/basic_types.h"
65 #include "perfetto/ext/tracing/core/consumer.h"
66 #include "perfetto/ext/tracing/core/observable_events.h"
67 #include "perfetto/ext/tracing/core/producer.h"
68 #include "perfetto/ext/tracing/core/shared_memory.h"
69 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
70 #include "perfetto/ext/tracing/core/trace_packet.h"
71 #include "perfetto/ext/tracing/core/trace_writer.h"
72 #include "perfetto/protozero/scattered_heap_buffer.h"
73 #include "perfetto/protozero/static_buffer.h"
74 #include "perfetto/tracing/core/data_source_descriptor.h"
75 #include "perfetto/tracing/core/tracing_service_capabilities.h"
76 #include "perfetto/tracing/core/tracing_service_state.h"
77 #include "src/android_stats/statsd_logging_helper.h"
78 #include "src/protozero/filtering/message_filter.h"
79 #include "src/tracing/core/packet_stream_validator.h"
80 #include "src/tracing/core/shared_memory_arbiter_impl.h"
81 #include "src/tracing/core/trace_buffer.h"
82
83 #include "protos/perfetto/common/builtin_clock.gen.h"
84 #include "protos/perfetto/common/builtin_clock.pbzero.h"
85 #include "protos/perfetto/common/trace_stats.pbzero.h"
86 #include "protos/perfetto/config/trace_config.pbzero.h"
87 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
88 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
89 #include "protos/perfetto/trace/system_info.pbzero.h"
90 #include "protos/perfetto/trace/trace_packet.pbzero.h"
91 #include "protos/perfetto/trace/trigger.pbzero.h"
92
93 // General note: this class must assume that Producers are malicious and will
94 // try to crash / exploit this class. We can trust pointers because they come
95 // from the IPC layer, but we should never assume that that the producer calls
96 // come in the right order or their arguments are sane / within bounds.
97
98 // This is a macro because we want the call-site line number for the ELOG.
99 #define PERFETTO_SVC_ERR(...) \
100 (PERFETTO_ELOG(__VA_ARGS__), ::perfetto::base::ErrStatus(__VA_ARGS__))
101
102 namespace perfetto {
103
104 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
105 PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
106 // These are the only SELinux approved dir for trace files that are created
107 // directly by traced.
108 const char* kTraceDirBasePath = "/data/misc/perfetto-traces/";
109 const char* kAndroidProductionBugreportTracePath =
110 "/data/misc/perfetto-traces/bugreport/systrace.pftrace";
111 #endif
112
113 namespace {
114 constexpr int kMaxBuffersPerConsumer = 128;
115 constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
116 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
117 constexpr int kMaxConcurrentTracingSessions = 15;
118 constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
119 constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
120 constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;
121
122 constexpr uint32_t kMillisPerHour = 3600000;
123 constexpr uint32_t kMillisPerDay = kMillisPerHour * 24;
124 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
125
126 // These apply only if enable_extra_guardrails is true.
127 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
128 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
129
130 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
131 struct iovec {
132 void* iov_base; // Address
133 size_t iov_len; // Block size
134 };
135
136 // Simple implementation of writev. Note that this does not give the atomicity
137 // guarantees of a real writev, but we don't depend on these (we aren't writing
138 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)139 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
140 ssize_t total_size = 0;
141 for (int i = 0; i < iovcnt; ++i) {
142 ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
143 if (current_size != static_cast<ssize_t>(iov[i].iov_len))
144 return -1;
145 total_size += current_size;
146 }
147 return total_size;
148 }
149
150 #define IOV_MAX 1024 // Linux compatible limit.
151
152 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
153 // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
154
155 // Partially encodes a CommitDataRequest in an int32 for the purposes of
156 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
157 // (which is technically 16 bits wide).
158 //
159 // Format (by bit range):
160 // [ 31 ][ 30 ][ 29:20 ][ 19:10 ][ 9:0]
161 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)162 static int32_t EncodeCommitDataRequest(ProducerID producer_id,
163 const CommitDataRequest& req_untrusted) {
164 uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
165 uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
166 uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
167
168 uint32_t mask = (1 << 10) - 1;
169 uint32_t acc = 0;
170 acc |= has_flush_id << 30;
171 acc |= (cpatch & mask) << 20;
172 acc |= (cmov & mask) << 10;
173 acc |= (producer_id & mask);
174 return static_cast<int32_t>(acc);
175 }
176
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)177 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
178 std::vector<uint8_t> packet) {
179 Slice slice = Slice::Allocate(packet.size());
180 memcpy(slice.own_data(), packet.data(), packet.size());
181 packets->emplace_back();
182 packets->back().AddSlice(std::move(slice));
183 }
184
EnsureValidShmSizes(size_t shm_size,size_t page_size)185 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
186 size_t shm_size,
187 size_t page_size) {
188 // Theoretically the max page size supported by the ABI is 64KB.
189 // However, the current implementation of TraceBuffer (the non-shared
190 // userspace buffer where the service copies data) supports at most
191 // 32K. Setting 64K "works" from the producer<>consumer viewpoint
192 // but then causes the data to be discarded when copying it into
193 // TraceBuffer.
194 constexpr size_t kMaxPageSize = 32 * 1024;
195 static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
196
197 if (page_size == 0)
198 page_size = TracingServiceImpl::kDefaultShmPageSize;
199 if (shm_size == 0)
200 shm_size = TracingServiceImpl::kDefaultShmSize;
201
202 page_size = std::min<size_t>(page_size, kMaxPageSize);
203 shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
204
205 // The tracing page size has to be multiple of 4K. On some systems (e.g. Mac
206 // on Arm64) the system page size can be larger (e.g., 16K). That doesn't
207 // matter here, because the tracing page size is just a logical partitioning
208 // and does not have any dependencies on kernel mm syscalls (read: it's fine
209 // to have trace page sizes of 4K on a system where the kernel page size is
210 // 16K).
211 bool page_size_is_valid = page_size >= SharedMemoryABI::kMinPageSize;
212 page_size_is_valid &= page_size % SharedMemoryABI::kMinPageSize == 0;
213
214 // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
215 size_t num_pages = page_size / SharedMemoryABI::kMinPageSize;
216 page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
217
218 if (!page_size_is_valid || shm_size < page_size ||
219 shm_size % page_size != 0) {
220 return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
221 TracingServiceImpl::kDefaultShmPageSize);
222 }
223 return std::make_tuple(shm_size, page_size);
224 }
225
NameMatchesFilter(const std::string & name,const std::vector<std::string> & name_filter,const std::vector<std::string> & name_regex_filter)226 bool NameMatchesFilter(const std::string& name,
227 const std::vector<std::string>& name_filter,
228 const std::vector<std::string>& name_regex_filter) {
229 bool filter_is_set = !name_filter.empty() || !name_regex_filter.empty();
230 if (!filter_is_set)
231 return true;
232 bool filter_matches = std::find(name_filter.begin(), name_filter.end(),
233 name) != name_filter.end();
234 bool filter_regex_matches =
235 std::find_if(name_regex_filter.begin(), name_regex_filter.end(),
236 [&](const std::string& regex) {
237 return std::regex_match(
238 name, std::regex(regex, std::regex::extended));
239 }) != name_regex_filter.end();
240 return filter_matches || filter_regex_matches;
241 }
242
243 // Used when:
244 // 1. TraceConfig.write_into_file == true and output_path is not empty.
245 // 2. Calling SaveTraceForBugreport(), from perfetto --save-for-bugreport.
CreateTraceFile(const std::string & path,bool overwrite)246 base::ScopedFile CreateTraceFile(const std::string& path, bool overwrite) {
247 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
248 PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
249 // This is NOT trying to preserve any security property, SELinux does that.
250 // It just improves the actionability of the error when people try to save the
251 // trace in a location that is not SELinux-allowed (a generic "permission
252 // denied" vs "don't put it here, put it there").
253 if (!base::StartsWith(path, kTraceDirBasePath)) {
254 PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
255 path.c_str(), kTraceDirBasePath);
256 return base::ScopedFile();
257 }
258 #endif
259 // O_CREAT | O_EXCL will fail if the file exists already.
260 const int flags = O_RDWR | O_CREAT | (overwrite ? O_TRUNC : O_EXCL);
261 auto fd = base::OpenFile(path, flags, 0600);
262 if (fd) {
263 #if defined(PERFETTO_HAS_CHMOD)
264 // Passing 0644 directly above won't work because of umask.
265 PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
266 #endif
267 } else {
268 PERFETTO_PLOG("Failed to create %s", path.c_str());
269 }
270 return fd;
271 }
272
GetBugreportTmpPath()273 std::string GetBugreportTmpPath() {
274 return GetBugreportPath() + ".tmp";
275 }
276
ShouldLogEvent(const TraceConfig & cfg)277 bool ShouldLogEvent(const TraceConfig& cfg) {
278 switch (cfg.statsd_logging()) {
279 case TraceConfig::STATSD_LOGGING_ENABLED:
280 return true;
281 case TraceConfig::STATSD_LOGGING_DISABLED:
282 return false;
283 case TraceConfig::STATSD_LOGGING_UNSPECIFIED:
284 // For backward compatibility with older versions of perfetto_cmd.
285 return cfg.enable_extra_guardrails();
286 }
287 PERFETTO_FATAL("For GCC");
288 }
289
290 // Appends `data` (which has `size` bytes), to `*packet`. Splits the data in
291 // slices no larger than `max_slice_size`.
AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,size_t size,size_t max_slice_size,perfetto::TracePacket * packet)292 void AppendOwnedSlicesToPacket(std::unique_ptr<uint8_t[]> data,
293 size_t size,
294 size_t max_slice_size,
295 perfetto::TracePacket* packet) {
296 if (size <= max_slice_size) {
297 packet->AddSlice(Slice::TakeOwnership(std::move(data), size));
298 return;
299 }
300 uint8_t* src_ptr = data.get();
301 for (size_t size_left = size; size_left > 0;) {
302 const size_t slice_size = std::min(size_left, max_slice_size);
303
304 Slice slice = Slice::Allocate(slice_size);
305 memcpy(slice.own_data(), src_ptr, slice_size);
306 packet->AddSlice(std::move(slice));
307
308 src_ptr += slice_size;
309 size_left -= slice_size;
310 }
311 }
312
313 } // namespace
314
315 // These constants instead are defined in the header because are used by tests.
316 constexpr size_t TracingServiceImpl::kDefaultShmSize;
317 constexpr size_t TracingServiceImpl::kDefaultShmPageSize;
318
319 constexpr size_t TracingServiceImpl::kMaxShmSize;
320 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
321 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
322
GetBugreportPath()323 std::string GetBugreportPath() {
324 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) && \
325 PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
326 return kAndroidProductionBugreportTracePath;
327 #else
328 // Only for tests, SaveTraceForBugreport is not used on other OSes.
329 return base::GetSysTempDir() + "/bugreport.pftrace";
330 #endif
331 }
332
333 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)334 std::unique_ptr<TracingService> TracingService::CreateInstance(
335 std::unique_ptr<SharedMemory::Factory> shm_factory,
336 base::TaskRunner* task_runner) {
337 return std::unique_ptr<TracingService>(
338 new TracingServiceImpl(std::move(shm_factory), task_runner));
339 }
340
TracingServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)341 TracingServiceImpl::TracingServiceImpl(
342 std::unique_ptr<SharedMemory::Factory> shm_factory,
343 base::TaskRunner* task_runner)
344 : task_runner_(task_runner),
345 shm_factory_(std::move(shm_factory)),
346 uid_(base::GetCurrentUserId()),
347 buffer_ids_(kMaxTraceBufferID),
348 trigger_probability_rand_(
349 static_cast<uint32_t>(base::GetWallTimeNs().count())),
350 weak_ptr_factory_(this) {
351 PERFETTO_DCHECK(task_runner_);
352 }
353
~TracingServiceImpl()354 TracingServiceImpl::~TracingServiceImpl() {
355 // TODO(fmayer): handle teardown of all Producer.
356 }
357
358 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,uid_t uid,pid_t pid,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,const std::string & sdk_version)359 TracingServiceImpl::ConnectProducer(Producer* producer,
360 uid_t uid,
361 pid_t pid,
362 const std::string& producer_name,
363 size_t shared_memory_size_hint_bytes,
364 bool in_process,
365 ProducerSMBScrapingMode smb_scraping_mode,
366 size_t shared_memory_page_size_hint_bytes,
367 std::unique_ptr<SharedMemory> shm,
368 const std::string& sdk_version) {
369 PERFETTO_DCHECK_THREAD(thread_checker_);
370
371 if (lockdown_mode_ && uid != base::GetCurrentUserId()) {
372 PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
373 static_cast<unsigned long>(uid));
374 return nullptr;
375 }
376
377 if (producers_.size() >= kMaxProducerID) {
378 PERFETTO_DFATAL("Too many producers.");
379 return nullptr;
380 }
381 const ProducerID id = GetNextProducerID();
382 PERFETTO_DLOG("Producer %" PRIu16 " connected, uid=%d", id,
383 static_cast<int>(uid));
384 bool smb_scraping_enabled = smb_scraping_enabled_;
385 switch (smb_scraping_mode) {
386 case ProducerSMBScrapingMode::kDefault:
387 break;
388 case ProducerSMBScrapingMode::kEnabled:
389 smb_scraping_enabled = true;
390 break;
391 case ProducerSMBScrapingMode::kDisabled:
392 smb_scraping_enabled = false;
393 break;
394 }
395
396 std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
397 id, uid, pid, this, task_runner_, producer, producer_name, sdk_version,
398 in_process, smb_scraping_enabled));
399 auto it_and_inserted = producers_.emplace(id, endpoint.get());
400 PERFETTO_DCHECK(it_and_inserted.second);
401 endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
402 endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
403
404 // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
405 // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
406 auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
407 task_runner_->PostTask([weak_ptr] {
408 if (weak_ptr)
409 weak_ptr->producer_->OnConnect();
410 });
411
412 if (shm) {
413 // The producer supplied an SMB. This is used only by Chrome; in the most
414 // common cases the SMB is created by the service and passed via
415 // OnTracingSetup(). Verify that it is correctly sized before we attempt to
416 // use it. The transport layer has to verify the integrity of the SMB (e.g.
417 // ensure that the producer can't resize if after the fact).
418 size_t shm_size, page_size;
419 std::tie(shm_size, page_size) =
420 EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
421 if (shm_size == shm->size() &&
422 page_size == endpoint->shmem_page_size_hint_bytes_) {
423 PERFETTO_DLOG(
424 "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
425 shm_size / 1024, endpoint->name_.c_str());
426 endpoint->SetupSharedMemory(std::move(shm), page_size,
427 /*provided_by_producer=*/true);
428 } else {
429 PERFETTO_LOG(
430 "Discarding incorrectly sized producer-provided SMB for producer "
431 "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
432 "B total, %zu B page size; suggested corrected sizes: %zu B total, "
433 "%zu B page size",
434 endpoint->name_.c_str(), shm->size(),
435 endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
436 shm.reset();
437 }
438 }
439
440 return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
441 }
442
DisconnectProducer(ProducerID id)443 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
444 PERFETTO_DCHECK_THREAD(thread_checker_);
445 PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
446 PERFETTO_DCHECK(producers_.count(id));
447
448 // Scrape remaining chunks for this producer to ensure we don't lose data.
449 if (auto* producer = GetProducer(id)) {
450 for (auto& session_id_and_session : tracing_sessions_)
451 ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
452 }
453
454 for (auto it = data_sources_.begin(); it != data_sources_.end();) {
455 auto next = it;
456 next++;
457 if (it->second.producer_id == id)
458 UnregisterDataSource(id, it->second.descriptor.name());
459 it = next;
460 }
461
462 producers_.erase(id);
463 UpdateMemoryGuardrail();
464 }
465
GetProducer(ProducerID id) const466 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
467 ProducerID id) const {
468 PERFETTO_DCHECK_THREAD(thread_checker_);
469 auto it = producers_.find(id);
470 if (it == producers_.end())
471 return nullptr;
472 return it->second;
473 }
474
475 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)476 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
477 PERFETTO_DCHECK_THREAD(thread_checker_);
478 PERFETTO_DLOG("Consumer %p connected from UID %" PRIu64,
479 reinterpret_cast<void*>(consumer), static_cast<uint64_t>(uid));
480 std::unique_ptr<ConsumerEndpointImpl> endpoint(
481 new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
482 auto it_and_inserted = consumers_.emplace(endpoint.get());
483 PERFETTO_DCHECK(it_and_inserted.second);
484 // Consumer might go away before we're able to send the connect notification,
485 // if that is the case just bail out.
486 auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
487 task_runner_->PostTask([weak_ptr] {
488 if (weak_ptr)
489 weak_ptr->consumer_->OnConnect();
490 });
491 return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
492 }
493
DisconnectConsumer(ConsumerEndpointImpl * consumer)494 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
495 PERFETTO_DCHECK_THREAD(thread_checker_);
496 PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
497 PERFETTO_DCHECK(consumers_.count(consumer));
498
499 // TODO(primiano) : Check that this is safe (what happens if there are
500 // ReadBuffers() calls posted in the meantime? They need to become noop).
501 if (consumer->tracing_session_id_)
502 FreeBuffers(consumer->tracing_session_id_); // Will also DisableTracing().
503 consumers_.erase(consumer);
504
505 // At this point no more pointers to |consumer| should be around.
506 PERFETTO_DCHECK(!std::any_of(
507 tracing_sessions_.begin(), tracing_sessions_.end(),
508 [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
509 return kv.second.consumer_maybe_null == consumer;
510 }));
511 }
512
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)513 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
514 const std::string& key) {
515 PERFETTO_DCHECK_THREAD(thread_checker_);
516 PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
517 PERFETTO_DCHECK(consumers_.count(consumer));
518
519 TracingSessionID tsid = consumer->tracing_session_id_;
520 TracingSession* tracing_session;
521 if (!tsid || !(tracing_session = GetTracingSession(tsid)))
522 return false;
523
524 if (GetDetachedSession(consumer->uid_, key)) {
525 PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
526 key.c_str());
527 return false;
528 }
529
530 PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
531 tracing_session->consumer_maybe_null = nullptr;
532 tracing_session->detach_key = key;
533 consumer->tracing_session_id_ = 0;
534 return true;
535 }
536
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)537 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
538 const std::string& key) {
539 PERFETTO_DCHECK_THREAD(thread_checker_);
540 PERFETTO_DLOG("Consumer %p attaching to session %s",
541 reinterpret_cast<void*>(consumer), key.c_str());
542 PERFETTO_DCHECK(consumers_.count(consumer));
543
544 if (consumer->tracing_session_id_) {
545 PERFETTO_ELOG(
546 "Cannot reattach consumer to session %s"
547 " while it already attached tracing session ID %" PRIu64,
548 key.c_str(), consumer->tracing_session_id_);
549 return false;
550 }
551
552 auto* tracing_session = GetDetachedSession(consumer->uid_, key);
553 if (!tracing_session) {
554 PERFETTO_ELOG(
555 "Failed to attach consumer, session '%s' not found for uid %d",
556 key.c_str(), static_cast<int>(consumer->uid_));
557 return false;
558 }
559
560 consumer->tracing_session_id_ = tracing_session->id;
561 tracing_session->consumer_maybe_null = consumer;
562 tracing_session->detach_key.clear();
563 return true;
564 }
565
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)566 base::Status TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
567 const TraceConfig& cfg,
568 base::ScopedFile fd) {
569 PERFETTO_DCHECK_THREAD(thread_checker_);
570 PERFETTO_DLOG("Enabling tracing for consumer %p",
571 reinterpret_cast<void*>(consumer));
572 MaybeLogUploadEvent(cfg, PerfettoStatsdAtom::kTracedEnableTracing);
573 if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
574 lockdown_mode_ = true;
575 if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
576 lockdown_mode_ = false;
577
578 // Scope |tracing_session| to this block to prevent accidental use of a null
579 // pointer later in this function.
580 {
581 TracingSession* tracing_session =
582 GetTracingSession(consumer->tracing_session_id_);
583 if (tracing_session) {
584 MaybeLogUploadEvent(
585 cfg, PerfettoStatsdAtom::kTracedEnableTracingExistingTraceSession);
586 return PERFETTO_SVC_ERR(
587 "A Consumer is trying to EnableTracing() but another tracing "
588 "session is already active (forgot a call to FreeBuffers() ?)");
589 }
590 }
591
592 const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
593 ? kGuardrailsMaxTracingDurationMillis
594 : kMaxTracingDurationMillis;
595 if (cfg.duration_ms() > max_duration_ms) {
596 MaybeLogUploadEvent(cfg,
597 PerfettoStatsdAtom::kTracedEnableTracingTooLongTrace);
598 return PERFETTO_SVC_ERR("Requested too long trace (%" PRIu32
599 "ms > %" PRIu32 " ms)",
600 cfg.duration_ms(), max_duration_ms);
601 }
602
603 const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
604 TraceConfig::TriggerConfig::UNSPECIFIED;
605 if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
606 cfg.trigger_config().trigger_timeout_ms() >
607 kGuardrailsMaxTracingDurationMillis)) {
608 MaybeLogUploadEvent(
609 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerTimeout);
610 return PERFETTO_SVC_ERR(
611 "Traces with START_TRACING triggers must provide a positive "
612 "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
613 cfg.trigger_config().trigger_timeout_ms());
614 }
615
616 // This check has been introduced in May 2023 after finding b/274931668.
617 if (static_cast<int>(cfg.trigger_config().trigger_mode()) >
618 TraceConfig::TriggerConfig::TriggerMode_MAX) {
619 MaybeLogUploadEvent(
620 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidTriggerMode);
621 return PERFETTO_SVC_ERR(
622 "The trace config specified an invalid trigger_mode");
623 }
624
625 if (has_trigger_config && cfg.duration_ms() != 0) {
626 MaybeLogUploadEvent(
627 cfg, PerfettoStatsdAtom::kTracedEnableTracingDurationWithTrigger);
628 return PERFETTO_SVC_ERR(
629 "duration_ms was set, this must not be set for traces with triggers.");
630 }
631
632 if (cfg.trigger_config().trigger_mode() ==
633 TraceConfig::TriggerConfig::STOP_TRACING &&
634 cfg.write_into_file()) {
635 // We don't support this usecase because there are subtle assumptions which
636 // break around TracingServiceEvents and windowed sorting (i.e. if we don't
637 // drain the events in ReadBuffersIntoFile because we are waiting for
638 // STOP_TRACING, we can end up queueing up a lot of TracingServiceEvents and
639 // emitting them wildy out of order breaking windowed sorting in trace
640 // processor).
641 MaybeLogUploadEvent(
642 cfg, PerfettoStatsdAtom::kTracedEnableTracingStopTracingWriteIntoFile);
643 return PERFETTO_SVC_ERR(
644 "Specifying trigger mode STOP_TRACING and write_into_file together is "
645 "unsupported");
646 }
647
648 std::unordered_set<std::string> triggers;
649 for (const auto& trigger : cfg.trigger_config().triggers()) {
650 if (!triggers.insert(trigger.name()).second) {
651 MaybeLogUploadEvent(
652 cfg, PerfettoStatsdAtom::kTracedEnableTracingDuplicateTriggerName);
653 return PERFETTO_SVC_ERR("Duplicate trigger name: %s",
654 trigger.name().c_str());
655 }
656 }
657
658 if (cfg.enable_extra_guardrails()) {
659 if (cfg.deferred_start()) {
660 MaybeLogUploadEvent(
661 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidDeferredStart);
662 return PERFETTO_SVC_ERR(
663 "deferred_start=true is not supported in unsupervised traces");
664 }
665 uint64_t buf_size_sum = 0;
666 for (const auto& buf : cfg.buffers()) {
667 if (buf.size_kb() % 4 != 0) {
668 MaybeLogUploadEvent(
669 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidBufferSize);
670 return PERFETTO_SVC_ERR(
671 "buffers.size_kb must be a multiple of 4, got %" PRIu32,
672 buf.size_kb());
673 }
674 buf_size_sum += buf.size_kb();
675 }
676
677 uint32_t max_tracing_buffer_size_kb =
678 std::max(kGuardrailsMaxTracingBufferSizeKb,
679 cfg.guardrail_overrides().max_tracing_buffer_size_kb());
680 if (buf_size_sum > max_tracing_buffer_size_kb) {
681 MaybeLogUploadEvent(
682 cfg, PerfettoStatsdAtom::kTracedEnableTracingBufferSizeTooLarge);
683 return PERFETTO_SVC_ERR("Requested too large trace buffer (%" PRIu64
684 "kB > %" PRIu32 " kB)",
685 buf_size_sum, max_tracing_buffer_size_kb);
686 }
687 }
688
689 if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
690 MaybeLogUploadEvent(cfg,
691 PerfettoStatsdAtom::kTracedEnableTracingTooManyBuffers);
692 return PERFETTO_SVC_ERR("Too many buffers configured (%d)",
693 cfg.buffers_size());
694 }
695 // Check that the config specifies all buffers for its data sources. This
696 // is also checked in SetupDataSource, but it is simpler to return a proper
697 // error to the consumer from here (and there will be less state to undo).
698 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
699 size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
700 size_t target_buffer = cfg_data_source.config().target_buffer();
701 if (target_buffer >= num_buffers) {
702 MaybeLogUploadEvent(
703 cfg, PerfettoStatsdAtom::kTracedEnableTracingOobTargetBuffer);
704 return PERFETTO_SVC_ERR(
705 "Data source \"%s\" specified an out of bounds target_buffer (%zu >= "
706 "%zu)",
707 cfg_data_source.config().name().c_str(), target_buffer, num_buffers);
708 }
709 }
710
711 if (!cfg.unique_session_name().empty()) {
712 const std::string& name = cfg.unique_session_name();
713 for (auto& kv : tracing_sessions_) {
714 if (kv.second.config.unique_session_name() == name) {
715 MaybeLogUploadEvent(
716 cfg, PerfettoStatsdAtom::kTracedEnableTracingDuplicateSessionName);
717 static const char fmt[] =
718 "A trace with this unique session name (%s) already exists";
719 // This happens frequently, don't make it an "E"LOG.
720 PERFETTO_LOG(fmt, name.c_str());
721 return base::ErrStatus(fmt, name.c_str());
722 }
723 }
724 }
725
726 if (cfg.enable_extra_guardrails()) {
727 // unique_session_name can be empty
728 const std::string& name = cfg.unique_session_name();
729 int64_t now_s = base::GetBootTimeS().count();
730
731 // Remove any entries where the time limit has passed so this map doesn't
732 // grow indefinitely:
733 std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
734 for (auto it = sessions.cbegin(); it != sessions.cend();) {
735 if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
736 it = sessions.erase(it);
737 } else {
738 ++it;
739 }
740 }
741
742 int64_t& previous_s = session_to_last_trace_s_[name];
743 if (previous_s == 0) {
744 previous_s = now_s;
745 } else {
746 MaybeLogUploadEvent(
747 cfg, PerfettoStatsdAtom::kTracedEnableTracingSessionNameTooRecent);
748 return PERFETTO_SVC_ERR(
749 "A trace with unique session name \"%s\" began less than %" PRId64
750 "s ago (%" PRId64 "s)",
751 name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
752 }
753 }
754
755 const int sessions_for_uid = static_cast<int>(std::count_if(
756 tracing_sessions_.begin(), tracing_sessions_.end(),
757 [consumer](const decltype(tracing_sessions_)::value_type& s) {
758 return s.second.consumer_uid == consumer->uid_;
759 }));
760
761 int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
762 if (consumer->uid_ == 1066 /* AID_STATSD*/) {
763 per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
764 }
765 if (sessions_for_uid >= per_uid_limit) {
766 MaybeLogUploadEvent(
767 cfg, PerfettoStatsdAtom::kTracedEnableTracingTooManySessionsForUid);
768 return PERFETTO_SVC_ERR(
769 "Too many concurrent tracing sesions (%d) for uid %d limit is %d",
770 sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
771 }
772
773 // TODO(primiano): This is a workaround to prevent that a producer gets stuck
774 // in a state where it stalls by design by having more TraceWriterImpl
775 // instances than free pages in the buffer. This is really a bug in
776 // trace_probes and the way it handles stalls in the shmem buffer.
777 if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
778 MaybeLogUploadEvent(
779 cfg, PerfettoStatsdAtom::kTracedEnableTracingTooManyConcurrentSessions);
780 return PERFETTO_SVC_ERR("Too many concurrent tracing sesions (%zu)",
781 tracing_sessions_.size());
782 }
783
784 // If the trace config provides a filter bytecode, setup the filter now.
785 // If the filter loading fails, abort the tracing session rather than running
786 // unfiltered.
787 std::unique_ptr<protozero::MessageFilter> trace_filter;
788 if (cfg.has_trace_filter()) {
789 const auto& filt = cfg.trace_filter();
790 const std::string& bytecode = filt.bytecode();
791 trace_filter.reset(new protozero::MessageFilter());
792 if (!trace_filter->LoadFilterBytecode(bytecode.data(), bytecode.size())) {
793 MaybeLogUploadEvent(
794 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
795 return PERFETTO_SVC_ERR("Trace filter bytecode invalid, aborting");
796 }
797 // The filter is created using perfetto.protos.Trace as root message
798 // (because that makes it possible to play around with the `proto_filter`
799 // tool on actual traces). Here in the service, however, we deal with
800 // perfetto.protos.TracePacket(s), which are one level down (Trace.packet).
801 // The IPC client (or the write_into_filte logic in here) are responsible
802 // for pre-pending the packet preamble (See GetProtoPreamble() calls), but
803 // the preamble is not there at ReadBuffer time. Hence we change the root of
804 // the filtering to start at the Trace.packet level.
805 uint32_t packet_field_id = TracePacket::kPacketFieldNumber;
806 if (!trace_filter->SetFilterRoot(&packet_field_id, 1)) {
807 MaybeLogUploadEvent(
808 cfg, PerfettoStatsdAtom::kTracedEnableTracingInvalidFilter);
809 return PERFETTO_SVC_ERR("Failed to set filter root.");
810 }
811 }
812
813 const TracingSessionID tsid = ++last_tracing_session_id_;
814 TracingSession* tracing_session =
815 &tracing_sessions_
816 .emplace(std::piecewise_construct, std::forward_as_tuple(tsid),
817 std::forward_as_tuple(tsid, consumer, cfg, task_runner_))
818 .first->second;
819
820 if (trace_filter)
821 tracing_session->trace_filter = std::move(trace_filter);
822
823 if (cfg.write_into_file()) {
824 if (!fd ^ !cfg.output_path().empty()) {
825 tracing_sessions_.erase(tsid);
826 MaybeLogUploadEvent(
827 tracing_session->config,
828 PerfettoStatsdAtom::kTracedEnableTracingInvalidFdOutputFile);
829 return PERFETTO_SVC_ERR(
830 "When write_into_file==true either a FD needs to be passed or "
831 "output_path must be populated (but not both)");
832 }
833 if (!cfg.output_path().empty()) {
834 fd = CreateTraceFile(cfg.output_path(), /*overwrite=*/false);
835 if (!fd) {
836 MaybeLogUploadEvent(
837 tracing_session->config,
838 PerfettoStatsdAtom::kTracedEnableTracingFailedToCreateFile);
839 tracing_sessions_.erase(tsid);
840 return PERFETTO_SVC_ERR("Failed to create the trace file %s",
841 cfg.output_path().c_str());
842 }
843 }
844 tracing_session->write_into_file = std::move(fd);
845 uint32_t write_period_ms = cfg.file_write_period_ms();
846 if (write_period_ms == 0)
847 write_period_ms = kDefaultWriteIntoFilePeriodMs;
848 if (write_period_ms < min_write_period_ms_)
849 write_period_ms = min_write_period_ms_;
850 tracing_session->write_period_ms = write_period_ms;
851 tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
852 tracing_session->bytes_written_into_file = 0;
853 }
854
855 // Initialize the log buffers.
856 bool did_allocate_all_buffers = true;
857
858 // Allocate the trace buffers. Also create a map to translate a consumer
859 // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
860 // corresponding BufferID, which is a global ID namespace for the service and
861 // all producers.
862 size_t total_buf_size_kb = 0;
863 const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
864 tracing_session->buffers_index.reserve(num_buffers);
865 for (size_t i = 0; i < num_buffers; i++) {
866 const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
867 BufferID global_id = buffer_ids_.Allocate();
868 if (!global_id) {
869 did_allocate_all_buffers = false; // We ran out of IDs.
870 break;
871 }
872 tracing_session->buffers_index.push_back(global_id);
873 const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
874 total_buf_size_kb += buffer_cfg.size_kb();
875 TraceBuffer::OverwritePolicy policy =
876 buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
877 ? TraceBuffer::kDiscard
878 : TraceBuffer::kOverwrite;
879 auto it_and_inserted = buffers_.emplace(
880 global_id, TraceBuffer::Create(buf_size_bytes, policy));
881 PERFETTO_DCHECK(it_and_inserted.second); // buffers_.count(global_id) == 0.
882 std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
883 if (!trace_buffer) {
884 did_allocate_all_buffers = false;
885 break;
886 }
887 }
888
889 UpdateMemoryGuardrail();
890
891 // This can happen if either:
892 // - All the kMaxTraceBufferID slots are taken.
893 // - OOM, or, more relistically, we exhausted virtual memory.
894 // In any case, free all the previously allocated buffers and abort.
895 // TODO(fmayer): add a test to cover this case, this is quite subtle.
896 if (!did_allocate_all_buffers) {
897 for (BufferID global_id : tracing_session->buffers_index) {
898 buffer_ids_.Free(global_id);
899 buffers_.erase(global_id);
900 }
901 tracing_sessions_.erase(tsid);
902 MaybeLogUploadEvent(tracing_session->config,
903 PerfettoStatsdAtom::kTracedEnableTracingOom);
904 return PERFETTO_SVC_ERR(
905 "Failed to allocate tracing buffers: OOM or too many buffers");
906 }
907
908 consumer->tracing_session_id_ = tsid;
909
910 // Setup the data sources on the producers without starting them.
911 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
912 // Scan all the registered data sources with a matching name.
913 auto range = data_sources_.equal_range(cfg_data_source.config().name());
914 for (auto it = range.first; it != range.second; it++) {
915 TraceConfig::ProducerConfig producer_config;
916 for (auto& config : cfg.producers()) {
917 if (GetProducer(it->second.producer_id)->name_ ==
918 config.producer_name()) {
919 producer_config = config;
920 break;
921 }
922 }
923 SetupDataSource(cfg_data_source, producer_config, it->second,
924 tracing_session);
925 }
926 }
927
928 bool has_start_trigger = false;
929 auto weak_this = weak_ptr_factory_.GetWeakPtr();
930 switch (cfg.trigger_config().trigger_mode()) {
931 case TraceConfig::TriggerConfig::UNSPECIFIED:
932 // no triggers are specified so this isn't a trace that is using triggers.
933 PERFETTO_DCHECK(!has_trigger_config);
934 break;
935 case TraceConfig::TriggerConfig::START_TRACING:
936 // For traces which use START_TRACE triggers we need to ensure that the
937 // tracing session will be cleaned up when it times out.
938 has_start_trigger = true;
939 task_runner_->PostDelayedTask(
940 [weak_this, tsid]() {
941 if (weak_this)
942 weak_this->OnStartTriggersTimeout(tsid);
943 },
944 cfg.trigger_config().trigger_timeout_ms());
945 break;
946 case TraceConfig::TriggerConfig::STOP_TRACING:
947 // Update the tracing_session's duration_ms to ensure that if no trigger
948 // is received the session will end and be cleaned up equal to the
949 // timeout.
950 //
951 // TODO(nuskos): Refactor this so that rather then modifying the config we
952 // have a field we look at on the tracing_session.
953 tracing_session->config.set_duration_ms(
954 cfg.trigger_config().trigger_timeout_ms());
955 break;
956
957 // The case of unknown modes (coming from future versions of the service)
958 // is handled few lines above (search for TriggerMode_MAX).
959 }
960
961 tracing_session->state = TracingSession::CONFIGURED;
962 PERFETTO_LOG(
963 "Configured tracing session %" PRIu64
964 ", #sources:%zu, duration:%d ms, #buffers:%d, total "
965 "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
966 tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
967 cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
968 static_cast<unsigned int>(consumer->uid_),
969 cfg.unique_session_name().c_str());
970
971 // Start the data sources, unless this is a case of early setup + fast
972 // triggering, either through TraceConfig.deferred_start or
973 // TraceConfig.trigger_config(). If both are specified which ever one occurs
974 // first will initiate the trace.
975 if (!cfg.deferred_start() && !has_start_trigger)
976 return StartTracing(tsid);
977
978 return base::OkStatus();
979 }
980
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)981 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
982 const TraceConfig& updated_cfg) {
983 PERFETTO_DCHECK_THREAD(thread_checker_);
984 TracingSession* tracing_session =
985 GetTracingSession(consumer->tracing_session_id_);
986 PERFETTO_DCHECK(tracing_session);
987
988 if ((tracing_session->state != TracingSession::STARTED) &&
989 (tracing_session->state != TracingSession::CONFIGURED)) {
990 PERFETTO_ELOG(
991 "ChangeTraceConfig() was called for a tracing session which isn't "
992 "running.");
993 return;
994 }
995
996 // We only support updating producer_name_{,regex}_filter (and pass-through
997 // configs) for now; null out any changeable fields and make sure the rest are
998 // identical.
999 TraceConfig new_config_copy(updated_cfg);
1000 for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
1001 ds_cfg.clear_producer_name_filter();
1002 ds_cfg.clear_producer_name_regex_filter();
1003 }
1004
1005 TraceConfig current_config_copy(tracing_session->config);
1006 for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
1007 ds_cfg.clear_producer_name_filter();
1008 ds_cfg.clear_producer_name_regex_filter();
1009 }
1010
1011 if (new_config_copy != current_config_copy) {
1012 PERFETTO_LOG(
1013 "ChangeTraceConfig() was called with a config containing unsupported "
1014 "changes; only adding to the producer_name_{,regex}_filter is "
1015 "currently supported and will have an effect.");
1016 }
1017
1018 for (TraceConfig::DataSource& cfg_data_source :
1019 *tracing_session->config.mutable_data_sources()) {
1020 // Find the updated producer_filter in the new config.
1021 std::vector<std::string> new_producer_name_filter;
1022 std::vector<std::string> new_producer_name_regex_filter;
1023 bool found_data_source = false;
1024 for (const auto& it : updated_cfg.data_sources()) {
1025 if (cfg_data_source.config().name() == it.config().name()) {
1026 new_producer_name_filter = it.producer_name_filter();
1027 new_producer_name_regex_filter = it.producer_name_regex_filter();
1028 found_data_source = true;
1029 break;
1030 }
1031 }
1032
1033 // Bail out if data source not present in the new config.
1034 if (!found_data_source) {
1035 PERFETTO_ELOG(
1036 "ChangeTraceConfig() called without a current data source also "
1037 "present in the new config: %s",
1038 cfg_data_source.config().name().c_str());
1039 continue;
1040 }
1041
1042 // TODO(oysteine): Just replacing the filter means that if
1043 // there are any filter entries which were present in the original config,
1044 // but removed from the config passed to ChangeTraceConfig, any matching
1045 // producers will keep producing but newly added producers after this
1046 // point will never start.
1047 *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
1048 *cfg_data_source.mutable_producer_name_regex_filter() =
1049 new_producer_name_regex_filter;
1050
1051 // Scan all the registered data sources with a matching name.
1052 auto range = data_sources_.equal_range(cfg_data_source.config().name());
1053 for (auto it = range.first; it != range.second; it++) {
1054 ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
1055 PERFETTO_DCHECK(producer);
1056
1057 // Check if the producer name of this data source is present
1058 // in the name filters. We currently only support new filters, not
1059 // removing old ones.
1060 if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
1061 new_producer_name_regex_filter)) {
1062 continue;
1063 }
1064
1065 bool already_setup = false;
1066 auto& ds_instances = tracing_session->data_source_instances;
1067 for (auto instance_it = ds_instances.begin();
1068 instance_it != ds_instances.end(); ++instance_it) {
1069 if (instance_it->first == it->second.producer_id &&
1070 instance_it->second.data_source_name ==
1071 cfg_data_source.config().name()) {
1072 already_setup = true;
1073 break;
1074 }
1075 }
1076
1077 if (already_setup)
1078 continue;
1079
1080 // If it wasn't previously setup, set it up now.
1081 // (The per-producer config is optional).
1082 TraceConfig::ProducerConfig producer_config;
1083 for (auto& config : tracing_session->config.producers()) {
1084 if (producer->name_ == config.producer_name()) {
1085 producer_config = config;
1086 break;
1087 }
1088 }
1089
1090 DataSourceInstance* ds_inst = SetupDataSource(
1091 cfg_data_source, producer_config, it->second, tracing_session);
1092
1093 if (ds_inst && tracing_session->state == TracingSession::STARTED)
1094 StartDataSourceInstance(producer, tracing_session, ds_inst);
1095 }
1096 }
1097 }
1098
StartTracing(TracingSessionID tsid)1099 base::Status TracingServiceImpl::StartTracing(TracingSessionID tsid) {
1100 PERFETTO_DCHECK_THREAD(thread_checker_);
1101
1102 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1103 TracingSession* tracing_session = GetTracingSession(tsid);
1104 if (!tracing_session) {
1105 return PERFETTO_SVC_ERR(
1106 "StartTracing() failed, invalid session ID %" PRIu64, tsid);
1107 }
1108
1109 MaybeLogUploadEvent(tracing_session->config,
1110 PerfettoStatsdAtom::kTracedStartTracing);
1111
1112 if (tracing_session->state != TracingSession::CONFIGURED) {
1113 MaybeLogUploadEvent(
1114 tracing_session->config,
1115 PerfettoStatsdAtom::kTracedStartTracingInvalidSessionState);
1116 return PERFETTO_SVC_ERR("StartTracing() failed, invalid session state: %d",
1117 tracing_session->state);
1118 }
1119
1120 tracing_session->state = TracingSession::STARTED;
1121
1122 // We store the start of trace snapshot separately as it's important to make
1123 // sure we can interpret all the data in the trace and storing it in the ring
1124 // buffer means it could be overwritten by a later snapshot.
1125 if (!tracing_session->config.builtin_data_sources()
1126 .disable_clock_snapshotting()) {
1127 SnapshotClocks(&tracing_session->initial_clock_snapshot);
1128 }
1129
1130 // We don't snapshot the clocks here because we just did this above.
1131 SnapshotLifecyleEvent(
1132 tracing_session,
1133 protos::pbzero::TracingServiceEvent::kTracingStartedFieldNumber,
1134 false /* snapshot_clocks */);
1135
1136 // Periodically snapshot clocks, stats, sync markers while the trace is
1137 // active. The snapshots are emitted on the future ReadBuffers() calls, which
1138 // means that:
1139 // (a) If we're streaming to a file (or to a consumer) while tracing, we
1140 // write snapshots periodically into the trace.
1141 // (b) If ReadBuffers() is only called after tracing ends, we emit the latest
1142 // snapshot into the trace. For clock snapshots, we keep track of the
1143 // snapshot recorded at the beginning of the session
1144 // (initial_clock_snapshot above), as well as the most recent sampled
1145 // snapshots that showed significant new drift between different clocks.
1146 // The latter clock snapshots are sampled periodically and at lifecycle
1147 // events.
1148 base::PeriodicTask::Args snapshot_task_args;
1149 snapshot_task_args.start_first_task_immediately = true;
1150 snapshot_task_args.use_suspend_aware_timer =
1151 tracing_session->config.builtin_data_sources()
1152 .prefer_suspend_clock_for_snapshot();
1153 snapshot_task_args.task = [weak_this, tsid] {
1154 if (weak_this)
1155 weak_this->PeriodicSnapshotTask(tsid);
1156 };
1157 snapshot_task_args.period_ms =
1158 tracing_session->config.builtin_data_sources().snapshot_interval_ms();
1159 if (!snapshot_task_args.period_ms)
1160 snapshot_task_args.period_ms = kDefaultSnapshotsIntervalMs;
1161 tracing_session->snapshot_periodic_task.Start(snapshot_task_args);
1162
1163 // Trigger delayed task if the trace is time limited.
1164 const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
1165 if (trace_duration_ms > 0) {
1166 task_runner_->PostDelayedTask(
1167 [weak_this, tsid] {
1168 // Skip entirely the flush if the trace session doesn't exist anymore.
1169 // This is to prevent misleading error messages to be logged.
1170 if (!weak_this)
1171 return;
1172 auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
1173 if (!tracing_session_ptr)
1174 return;
1175 // If this trace was using STOP_TRACING triggers and we've seen
1176 // one, then the trigger overrides the normal timeout. In this
1177 // case we just return and let the other task clean up this trace.
1178 if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
1179 TraceConfig::TriggerConfig::STOP_TRACING &&
1180 !tracing_session_ptr->received_triggers.empty())
1181 return;
1182 // In all other cases (START_TRACING or no triggers) we flush
1183 // after |trace_duration_ms| unconditionally.
1184 weak_this->FlushAndDisableTracing(tsid);
1185 },
1186 trace_duration_ms);
1187 }
1188
1189 // Start the periodic drain tasks if we should to save the trace into a file.
1190 if (tracing_session->config.write_into_file()) {
1191 task_runner_->PostDelayedTask(
1192 [weak_this, tsid] {
1193 if (weak_this)
1194 weak_this->ReadBuffersIntoFile(tsid);
1195 },
1196 tracing_session->delay_to_next_write_period_ms());
1197 }
1198
1199 // Start the periodic flush tasks if the config specified a flush period.
1200 if (tracing_session->config.flush_period_ms())
1201 PeriodicFlushTask(tsid, /*post_next_only=*/true);
1202
1203 // Start the periodic incremental state clear tasks if the config specified a
1204 // period.
1205 if (tracing_session->config.incremental_state_config().clear_period_ms()) {
1206 PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
1207 }
1208
1209 for (auto& kv : tracing_session->data_source_instances) {
1210 ProducerID producer_id = kv.first;
1211 DataSourceInstance& data_source = kv.second;
1212 ProducerEndpointImpl* producer = GetProducer(producer_id);
1213 if (!producer) {
1214 PERFETTO_DFATAL("Producer does not exist.");
1215 continue;
1216 }
1217 StartDataSourceInstance(producer, tracing_session, &data_source);
1218 }
1219
1220 MaybeNotifyAllDataSourcesStarted(tracing_session);
1221 return base::OkStatus();
1222 }
1223
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)1224 void TracingServiceImpl::StartDataSourceInstance(
1225 ProducerEndpointImpl* producer,
1226 TracingSession* tracing_session,
1227 TracingServiceImpl::DataSourceInstance* instance) {
1228 PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
1229 if (instance->will_notify_on_start) {
1230 instance->state = DataSourceInstance::STARTING;
1231 } else {
1232 instance->state = DataSourceInstance::STARTED;
1233 }
1234 if (tracing_session->consumer_maybe_null) {
1235 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1236 *producer, *instance);
1237 }
1238 producer->StartDataSource(instance->instance_id, instance->config);
1239
1240 // If all data sources are started, notify the consumer.
1241 if (instance->state == DataSourceInstance::STARTED)
1242 MaybeNotifyAllDataSourcesStarted(tracing_session);
1243 }
1244
1245 // DisableTracing just stops the data sources but doesn't free up any buffer.
1246 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1247 // and then drain the buffers. The actual teardown of the TracingSession happens
1248 // in FreeBuffers().
DisableTracing(TracingSessionID tsid,bool disable_immediately)1249 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
1250 bool disable_immediately) {
1251 PERFETTO_DCHECK_THREAD(thread_checker_);
1252 TracingSession* tracing_session = GetTracingSession(tsid);
1253 if (!tracing_session) {
1254 // Can happen if the consumer calls this before EnableTracing() or after
1255 // FreeBuffers().
1256 PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
1257 return;
1258 }
1259
1260 MaybeLogUploadEvent(tracing_session->config,
1261 PerfettoStatsdAtom::kTracedDisableTracing);
1262
1263 switch (tracing_session->state) {
1264 // Spurious call to DisableTracing() while already disabled, nothing to do.
1265 case TracingSession::DISABLED:
1266 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1267 return;
1268
1269 // This is either:
1270 // A) The case of a graceful DisableTracing() call followed by a call to
1271 // FreeBuffers(), iff |disable_immediately| == true. In this case we want
1272 // to forcefully transition in the disabled state without waiting for the
1273 // outstanding acks because the buffers are going to be destroyed soon.
1274 // B) A spurious call, iff |disable_immediately| == false, in which case
1275 // there is nothing to do.
1276 case TracingSession::DISABLING_WAITING_STOP_ACKS:
1277 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1278 if (disable_immediately)
1279 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1280 return;
1281
1282 // Continues below.
1283 case TracingSession::CONFIGURED:
1284 // If the session didn't even start there is no need to orchestrate a
1285 // graceful stop of data sources.
1286 disable_immediately = true;
1287 break;
1288
1289 // This is the nominal case, continues below.
1290 case TracingSession::STARTED:
1291 break;
1292 }
1293
1294 for (auto& data_source_inst : tracing_session->data_source_instances) {
1295 const ProducerID producer_id = data_source_inst.first;
1296 DataSourceInstance& instance = data_source_inst.second;
1297 ProducerEndpointImpl* producer = GetProducer(producer_id);
1298 PERFETTO_DCHECK(producer);
1299 PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
1300 instance.state == DataSourceInstance::STARTING ||
1301 instance.state == DataSourceInstance::STARTED);
1302 StopDataSourceInstance(producer, tracing_session, &instance,
1303 disable_immediately);
1304 }
1305
1306 // If the periodic task is running, we can stop the periodic snapshot timer
1307 // here instead of waiting until FreeBuffers to prevent useless snapshots
1308 // which won't be read.
1309 tracing_session->snapshot_periodic_task.Reset();
1310
1311 // Either this request is flagged with |disable_immediately| or there are no
1312 // data sources that are requesting a final handshake. In both cases just mark
1313 // the session as disabled immediately, notify the consumer and flush the
1314 // trace file (if used).
1315 if (tracing_session->AllDataSourceInstancesStopped())
1316 return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1317
1318 tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
1319 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1320 task_runner_->PostDelayedTask(
1321 [weak_this, tsid] {
1322 if (weak_this)
1323 weak_this->OnDisableTracingTimeout(tsid);
1324 },
1325 tracing_session->data_source_stop_timeout_ms());
1326
1327 // Deliberately NOT removing the session from |tracing_session_|, it's still
1328 // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
1329 }
1330
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1331 void TracingServiceImpl::NotifyDataSourceStarted(
1332 ProducerID producer_id,
1333 DataSourceInstanceID instance_id) {
1334 PERFETTO_DCHECK_THREAD(thread_checker_);
1335 for (auto& kv : tracing_sessions_) {
1336 TracingSession& tracing_session = kv.second;
1337 DataSourceInstance* instance =
1338 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1339
1340 if (!instance)
1341 continue;
1342
1343 // If the tracing session was already stopped, ignore this notification.
1344 if (tracing_session.state != TracingSession::STARTED)
1345 continue;
1346
1347 if (instance->state != DataSourceInstance::STARTING) {
1348 PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1349 instance->state);
1350 continue;
1351 }
1352
1353 instance->state = DataSourceInstance::STARTED;
1354
1355 ProducerEndpointImpl* producer = GetProducer(producer_id);
1356 PERFETTO_DCHECK(producer);
1357 if (tracing_session.consumer_maybe_null) {
1358 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1359 *producer, *instance);
1360 }
1361
1362 // If all data sources are started, notify the consumer.
1363 MaybeNotifyAllDataSourcesStarted(&tracing_session);
1364 } // for (tracing_session)
1365 }
1366
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1367 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1368 TracingSession* tracing_session) {
1369 if (!tracing_session->consumer_maybe_null)
1370 return;
1371
1372 if (!tracing_session->AllDataSourceInstancesStarted())
1373 return;
1374
1375 // In some rare cases, we can get in this state more than once. Consider the
1376 // following scenario: 3 data sources are registered -> trace starts ->
1377 // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1378 // Imagine now that a 4th data source registers while the trace is ongoing.
1379 // This would hit the AllDataSourceInstancesStarted() condition again.
1380 // In this case, however, we don't want to re-notify the consumer again.
1381 // That would be unexpected (even if, perhaps, technically correct) and
1382 // trigger bugs in the consumer.
1383 if (tracing_session->did_notify_all_data_source_started)
1384 return;
1385
1386 PERFETTO_DLOG("All data sources started");
1387
1388 SnapshotLifecyleEvent(
1389 tracing_session,
1390 protos::pbzero::TracingServiceEvent::kAllDataSourcesStartedFieldNumber,
1391 true /* snapshot_clocks */);
1392
1393 tracing_session->did_notify_all_data_source_started = true;
1394 tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1395 }
1396
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1397 void TracingServiceImpl::NotifyDataSourceStopped(
1398 ProducerID producer_id,
1399 DataSourceInstanceID instance_id) {
1400 PERFETTO_DCHECK_THREAD(thread_checker_);
1401 for (auto& kv : tracing_sessions_) {
1402 TracingSession& tracing_session = kv.second;
1403 DataSourceInstance* instance =
1404 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1405
1406 if (!instance)
1407 continue;
1408
1409 if (instance->state != DataSourceInstance::STOPPING) {
1410 PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1411 instance->state);
1412 continue;
1413 }
1414
1415 instance->state = DataSourceInstance::STOPPED;
1416
1417 ProducerEndpointImpl* producer = GetProducer(producer_id);
1418 PERFETTO_DCHECK(producer);
1419 if (tracing_session.consumer_maybe_null) {
1420 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1421 *producer, *instance);
1422 }
1423
1424 if (!tracing_session.AllDataSourceInstancesStopped())
1425 continue;
1426
1427 if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1428 continue;
1429
1430 // All data sources acked the termination.
1431 DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1432 } // for (tracing_session)
1433 }
1434
ActivateTriggers(ProducerID producer_id,const std::vector<std::string> & triggers)1435 void TracingServiceImpl::ActivateTriggers(
1436 ProducerID producer_id,
1437 const std::vector<std::string>& triggers) {
1438 PERFETTO_DCHECK_THREAD(thread_checker_);
1439 auto* producer = GetProducer(producer_id);
1440 PERFETTO_DCHECK(producer);
1441
1442 int64_t now_ns = base::GetBootTimeNs().count();
1443 for (const auto& trigger_name : triggers) {
1444 PERFETTO_DLOG("Received ActivateTriggers request for \"%s\"",
1445 trigger_name.c_str());
1446 base::Hash hash;
1447 hash.Update(trigger_name.c_str(), trigger_name.size());
1448 std::string triggered_session_name;
1449 base::Uuid triggered_session_uuid;
1450 TracingSessionID triggered_session_id = 0;
1451 int trigger_mode = 0;
1452
1453 uint64_t trigger_name_hash = hash.digest();
1454 size_t count_in_window =
1455 PurgeExpiredAndCountTriggerInWindow(now_ns, trigger_name_hash);
1456
1457 bool trigger_matched = false;
1458 bool trigger_activated = false;
1459 for (auto& id_and_tracing_session : tracing_sessions_) {
1460 auto& tracing_session = id_and_tracing_session.second;
1461 TracingSessionID tsid = id_and_tracing_session.first;
1462 auto iter = std::find_if(
1463 tracing_session.config.trigger_config().triggers().begin(),
1464 tracing_session.config.trigger_config().triggers().end(),
1465 [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
1466 return trigger.name() == trigger_name;
1467 });
1468 if (iter == tracing_session.config.trigger_config().triggers().end()) {
1469 continue;
1470 }
1471
1472 // If this trigger requires a certain producer to have sent it
1473 // (non-empty producer_name()) ensure the producer who sent this trigger
1474 // matches.
1475 if (!iter->producer_name_regex().empty() &&
1476 !std::regex_match(
1477 producer->name_,
1478 std::regex(iter->producer_name_regex(), std::regex::extended))) {
1479 continue;
1480 }
1481
1482 // Use a random number between 0 and 1 to check if we should allow this
1483 // trigger through or not.
1484 double trigger_rnd =
1485 trigger_rnd_override_for_testing_ > 0
1486 ? trigger_rnd_override_for_testing_
1487 : trigger_probability_dist_(trigger_probability_rand_);
1488 PERFETTO_DCHECK(trigger_rnd >= 0 && trigger_rnd < 1);
1489 if (trigger_rnd < iter->skip_probability()) {
1490 MaybeLogTriggerEvent(tracing_session.config,
1491 PerfettoTriggerAtom::kTracedLimitProbability,
1492 trigger_name);
1493 continue;
1494 }
1495
1496 // If we already triggered more times than the limit, silently ignore
1497 // this trigger.
1498 if (iter->max_per_24_h() > 0 && count_in_window >= iter->max_per_24_h()) {
1499 MaybeLogTriggerEvent(tracing_session.config,
1500 PerfettoTriggerAtom::kTracedLimitMaxPer24h,
1501 trigger_name);
1502 continue;
1503 }
1504 trigger_matched = true;
1505 triggered_session_id = tracing_session.id;
1506 triggered_session_name = tracing_session.config.unique_session_name();
1507 triggered_session_uuid.set_lsb_msb(
1508 tracing_session.config.trace_uuid_lsb(),
1509 tracing_session.config.trace_uuid_msb());
1510 trigger_mode = static_cast<int>(
1511 tracing_session.config.trigger_config().trigger_mode());
1512
1513 const bool triggers_already_received =
1514 !tracing_session.received_triggers.empty();
1515 tracing_session.received_triggers.push_back(
1516 {static_cast<uint64_t>(now_ns), iter->name(), producer->name_,
1517 producer->uid_});
1518 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1519 switch (tracing_session.config.trigger_config().trigger_mode()) {
1520 case TraceConfig::TriggerConfig::START_TRACING:
1521 // If the session has already been triggered and moved past
1522 // CONFIGURED then we don't need to repeat StartTracing. This would
1523 // work fine (StartTracing would return false) but would add error
1524 // logs.
1525 if (tracing_session.state != TracingSession::CONFIGURED)
1526 break;
1527
1528 trigger_activated = true;
1529 MaybeLogUploadEvent(tracing_session.config,
1530 PerfettoStatsdAtom::kTracedTriggerStartTracing,
1531 iter->name());
1532
1533 // We override the trace duration to be the trigger's requested
1534 // value, this ensures that the trace will end after this amount
1535 // of time has passed.
1536 tracing_session.config.set_duration_ms(iter->stop_delay_ms());
1537 StartTracing(tsid);
1538 break;
1539 case TraceConfig::TriggerConfig::STOP_TRACING:
1540 // Only stop the trace once to avoid confusing log messages. I.E.
1541 // when we've already hit the first trigger we've already Posted the
1542 // task to FlushAndDisable. So all future triggers will just break
1543 // out.
1544 if (triggers_already_received)
1545 break;
1546
1547 trigger_activated = true;
1548 MaybeLogUploadEvent(tracing_session.config,
1549 PerfettoStatsdAtom::kTracedTriggerStopTracing,
1550 iter->name());
1551
1552 // Now that we've seen a trigger we need to stop, flush, and disable
1553 // this session after the configured |stop_delay_ms|.
1554 task_runner_->PostDelayedTask(
1555 [weak_this, tsid] {
1556 // Skip entirely the flush if the trace session doesn't exist
1557 // anymore. This is to prevent misleading error messages to be
1558 // logged.
1559 if (weak_this && weak_this->GetTracingSession(tsid))
1560 weak_this->FlushAndDisableTracing(tsid);
1561 },
1562 // If this trigger is zero this will immediately executable and
1563 // will happen shortly.
1564 iter->stop_delay_ms());
1565 break;
1566 case TraceConfig::TriggerConfig::UNSPECIFIED:
1567 PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
1568 break;
1569 }
1570 } // for (.. : tracing_sessions_)
1571
1572 if (trigger_matched) {
1573 trigger_history_.emplace_back(TriggerHistory{now_ns, trigger_name_hash});
1574 }
1575
1576 if (trigger_activated) {
1577 // Log only the trigger that actually caused a trace stop/start, don't log
1578 // the follow-up ones, even if they matched.
1579 PERFETTO_LOG(
1580 "Trace trigger activated: trigger_name=\"%s\" trigger_mode=%d "
1581 "trace_name=\"%s\" trace_uuid=\"%s\" tsid=%" PRIu64,
1582 trigger_name.c_str(), trigger_mode, triggered_session_name.c_str(),
1583 triggered_session_uuid.ToPrettyString().c_str(),
1584 triggered_session_id);
1585 }
1586 } // for (trigger_name : triggers)
1587 }
1588
1589 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
1590 // conditions all data sources should have acked the stop and this will early
1591 // out.
OnDisableTracingTimeout(TracingSessionID tsid)1592 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1593 PERFETTO_DCHECK_THREAD(thread_checker_);
1594 TracingSession* tracing_session = GetTracingSession(tsid);
1595 if (!tracing_session ||
1596 tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1597 return; // Tracing session was successfully disabled.
1598 }
1599
1600 PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1601 tsid);
1602 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1603 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1604 }
1605
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1606 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1607 TracingSession* tracing_session) {
1608 PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1609 for (auto& inst_kv : tracing_session->data_source_instances) {
1610 if (inst_kv.second.state == DataSourceInstance::STOPPED)
1611 continue;
1612 inst_kv.second.state = DataSourceInstance::STOPPED;
1613 ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1614 PERFETTO_DCHECK(producer);
1615 if (tracing_session->consumer_maybe_null) {
1616 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1617 *producer, inst_kv.second);
1618 }
1619 }
1620 tracing_session->state = TracingSession::DISABLED;
1621
1622 // Scrape any remaining chunks that weren't flushed by the producers.
1623 for (auto& producer_id_and_producer : producers_)
1624 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1625
1626 SnapshotLifecyleEvent(
1627 tracing_session,
1628 protos::pbzero::TracingServiceEvent::kTracingDisabledFieldNumber,
1629 true /* snapshot_clocks */);
1630
1631 if (tracing_session->write_into_file) {
1632 tracing_session->write_period_ms = 0;
1633 ReadBuffersIntoFile(tracing_session->id);
1634 }
1635
1636 if (tracing_session->on_disable_callback_for_bugreport) {
1637 std::move(tracing_session->on_disable_callback_for_bugreport)();
1638 tracing_session->on_disable_callback_for_bugreport = nullptr;
1639 }
1640
1641 MaybeLogUploadEvent(tracing_session->config,
1642 PerfettoStatsdAtom::kTracedNotifyTracingDisabled);
1643
1644 if (tracing_session->consumer_maybe_null)
1645 tracing_session->consumer_maybe_null->NotifyOnTracingDisabled("");
1646 }
1647
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback)1648 void TracingServiceImpl::Flush(TracingSessionID tsid,
1649 uint32_t timeout_ms,
1650 ConsumerEndpoint::FlushCallback callback) {
1651 PERFETTO_DCHECK_THREAD(thread_checker_);
1652 TracingSession* tracing_session = GetTracingSession(tsid);
1653 if (!tracing_session) {
1654 PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1655 return;
1656 }
1657
1658 if (!timeout_ms)
1659 timeout_ms = tracing_session->flush_timeout_ms();
1660
1661 if (tracing_session->pending_flushes.size() > 1000) {
1662 PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1663 tracing_session->pending_flushes.size());
1664 callback(false);
1665 return;
1666 }
1667
1668 if (tracing_session->state != TracingSession::STARTED) {
1669 PERFETTO_ELOG("Flush() called, but tracing has not been started");
1670 callback(false);
1671 return;
1672 }
1673
1674 ++tracing_session->flushes_requested;
1675 FlushRequestID flush_request_id = ++last_flush_request_id_;
1676 PendingFlush& pending_flush =
1677 tracing_session->pending_flushes
1678 .emplace_hint(tracing_session->pending_flushes.end(),
1679 flush_request_id, PendingFlush(std::move(callback)))
1680 ->second;
1681
1682 // Send a flush request to each producer involved in the tracing session. In
1683 // order to issue a flush request we have to build a map of all data source
1684 // instance ids enabled for each producer.
1685 std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1686 for (const auto& data_source_inst : tracing_session->data_source_instances) {
1687 const ProducerID producer_id = data_source_inst.first;
1688 const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1689 flush_map[producer_id].push_back(ds_inst_id);
1690 }
1691
1692 for (const auto& kv : flush_map) {
1693 ProducerID producer_id = kv.first;
1694 ProducerEndpointImpl* producer = GetProducer(producer_id);
1695 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1696 producer->Flush(flush_request_id, data_sources);
1697 pending_flush.producers.insert(producer_id);
1698 }
1699
1700 // If there are no producers to flush (realistically this happens only in
1701 // some tests) fire OnFlushTimeout() straight away, without waiting.
1702 if (flush_map.empty())
1703 timeout_ms = 0;
1704
1705 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1706 task_runner_->PostDelayedTask(
1707 [weak_this, tsid, flush_request_id] {
1708 if (weak_this)
1709 weak_this->OnFlushTimeout(tsid, flush_request_id);
1710 },
1711 timeout_ms);
1712 }
1713
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1714 void TracingServiceImpl::NotifyFlushDoneForProducer(
1715 ProducerID producer_id,
1716 FlushRequestID flush_request_id) {
1717 for (auto& kv : tracing_sessions_) {
1718 // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1719 auto& pending_flushes = kv.second.pending_flushes;
1720 auto end_it = pending_flushes.upper_bound(flush_request_id);
1721 for (auto it = pending_flushes.begin(); it != end_it;) {
1722 PendingFlush& pending_flush = it->second;
1723 pending_flush.producers.erase(producer_id);
1724 if (pending_flush.producers.empty()) {
1725 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1726 TracingSessionID tsid = kv.first;
1727 auto callback = std::move(pending_flush.callback);
1728 task_runner_->PostTask([weak_this, tsid, callback]() {
1729 if (weak_this) {
1730 weak_this->CompleteFlush(tsid, std::move(callback),
1731 /*success=*/true);
1732 }
1733 });
1734 it = pending_flushes.erase(it);
1735 } else {
1736 it++;
1737 }
1738 } // for (pending_flushes)
1739 } // for (tracing_session)
1740 }
1741
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1742 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1743 FlushRequestID flush_request_id) {
1744 TracingSession* tracing_session = GetTracingSession(tsid);
1745 if (!tracing_session)
1746 return;
1747 auto it = tracing_session->pending_flushes.find(flush_request_id);
1748 if (it == tracing_session->pending_flushes.end())
1749 return; // Nominal case: flush was completed and acked on time.
1750
1751 // If there were no producers to flush, consider it a success.
1752 bool success = it->second.producers.empty();
1753 auto callback = std::move(it->second.callback);
1754 tracing_session->pending_flushes.erase(it);
1755 CompleteFlush(tsid, std::move(callback), success);
1756 }
1757
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1758 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1759 ConsumerEndpoint::FlushCallback callback,
1760 bool success) {
1761 TracingSession* tracing_session = GetTracingSession(tsid);
1762 if (!tracing_session) {
1763 callback(false);
1764 return;
1765 }
1766 // Producers may not have been able to flush all their data, even if they
1767 // indicated flush completion. If possible, also collect uncommitted chunks
1768 // to make sure we have everything they wrote so far.
1769 for (auto& producer_id_and_producer : producers_) {
1770 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1771 }
1772 SnapshotLifecyleEvent(
1773 tracing_session,
1774 protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
1775 true /* snapshot_clocks */);
1776
1777 tracing_session->flushes_succeeded += success ? 1 : 0;
1778 tracing_session->flushes_failed += success ? 0 : 1;
1779 callback(success);
1780 }
1781
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)1782 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1783 TracingSession* tracing_session,
1784 ProducerEndpointImpl* producer) {
1785 if (!producer->smb_scraping_enabled_)
1786 return;
1787
1788 // Can't copy chunks if we don't know about any trace writers.
1789 if (producer->writers_.empty())
1790 return;
1791
1792 // Performance optimization: On flush or session disconnect, this method is
1793 // called for each producer. If the producer doesn't participate in the
1794 // session, there's no need to scape its chunks right now. We can tell if a
1795 // producer participates in the session by checking if the producer is allowed
1796 // to write into the session's log buffers.
1797 const auto& session_buffers = tracing_session->buffers_index;
1798 bool producer_in_session =
1799 std::any_of(session_buffers.begin(), session_buffers.end(),
1800 [producer](BufferID buffer_id) {
1801 return producer->allowed_target_buffers_.count(buffer_id);
1802 });
1803 if (!producer_in_session)
1804 return;
1805
1806 PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1807
1808 // Find and copy any uncommitted chunks from the SMB.
1809 //
1810 // In nominal conditions, the page layout of the used SMB pages should never
1811 // change because the service is the only one who is supposed to modify used
1812 // pages (to make them free again).
1813 //
1814 // However, the code here needs to deal with the case of a malicious producer
1815 // altering the SMB in unpredictable ways. Thankfully the SMB size is
1816 // immutable, so a chunk will always point to some valid memory, even if the
1817 // producer alters the intended layout and chunk header concurrently.
1818 // Ultimately a malicious producer altering the SMB's chunk layout while we
1819 // are iterating in this function is not any different from the case of a
1820 // malicious producer asking to commit a chunk made of random data, which is
1821 // something this class has to deal with regardless.
1822 //
1823 // The only legitimate mutations that can happen from sane producers,
1824 // concurrently to this function, are:
1825 // A. free pages being partitioned,
1826 // B. free chunks being migrated to kChunkBeingWritten,
1827 // C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
1828
1829 SharedMemoryABI* abi = &producer->shmem_abi_;
1830 // num_pages() is immutable after the SMB is initialized and cannot be changed
1831 // even by a producer even if malicious.
1832 for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1833 uint32_t layout = abi->GetPageLayout(page_idx);
1834
1835 uint32_t used_chunks = abi->GetUsedChunks(layout); // Returns a bitmap.
1836 // Skip empty pages.
1837 if (used_chunks == 0)
1838 continue;
1839
1840 // Scrape the chunks that are currently used. These should be either in
1841 // state kChunkBeingWritten or kChunkComplete.
1842 for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1843 if (!(used_chunks & 1))
1844 continue;
1845
1846 SharedMemoryABI::ChunkState state =
1847 SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1848 PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1849 state == SharedMemoryABI::kChunkComplete);
1850 bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1851
1852 SharedMemoryABI::Chunk chunk =
1853 abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1854
1855 uint16_t packet_count;
1856 uint8_t flags;
1857 // GetPacketCountAndFlags has acquire_load semantics.
1858 std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1859
1860 // It only makes sense to copy an incomplete chunk if there's at least
1861 // one full packet available. (The producer may not have completed the
1862 // last packet in it yet, so we need at least 2.)
1863 if (!chunk_complete && packet_count < 2)
1864 continue;
1865
1866 // At this point, it is safe to access the remaining header fields of
1867 // the chunk. Even if the chunk was only just transferred from
1868 // kChunkFree into kChunkBeingWritten state, the header should be
1869 // written completely once the packet count increased above 1 (it was
1870 // reset to 0 by the service when the chunk was freed).
1871
1872 WriterID writer_id = chunk.writer_id();
1873 base::Optional<BufferID> target_buffer_id =
1874 producer->buffer_id_for_writer(writer_id);
1875
1876 // We can only scrape this chunk if we know which log buffer to copy it
1877 // into.
1878 if (!target_buffer_id)
1879 continue;
1880
1881 // Skip chunks that don't belong to the requested tracing session.
1882 bool target_buffer_belongs_to_session =
1883 std::find(session_buffers.begin(), session_buffers.end(),
1884 *target_buffer_id) != session_buffers.end();
1885 if (!target_buffer_belongs_to_session)
1886 continue;
1887
1888 uint32_t chunk_id =
1889 chunk.header()->chunk_id.load(std::memory_order_relaxed);
1890
1891 CopyProducerPageIntoLogBuffer(
1892 producer->id_, producer->uid_, producer->pid_, writer_id, chunk_id,
1893 *target_buffer_id, packet_count, flags, chunk_complete,
1894 chunk.payload_begin(), chunk.payload_size());
1895 }
1896 }
1897 }
1898
FlushAndDisableTracing(TracingSessionID tsid)1899 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1900 PERFETTO_DCHECK_THREAD(thread_checker_);
1901 PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1902 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1903 Flush(tsid, 0, [weak_this, tsid](bool success) {
1904 // This was a DLOG up to Jun 2021 (v16, Android S).
1905 PERFETTO_LOG("FlushAndDisableTracing(%" PRIu64 ") done, success=%d", tsid,
1906 success);
1907 if (!weak_this)
1908 return;
1909 TracingSession* session = weak_this->GetTracingSession(tsid);
1910 session->final_flush_outcome = success ? TraceStats::FINAL_FLUSH_SUCCEEDED
1911 : TraceStats::FINAL_FLUSH_FAILED;
1912 if (session->consumer_maybe_null) {
1913 // If the consumer is still attached, just disable the session but give it
1914 // a chance to read the contents.
1915 weak_this->DisableTracing(tsid);
1916 } else {
1917 // If the consumer detached, destroy the session. If the consumer did
1918 // start the session in long-tracing mode, the service will have saved
1919 // the contents to the passed file. If not, the contents will be
1920 // destroyed.
1921 weak_this->FreeBuffers(tsid);
1922 }
1923 });
1924 }
1925
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)1926 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1927 bool post_next_only) {
1928 PERFETTO_DCHECK_THREAD(thread_checker_);
1929 TracingSession* tracing_session = GetTracingSession(tsid);
1930 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1931 return;
1932
1933 uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1934 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1935 task_runner_->PostDelayedTask(
1936 [weak_this, tsid] {
1937 if (weak_this)
1938 weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1939 },
1940 flush_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
1941 flush_period_ms));
1942
1943 if (post_next_only)
1944 return;
1945
1946 PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1947 Flush(tsid, 0, [](bool success) {
1948 if (!success)
1949 PERFETTO_ELOG("Periodic flush timed out");
1950 });
1951 }
1952
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)1953 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1954 TracingSessionID tsid,
1955 bool post_next_only) {
1956 PERFETTO_DCHECK_THREAD(thread_checker_);
1957 TracingSession* tracing_session = GetTracingSession(tsid);
1958 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1959 return;
1960
1961 uint32_t clear_period_ms =
1962 tracing_session->config.incremental_state_config().clear_period_ms();
1963 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1964 task_runner_->PostDelayedTask(
1965 [weak_this, tsid] {
1966 if (weak_this)
1967 weak_this->PeriodicClearIncrementalStateTask(
1968 tsid, /*post_next_only=*/false);
1969 },
1970 clear_period_ms - static_cast<uint32_t>(base::GetWallTimeMs().count() %
1971 clear_period_ms));
1972
1973 if (post_next_only)
1974 return;
1975
1976 PERFETTO_DLOG(
1977 "Performing periodic incremental state clear for trace session %" PRIu64,
1978 tsid);
1979
1980 // Queue the IPCs to producers with active data sources that opted in.
1981 std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1982 for (const auto& kv : tracing_session->data_source_instances) {
1983 ProducerID producer_id = kv.first;
1984 const DataSourceInstance& data_source = kv.second;
1985 if (data_source.handles_incremental_state_clear) {
1986 clear_map[producer_id].push_back(data_source.instance_id);
1987 }
1988 }
1989
1990 for (const auto& kv : clear_map) {
1991 ProducerID producer_id = kv.first;
1992 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1993 ProducerEndpointImpl* producer = GetProducer(producer_id);
1994 if (!producer) {
1995 PERFETTO_DFATAL("Producer does not exist.");
1996 continue;
1997 }
1998 producer->ClearIncrementalState(data_sources);
1999 }
2000 }
2001
ReadBuffersIntoConsumer(TracingSessionID tsid,ConsumerEndpointImpl * consumer)2002 bool TracingServiceImpl::ReadBuffersIntoConsumer(
2003 TracingSessionID tsid,
2004 ConsumerEndpointImpl* consumer) {
2005 PERFETTO_DCHECK(consumer);
2006 PERFETTO_DCHECK_THREAD(thread_checker_);
2007 TracingSession* tracing_session = GetTracingSession(tsid);
2008 if (!tracing_session) {
2009 PERFETTO_DLOG(
2010 "Cannot ReadBuffersIntoConsumer(): no tracing session is active");
2011 return false;
2012 }
2013
2014 if (tracing_session->write_into_file) {
2015 // If the consumer enabled tracing and asked to save the contents into the
2016 // passed file makes little sense to also try to read the buffers over IPC,
2017 // as that would just steal data from the periodic draining task.
2018 PERFETTO_ELOG("Consumer trying to read from write_into_file session.");
2019 return false;
2020 }
2021
2022 // If a bugreport request happened and the trace was stolen for that, give
2023 // an empty trace with a clear signal to the consumer. This deals only with
2024 // the case of readback-from-IPC. A similar code-path deals with the
2025 // write_into_file case in MaybeSaveTraceForBugreport().
2026 if (tracing_session->seized_for_bugreport) {
2027 std::vector<TracePacket> packets;
2028 if (!tracing_session->config.builtin_data_sources()
2029 .disable_service_events()) {
2030 EmitSeizedForBugreportLifecycleEvent(&packets);
2031 }
2032 EmitLifecycleEvents(tracing_session, &packets);
2033 consumer->consumer_->OnTraceData(std::move(packets), /*has_more=*/false);
2034 return true;
2035 }
2036
2037 if (IsWaitingForTrigger(tracing_session))
2038 return false;
2039
2040 // This is a rough threshold to determine how much to read from the buffer in
2041 // each task. This is to avoid executing a single huge sending task for too
2042 // long and risk to hit the watchdog. This is *not* an upper bound: we just
2043 // stop accumulating new packets and PostTask *after* we cross this threshold.
2044 // This constant essentially balances the PostTask and IPC overhead vs the
2045 // responsiveness of the service. An extremely small value will cause one IPC
2046 // and one PostTask for each slice but will keep the service extremely
2047 // responsive. An extremely large value will batch the send for the full
2048 // buffer in one large task, will hit the blocking send() once the socket
2049 // buffers are full and hang the service for a bit (until the consumer
2050 // catches up).
2051 static constexpr size_t kApproxBytesPerTask = 32768;
2052 bool has_more;
2053 std::vector<TracePacket> packets =
2054 ReadBuffers(tracing_session, kApproxBytesPerTask, &has_more);
2055
2056 if (has_more) {
2057 auto weak_consumer = consumer->weak_ptr_factory_.GetWeakPtr();
2058 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2059 task_runner_->PostTask([weak_this, weak_consumer, tsid] {
2060 if (!weak_this || !weak_consumer)
2061 return;
2062 weak_this->ReadBuffersIntoConsumer(tsid, weak_consumer.get());
2063 });
2064 }
2065
2066 // Keep this as tail call, just in case the consumer re-enters.
2067 consumer->consumer_->OnTraceData(std::move(packets), has_more);
2068 return true;
2069 }
2070
ReadBuffersIntoFile(TracingSessionID tsid)2071 bool TracingServiceImpl::ReadBuffersIntoFile(TracingSessionID tsid) {
2072 PERFETTO_DCHECK_THREAD(thread_checker_);
2073 TracingSession* tracing_session = GetTracingSession(tsid);
2074 if (!tracing_session) {
2075 // This will be hit systematically from the PostDelayedTask. Avoid logging,
2076 // it would be just spam.
2077 return false;
2078 }
2079
2080 // This can happen if the file is closed by a previous task because it reaches
2081 // |max_file_size_bytes|.
2082 if (!tracing_session->write_into_file)
2083 return false;
2084
2085 if (!tracing_session->seized_for_bugreport &&
2086 IsWaitingForTrigger(tracing_session))
2087 return false;
2088
2089 // ReadBuffers() can allocate memory internally, for filtering. By limiting
2090 // the data that ReadBuffers() reads to kWriteIntoChunksSize per iteration,
2091 // we limit the amount of memory used on each iteration.
2092 //
2093 // It would be tempting to split this into multiple tasks like in
2094 // ReadBuffersIntoConsumer, but that's not currently possible.
2095 // ReadBuffersIntoFile has to read the whole available data before returning,
2096 // to support the disable_immediately=true code paths.
2097 bool has_more = true;
2098 bool stop_writing_into_file = false;
2099 do {
2100 std::vector<TracePacket> packets =
2101 ReadBuffers(tracing_session, kWriteIntoFileChunkSize, &has_more);
2102
2103 stop_writing_into_file = WriteIntoFile(tracing_session, std::move(packets));
2104 } while (has_more && !stop_writing_into_file);
2105
2106 if (stop_writing_into_file || tracing_session->write_period_ms == 0) {
2107 // Ensure all data was written to the file before we close it.
2108 base::FlushFile(tracing_session->write_into_file.get());
2109 tracing_session->write_into_file.reset();
2110 tracing_session->write_period_ms = 0;
2111 if (tracing_session->state == TracingSession::STARTED)
2112 DisableTracing(tsid);
2113 return true;
2114 }
2115
2116 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2117 task_runner_->PostDelayedTask(
2118 [weak_this, tsid] {
2119 if (weak_this)
2120 weak_this->ReadBuffersIntoFile(tsid);
2121 },
2122 tracing_session->delay_to_next_write_period_ms());
2123 return true;
2124 }
2125
IsWaitingForTrigger(TracingSession * tracing_session)2126 bool TracingServiceImpl::IsWaitingForTrigger(TracingSession* tracing_session) {
2127 // When a tracing session is waiting for a trigger, it is considered empty. If
2128 // a tracing session finishes and moves into DISABLED without ever receiving a
2129 // trigger, the trace should never return any data. This includes the
2130 // synthetic packets like TraceConfig and Clock snapshots. So we bail out
2131 // early and let the consumer know there is no data.
2132 if (!tracing_session->config.trigger_config().triggers().empty() &&
2133 tracing_session->received_triggers.empty()) {
2134 PERFETTO_DLOG(
2135 "ReadBuffers(): tracing session has not received a trigger yet.");
2136 return true;
2137 }
2138 return false;
2139 }
2140
ReadBuffers(TracingSession * tracing_session,size_t threshold,bool * has_more)2141 std::vector<TracePacket> TracingServiceImpl::ReadBuffers(
2142 TracingSession* tracing_session,
2143 size_t threshold,
2144 bool* has_more) {
2145 PERFETTO_DCHECK_THREAD(thread_checker_);
2146 PERFETTO_DCHECK(tracing_session);
2147 *has_more = false;
2148
2149 std::vector<TracePacket> packets;
2150 packets.reserve(1024); // Just an educated guess to avoid trivial expansions.
2151
2152 if (!tracing_session->initial_clock_snapshot.empty()) {
2153 EmitClockSnapshot(tracing_session,
2154 std::move(tracing_session->initial_clock_snapshot),
2155 &packets);
2156 }
2157
2158 for (auto& snapshot : tracing_session->clock_snapshot_ring_buffer) {
2159 PERFETTO_DCHECK(!snapshot.empty());
2160 EmitClockSnapshot(tracing_session, std::move(snapshot), &packets);
2161 }
2162 tracing_session->clock_snapshot_ring_buffer.clear();
2163
2164 if (tracing_session->should_emit_sync_marker) {
2165 EmitSyncMarker(&packets);
2166 tracing_session->should_emit_sync_marker = false;
2167 }
2168
2169 if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
2170 MaybeEmitTraceConfig(tracing_session, &packets);
2171 MaybeEmitReceivedTriggers(tracing_session, &packets);
2172 }
2173 if (!tracing_session->config.builtin_data_sources().disable_system_info())
2174 MaybeEmitSystemInfo(tracing_session, &packets);
2175
2176 // Note that in the proto comment, we guarantee that the tracing_started
2177 // lifecycle event will be emitted before any data packets so make sure to
2178 // keep this before reading the tracing buffers.
2179 if (!tracing_session->config.builtin_data_sources().disable_service_events())
2180 EmitLifecycleEvents(tracing_session, &packets);
2181
2182 size_t packets_bytes = 0; // SUM(slice.size() for each slice in |packets|).
2183
2184 // Add up size for packets added by the Maybe* calls above.
2185 for (const TracePacket& packet : packets) {
2186 packets_bytes += packet.size();
2187 }
2188
2189 bool did_hit_threshold = false;
2190
2191 for (size_t buf_idx = 0;
2192 buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
2193 buf_idx++) {
2194 auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
2195 if (tbuf_iter == buffers_.end()) {
2196 PERFETTO_DFATAL("Buffer not found.");
2197 continue;
2198 }
2199 TraceBuffer& tbuf = *tbuf_iter->second;
2200 tbuf.BeginRead();
2201 while (!did_hit_threshold) {
2202 TracePacket packet;
2203 TraceBuffer::PacketSequenceProperties sequence_properties{};
2204 bool previous_packet_dropped;
2205 if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
2206 &previous_packet_dropped)) {
2207 break;
2208 }
2209 PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
2210 PERFETTO_DCHECK(sequence_properties.writer_id != 0);
2211 PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
2212 // Not checking sequence_properties.producer_pid_trusted: it is
2213 // base::kInvalidPid if the platform doesn't support it.
2214
2215 PERFETTO_DCHECK(packet.size() > 0);
2216 if (!PacketStreamValidator::Validate(packet.slices())) {
2217 tracing_session->invalid_packets++;
2218 PERFETTO_DLOG("Dropping invalid packet");
2219 continue;
2220 }
2221
2222 // Append a slice with the trusted field data. This can't be spoofed
2223 // because above we validated that the existing slices don't contain any
2224 // trusted fields. For added safety we append instead of prepending
2225 // because according to protobuf semantics, if the same field is
2226 // encountered multiple times the last instance takes priority. Note that
2227 // truncated packets are also rejected, so the producer can't give us a
2228 // partial packet (e.g., a truncated string) which only becomes valid when
2229 // the trusted data is appended here.
2230 Slice slice = Slice::Allocate(32);
2231 protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
2232 slice.own_data(), slice.size);
2233 trusted_packet->set_trusted_uid(
2234 static_cast<int32_t>(sequence_properties.producer_uid_trusted));
2235 trusted_packet->set_trusted_packet_sequence_id(
2236 tracing_session->GetPacketSequenceID(
2237 sequence_properties.producer_id_trusted,
2238 sequence_properties.writer_id));
2239 if (sequence_properties.producer_pid_trusted != base::kInvalidPid) {
2240 // Not supported on all platforms.
2241 trusted_packet->set_trusted_pid(
2242 static_cast<int32_t>(sequence_properties.producer_pid_trusted));
2243 }
2244 if (previous_packet_dropped)
2245 trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
2246 slice.size = trusted_packet.Finalize();
2247 packet.AddSlice(std::move(slice));
2248
2249 // Append the packet (inclusive of the trusted uid) to |packets|.
2250 packets_bytes += packet.size();
2251 did_hit_threshold = packets_bytes >= threshold;
2252 packets.emplace_back(std::move(packet));
2253 } // for(packets...)
2254 } // for(buffers...)
2255
2256 *has_more = did_hit_threshold;
2257
2258 // Only emit the "read complete" lifetime event when there is no more trace
2259 // data available to read. These events are used as safe points to limit
2260 // sorting in trace processor: the code shouldn't emit the event unless the
2261 // buffers are empty.
2262 if (!*has_more && !tracing_session->config.builtin_data_sources()
2263 .disable_service_events()) {
2264 // We don't bother snapshotting clocks here because we wouldn't be able to
2265 // emit it and we shouldn't have significant drift from the last snapshot in
2266 // any case.
2267 SnapshotLifecyleEvent(tracing_session,
2268 protos::pbzero::TracingServiceEvent::
2269 kReadTracingBuffersCompletedFieldNumber,
2270 false /* snapshot_clocks */);
2271 EmitLifecycleEvents(tracing_session, &packets);
2272 }
2273
2274 // Only emit the stats when there is no more trace data is available to read.
2275 // That way, any problems that occur while reading from the buffers are
2276 // reflected in the emitted stats. This is particularly important for use
2277 // cases where ReadBuffers is only ever called after the tracing session is
2278 // stopped.
2279 if (!*has_more && tracing_session->should_emit_stats) {
2280 EmitStats(tracing_session, &packets);
2281 tracing_session->should_emit_stats = false;
2282 }
2283
2284 MaybeFilterPackets(tracing_session, &packets);
2285
2286 if (!*has_more) {
2287 // We've observed some extremely high memory usage by scudo after
2288 // MaybeFilterPackets in the past. The original bug (b/195145848) is fixed
2289 // now, but this code asks scudo to release memory just in case.
2290 base::MaybeReleaseAllocatorMemToOS();
2291 }
2292
2293 return packets;
2294 }
2295
MaybeFilterPackets(TracingSession * tracing_session,std::vector<TracePacket> * packets)2296 void TracingServiceImpl::MaybeFilterPackets(TracingSession* tracing_session,
2297 std::vector<TracePacket>* packets) {
2298 // If the tracing session specified a filter, run all packets through the
2299 // filter and replace them with the filter results.
2300 // The process below mantains the cardinality of input packets. Even if an
2301 // entire packet is filtered out, we emit a zero-sized TracePacket proto. That
2302 // makes debugging and reasoning about the trace stats easier.
2303 // This place swaps the contents of each |packets| entry in place.
2304 if (!tracing_session->trace_filter) {
2305 return;
2306 }
2307 protozero::MessageFilter& trace_filter = *tracing_session->trace_filter;
2308 // The filter root shoud be reset from protos.Trace to protos.TracePacket
2309 // by the earlier call to SetFilterRoot() in EnableTracing().
2310 PERFETTO_DCHECK(trace_filter.root_msg_index() != 0);
2311 std::vector<protozero::MessageFilter::InputSlice> filter_input;
2312 for (TracePacket& packet : *packets) {
2313 const auto& packet_slices = packet.slices();
2314 filter_input.clear();
2315 filter_input.resize(packet_slices.size());
2316 ++tracing_session->filter_input_packets;
2317 tracing_session->filter_input_bytes += packet.size();
2318 for (size_t i = 0; i < packet_slices.size(); ++i)
2319 filter_input[i] = {packet_slices[i].start, packet_slices[i].size};
2320 auto filtered_packet = trace_filter.FilterMessageFragments(
2321 &filter_input[0], filter_input.size());
2322
2323 // Replace the packet in-place with the filtered one (unless failed).
2324 packet = TracePacket();
2325 if (filtered_packet.error) {
2326 ++tracing_session->filter_errors;
2327 PERFETTO_DLOG("Trace packet filtering failed @ packet %" PRIu64,
2328 tracing_session->filter_input_packets);
2329 continue;
2330 }
2331 tracing_session->filter_output_bytes += filtered_packet.size;
2332 AppendOwnedSlicesToPacket(std::move(filtered_packet.data),
2333 filtered_packet.size, kMaxTracePacketSliceSize,
2334 &packet);
2335 }
2336 }
2337
WriteIntoFile(TracingSession * tracing_session,std::vector<TracePacket> packets)2338 bool TracingServiceImpl::WriteIntoFile(TracingSession* tracing_session,
2339 std::vector<TracePacket> packets) {
2340 if (!tracing_session->write_into_file) {
2341 return false;
2342 }
2343 const uint64_t max_size = tracing_session->max_file_size_bytes
2344 ? tracing_session->max_file_size_bytes
2345 : std::numeric_limits<size_t>::max();
2346
2347 size_t total_slices = 0;
2348 for (const TracePacket& packet : packets) {
2349 total_slices += packet.slices().size();
2350 }
2351 // When writing into a file, the file should look like a root trace.proto
2352 // message. Each packet should be prepended with a proto preamble stating
2353 // its field id (within trace.proto) and size. Hence the addition below.
2354 const size_t max_iovecs = total_slices + packets.size();
2355
2356 size_t num_iovecs = 0;
2357 bool stop_writing_into_file = false;
2358 std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
2359 size_t num_iovecs_at_last_packet = 0;
2360 uint64_t bytes_about_to_be_written = 0;
2361 for (TracePacket& packet : packets) {
2362 std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
2363 packet.GetProtoPreamble();
2364 bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
2365 num_iovecs++;
2366 for (const Slice& slice : packet.slices()) {
2367 // writev() doesn't change the passed pointer. However, struct iovec
2368 // take a non-const ptr because it's the same struct used by readv().
2369 // Hence the const_cast here.
2370 char* start = static_cast<char*>(const_cast<void*>(slice.start));
2371 bytes_about_to_be_written += slice.size;
2372 iovecs[num_iovecs++] = {start, slice.size};
2373 }
2374
2375 if (tracing_session->bytes_written_into_file + bytes_about_to_be_written >=
2376 max_size) {
2377 stop_writing_into_file = true;
2378 num_iovecs = num_iovecs_at_last_packet;
2379 break;
2380 }
2381
2382 num_iovecs_at_last_packet = num_iovecs;
2383 }
2384 PERFETTO_DCHECK(num_iovecs <= max_iovecs);
2385 int fd = *tracing_session->write_into_file;
2386
2387 uint64_t total_wr_size = 0;
2388
2389 // writev() can take at most IOV_MAX entries per call. Batch them.
2390 constexpr size_t kIOVMax = IOV_MAX;
2391 for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
2392 int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
2393 ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
2394 if (wr_size <= 0) {
2395 PERFETTO_PLOG("writev() failed");
2396 stop_writing_into_file = true;
2397 break;
2398 }
2399 total_wr_size += static_cast<size_t>(wr_size);
2400 }
2401
2402 tracing_session->bytes_written_into_file += total_wr_size;
2403
2404 PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
2405 (total_wr_size + 1023) / 1024, stop_writing_into_file);
2406 return stop_writing_into_file;
2407 }
2408
FreeBuffers(TracingSessionID tsid)2409 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
2410 PERFETTO_DCHECK_THREAD(thread_checker_);
2411 PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
2412 TracingSession* tracing_session = GetTracingSession(tsid);
2413 if (!tracing_session) {
2414 PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
2415 return; // TODO(primiano): signal failure?
2416 }
2417 DisableTracing(tsid, /*disable_immediately=*/true);
2418
2419 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
2420 tracing_session->data_source_instances.clear();
2421
2422 for (auto& producer_entry : producers_) {
2423 ProducerEndpointImpl* producer = producer_entry.second;
2424 producer->OnFreeBuffers(tracing_session->buffers_index);
2425 }
2426
2427 for (BufferID buffer_id : tracing_session->buffers_index) {
2428 buffer_ids_.Free(buffer_id);
2429 PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
2430 buffers_.erase(buffer_id);
2431 }
2432 bool notify_traceur = tracing_session->config.notify_traceur();
2433 bool is_long_trace =
2434 (tracing_session->config.write_into_file() &&
2435 tracing_session->config.file_write_period_ms() < kMillisPerDay);
2436 bool seized_for_bugreport = tracing_session->seized_for_bugreport;
2437 tracing_sessions_.erase(tsid);
2438 tracing_session = nullptr;
2439 UpdateMemoryGuardrail();
2440
2441 PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
2442 tracing_sessions_.size());
2443
2444 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD) && \
2445 PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2446 if (notify_traceur && (seized_for_bugreport || is_long_trace)) {
2447 PERFETTO_LAZY_LOAD(android_internal::NotifyTraceSessionEnded, notify_fn);
2448 if (!notify_fn || !notify_fn(seized_for_bugreport))
2449 PERFETTO_ELOG("Failed to notify Traceur long tracing has ended");
2450 }
2451 #else
2452 base::ignore_result(notify_traceur);
2453 base::ignore_result(is_long_trace);
2454 base::ignore_result(seized_for_bugreport);
2455 #endif
2456 }
2457
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)2458 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
2459 const DataSourceDescriptor& desc) {
2460 PERFETTO_DCHECK_THREAD(thread_checker_);
2461 if (desc.name().empty()) {
2462 PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2463 return;
2464 }
2465
2466 ProducerEndpointImpl* producer = GetProducer(producer_id);
2467 if (!producer) {
2468 PERFETTO_DFATAL("Producer not found.");
2469 return;
2470 }
2471
2472 // Check that the producer doesn't register two data sources with the same ID.
2473 // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
2474 // field didn't exist.
2475 for (const auto& kv : data_sources_) {
2476 if (desc.id() && kv.second.producer_id == producer_id &&
2477 kv.second.descriptor.id() == desc.id()) {
2478 PERFETTO_ELOG(
2479 "Failed to register data source \"%s\". A data source with the same "
2480 "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
2481 desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
2482 producer_id);
2483 return;
2484 }
2485 }
2486
2487 PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
2488 producer_id, desc.name().c_str());
2489
2490 auto reg_ds = data_sources_.emplace(desc.name(),
2491 RegisteredDataSource{producer_id, desc});
2492
2493 // If there are existing tracing sessions, we need to check if the new
2494 // data source is enabled by any of them.
2495 for (auto& iter : tracing_sessions_) {
2496 TracingSession& tracing_session = iter.second;
2497 if (tracing_session.state != TracingSession::STARTED &&
2498 tracing_session.state != TracingSession::CONFIGURED) {
2499 continue;
2500 }
2501
2502 TraceConfig::ProducerConfig producer_config;
2503 for (auto& config : tracing_session.config.producers()) {
2504 if (producer->name_ == config.producer_name()) {
2505 producer_config = config;
2506 break;
2507 }
2508 }
2509 for (const TraceConfig::DataSource& cfg_data_source :
2510 tracing_session.config.data_sources()) {
2511 if (cfg_data_source.config().name() != desc.name())
2512 continue;
2513 DataSourceInstance* ds_inst = SetupDataSource(
2514 cfg_data_source, producer_config, reg_ds->second, &tracing_session);
2515 if (ds_inst && tracing_session.state == TracingSession::STARTED)
2516 StartDataSourceInstance(producer, &tracing_session, ds_inst);
2517 }
2518 } // for(iter : tracing_sessions_)
2519 }
2520
UpdateDataSource(ProducerID producer_id,const DataSourceDescriptor & new_desc)2521 void TracingServiceImpl::UpdateDataSource(
2522 ProducerID producer_id,
2523 const DataSourceDescriptor& new_desc) {
2524 if (new_desc.id() == 0) {
2525 PERFETTO_ELOG("UpdateDataSource() must have a non-zero id");
2526 return;
2527 }
2528
2529 // If this producer has already registered a matching descriptor name and id,
2530 // just update the descriptor.
2531 RegisteredDataSource* data_source = nullptr;
2532 auto range = data_sources_.equal_range(new_desc.name());
2533 for (auto it = range.first; it != range.second; ++it) {
2534 if (it->second.producer_id == producer_id &&
2535 it->second.descriptor.id() == new_desc.id()) {
2536 data_source = &it->second;
2537 break;
2538 }
2539 }
2540
2541 if (!data_source) {
2542 PERFETTO_ELOG(
2543 "UpdateDataSource() failed, could not find an existing data source "
2544 "with name=\"%s\" id=%" PRIu64,
2545 new_desc.name().c_str(), new_desc.id());
2546 return;
2547 }
2548
2549 data_source->descriptor = new_desc;
2550 }
2551
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2552 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2553 TracingSession* tracing_session,
2554 DataSourceInstance* instance,
2555 bool disable_immediately) {
2556 const DataSourceInstanceID ds_inst_id = instance->instance_id;
2557 if (instance->will_notify_on_stop && !disable_immediately) {
2558 instance->state = DataSourceInstance::STOPPING;
2559 } else {
2560 instance->state = DataSourceInstance::STOPPED;
2561 }
2562 if (tracing_session->consumer_maybe_null) {
2563 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2564 *producer, *instance);
2565 }
2566 producer->StopDataSource(ds_inst_id);
2567 }
2568
UnregisterDataSource(ProducerID producer_id,const std::string & name)2569 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
2570 const std::string& name) {
2571 PERFETTO_DCHECK_THREAD(thread_checker_);
2572 PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
2573 producer_id, name.c_str());
2574 PERFETTO_CHECK(producer_id);
2575 ProducerEndpointImpl* producer = GetProducer(producer_id);
2576 PERFETTO_DCHECK(producer);
2577 for (auto& kv : tracing_sessions_) {
2578 auto& ds_instances = kv.second.data_source_instances;
2579 bool removed = false;
2580 for (auto it = ds_instances.begin(); it != ds_instances.end();) {
2581 if (it->first == producer_id && it->second.data_source_name == name) {
2582 DataSourceInstanceID ds_inst_id = it->second.instance_id;
2583 if (it->second.state != DataSourceInstance::STOPPED) {
2584 if (it->second.state != DataSourceInstance::STOPPING) {
2585 StopDataSourceInstance(producer, &kv.second, &it->second,
2586 /* disable_immediately = */ false);
2587 }
2588
2589 // Mark the instance as stopped immediately, since we are
2590 // unregistering it below.
2591 //
2592 // The StopDataSourceInstance above might have set the state to
2593 // STOPPING so this condition isn't an else.
2594 if (it->second.state == DataSourceInstance::STOPPING)
2595 NotifyDataSourceStopped(producer_id, ds_inst_id);
2596 }
2597 it = ds_instances.erase(it);
2598 removed = true;
2599 } else {
2600 ++it;
2601 }
2602 } // for (data_source_instances)
2603 if (removed)
2604 MaybeNotifyAllDataSourcesStarted(&kv.second);
2605 } // for (tracing_session)
2606
2607 for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
2608 if (it->second.producer_id == producer_id &&
2609 it->second.descriptor.name() == name) {
2610 data_sources_.erase(it);
2611 return;
2612 }
2613 }
2614
2615 PERFETTO_DFATAL(
2616 "Tried to unregister a non-existent data source \"%s\" for "
2617 "producer %" PRIu16,
2618 name.c_str(), producer_id);
2619 }
2620
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)2621 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
2622 const TraceConfig::DataSource& cfg_data_source,
2623 const TraceConfig::ProducerConfig& producer_config,
2624 const RegisteredDataSource& data_source,
2625 TracingSession* tracing_session) {
2626 PERFETTO_DCHECK_THREAD(thread_checker_);
2627 ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
2628 PERFETTO_DCHECK(producer);
2629 // An existing producer that is not ftrace could have registered itself as
2630 // ftrace, we must not enable it in that case.
2631 if (lockdown_mode_ && producer->uid_ != uid_) {
2632 PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
2633 return nullptr;
2634 }
2635 // TODO(primiano): Add tests for registration ordering (data sources vs
2636 // consumers).
2637 if (!NameMatchesFilter(producer->name_,
2638 cfg_data_source.producer_name_filter(),
2639 cfg_data_source.producer_name_regex_filter())) {
2640 PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
2641 cfg_data_source.config().name().c_str(),
2642 producer->name_.c_str());
2643 return nullptr;
2644 }
2645
2646 auto relative_buffer_id = cfg_data_source.config().target_buffer();
2647 if (relative_buffer_id >= tracing_session->num_buffers()) {
2648 PERFETTO_LOG(
2649 "The TraceConfig for DataSource %s specified a target_buffer out of "
2650 "bound (%d). Skipping it.",
2651 cfg_data_source.config().name().c_str(), relative_buffer_id);
2652 return nullptr;
2653 }
2654
2655 // Create a copy of the DataSourceConfig specified in the trace config. This
2656 // will be passed to the producer after translating the |target_buffer| id.
2657 // The |target_buffer| parameter passed by the consumer in the trace config is
2658 // relative to the buffers declared in the same trace config. This has to be
2659 // translated to the global BufferID before passing it to the producers, which
2660 // don't know anything about tracing sessions and consumers.
2661
2662 DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
2663 auto insert_iter = tracing_session->data_source_instances.emplace(
2664 std::piecewise_construct, //
2665 std::forward_as_tuple(producer->id_),
2666 std::forward_as_tuple(
2667 inst_id,
2668 cfg_data_source.config(), // Deliberate copy.
2669 data_source.descriptor.name(),
2670 data_source.descriptor.will_notify_on_start(),
2671 data_source.descriptor.will_notify_on_stop(),
2672 data_source.descriptor.handles_incremental_state_clear()));
2673 DataSourceInstance* ds_instance = &insert_iter->second;
2674
2675 // New data source instance starts out in CONFIGURED state.
2676 if (tracing_session->consumer_maybe_null) {
2677 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2678 *producer, *ds_instance);
2679 }
2680
2681 DataSourceConfig& ds_config = ds_instance->config;
2682 ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
2683 ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
2684 ds_config.set_enable_extra_guardrails(
2685 tracing_session->config.enable_extra_guardrails());
2686 if (tracing_session->consumer_uid == 1066 /* AID_STATSD */ &&
2687 tracing_session->config.statsd_metadata().triggering_config_uid() !=
2688 2000 /* AID_SHELL */
2689 && tracing_session->config.statsd_metadata().triggering_config_uid() !=
2690 0 /* AID_ROOT */) {
2691 // StatsD can be triggered either by shell, root or an app that has DUMP and
2692 // USAGE_STATS permission. When triggered by shell or root, we do not want
2693 // to consider the trace a trusted system trace, as it was initiated by the
2694 // user. Otherwise, it has to come from an app with DUMP and
2695 // PACKAGE_USAGE_STATS, which has to be preinstalled and trusted by the
2696 // system.
2697 // Check for shell / root: https://bit.ly/3b7oZNi
2698 // Check for DUMP or PACKAGE_USAGE_STATS: https://bit.ly/3ep0NrR
2699 ds_config.set_session_initiator(
2700 DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM);
2701 } else {
2702 // Unset in case the consumer set it.
2703 // We need to be able to trust this field.
2704 ds_config.set_session_initiator(
2705 DataSourceConfig::SESSION_INITIATOR_UNSPECIFIED);
2706 }
2707 ds_config.set_tracing_session_id(tracing_session->id);
2708 BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
2709 PERFETTO_DCHECK(global_id);
2710 ds_config.set_target_buffer(global_id);
2711
2712 PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
2713 ds_config.name().c_str(), global_id);
2714 if (!producer->shared_memory()) {
2715 // Determine the SMB page size. Must be an integer multiple of 4k.
2716 // As for the SMB size below, the decision tree is as follows:
2717 // 1. Give priority to what is defined in the trace config.
2718 // 2. If unset give priority to the hint passed by the producer.
2719 // 3. Keep within bounds and ensure it's a multiple of 4k.
2720 size_t page_size = producer_config.page_size_kb() * 1024;
2721 if (page_size == 0)
2722 page_size = producer->shmem_page_size_hint_bytes_;
2723
2724 // Determine the SMB size. Must be an integer multiple of the SMB page size.
2725 // The decision tree is as follows:
2726 // 1. Give priority to what defined in the trace config.
2727 // 2. If unset give priority to the hint passed by the producer.
2728 // 3. Keep within bounds and ensure it's a multiple of the page size.
2729 size_t shm_size = producer_config.shm_size_kb() * 1024;
2730 if (shm_size == 0)
2731 shm_size = producer->shmem_size_hint_bytes_;
2732
2733 auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
2734 if (valid_sizes != std::tie(shm_size, page_size)) {
2735 PERFETTO_DLOG(
2736 "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
2737 "back to shm_size %zu page_size %zu.",
2738 shm_size, page_size, std::get<0>(valid_sizes),
2739 std::get<1>(valid_sizes));
2740 }
2741 std::tie(shm_size, page_size) = valid_sizes;
2742
2743 // TODO(primiano): right now Create() will suicide in case of OOM if the
2744 // mmap fails. We should instead gracefully fail the request and tell the
2745 // client to go away.
2746 PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
2747 producer->name_.c_str());
2748 auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
2749 producer->SetupSharedMemory(std::move(shared_memory), page_size,
2750 /*provided_by_producer=*/false);
2751 }
2752 producer->SetupDataSource(inst_id, ds_config);
2753 return ds_instance;
2754 }
2755
2756 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
2757 // might be lying / returning garbage contents. |src| and |size| can be trusted
2758 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,uid_t producer_uid_trusted,pid_t producer_pid_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)2759 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
2760 ProducerID producer_id_trusted,
2761 uid_t producer_uid_trusted,
2762 pid_t producer_pid_trusted,
2763 WriterID writer_id,
2764 ChunkID chunk_id,
2765 BufferID buffer_id,
2766 uint16_t num_fragments,
2767 uint8_t chunk_flags,
2768 bool chunk_complete,
2769 const uint8_t* src,
2770 size_t size) {
2771 PERFETTO_DCHECK_THREAD(thread_checker_);
2772
2773 ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
2774 if (!producer) {
2775 PERFETTO_DFATAL("Producer not found.");
2776 chunks_discarded_++;
2777 return;
2778 }
2779
2780 TraceBuffer* buf = GetBufferByID(buffer_id);
2781 if (!buf) {
2782 PERFETTO_DLOG("Could not find target buffer %" PRIu16
2783 " for producer %" PRIu16,
2784 buffer_id, producer_id_trusted);
2785 chunks_discarded_++;
2786 return;
2787 }
2788
2789 // Verify that the producer is actually allowed to write into the target
2790 // buffer specified in the request. This prevents a malicious producer from
2791 // injecting data into a log buffer that belongs to a tracing session the
2792 // producer is not part of.
2793 if (!producer->is_allowed_target_buffer(buffer_id)) {
2794 PERFETTO_ELOG("Producer %" PRIu16
2795 " tried to write into forbidden target buffer %" PRIu16,
2796 producer_id_trusted, buffer_id);
2797 PERFETTO_DFATAL("Forbidden target buffer");
2798 chunks_discarded_++;
2799 return;
2800 }
2801
2802 // If the writer was registered by the producer, it should only write into the
2803 // buffer it was registered with.
2804 base::Optional<BufferID> associated_buffer =
2805 producer->buffer_id_for_writer(writer_id);
2806 if (associated_buffer && *associated_buffer != buffer_id) {
2807 PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
2808 " was registered to write into target buffer %" PRIu16
2809 ", but tried to write into buffer %" PRIu16,
2810 writer_id, producer_id_trusted, *associated_buffer,
2811 buffer_id);
2812 PERFETTO_DFATAL("Wrong target buffer");
2813 chunks_discarded_++;
2814 return;
2815 }
2816
2817 buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted,
2818 producer_pid_trusted, writer_id, chunk_id,
2819 num_fragments, chunk_flags, chunk_complete, src,
2820 size);
2821 }
2822
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)2823 void TracingServiceImpl::ApplyChunkPatches(
2824 ProducerID producer_id_trusted,
2825 const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
2826 PERFETTO_DCHECK_THREAD(thread_checker_);
2827
2828 for (const auto& chunk : chunks_to_patch) {
2829 const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
2830 const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
2831 TraceBuffer* buf =
2832 GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
2833 static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
2834 "Add a '|| chunk_id > kMaxChunkID' below if this fails");
2835 if (!writer_id || writer_id > kMaxWriterID || !buf) {
2836 // This can genuinely happen when the trace is stopped. The producers
2837 // might see the stop signal with some delay and try to keep sending
2838 // patches left soon after.
2839 PERFETTO_DLOG(
2840 "Received invalid chunks_to_patch request from Producer: %" PRIu16
2841 ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
2842 producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
2843 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2844 continue;
2845 }
2846
2847 // Note, there's no need to validate that the producer is allowed to write
2848 // to the specified buffer ID (or that it's the correct buffer ID for a
2849 // registered TraceWriter). That's because TraceBuffer uses the producer ID
2850 // and writer ID to look up the chunk to patch. If the producer specifies an
2851 // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
2852 // patches. Because the producer ID is trusted, there's also no way for a
2853 // malicious producer to patch another producer's data.
2854
2855 // Speculate on the fact that there are going to be a limited amount of
2856 // patches per request, so we can allocate the |patches| array on the stack.
2857 std::array<TraceBuffer::Patch, 1024> patches; // Uninitialized.
2858 if (chunk.patches().size() > patches.size()) {
2859 PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
2860 patches.size());
2861 PERFETTO_DFATAL("Too many patches");
2862 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2863 continue;
2864 }
2865
2866 size_t i = 0;
2867 for (const auto& patch : chunk.patches()) {
2868 const std::string& patch_data = patch.data();
2869 if (patch_data.size() != patches[i].data.size()) {
2870 PERFETTO_ELOG("Received patch from producer: %" PRIu16
2871 " of unexpected size %zu",
2872 producer_id_trusted, patch_data.size());
2873 patches_discarded_++;
2874 continue;
2875 }
2876 patches[i].offset_untrusted = patch.offset();
2877 memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
2878 i++;
2879 }
2880 buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
2881 &patches[0], i, chunk.has_more_patches());
2882 }
2883 }
2884
GetDetachedSession(uid_t uid,const std::string & key)2885 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
2886 uid_t uid,
2887 const std::string& key) {
2888 PERFETTO_DCHECK_THREAD(thread_checker_);
2889 for (auto& kv : tracing_sessions_) {
2890 TracingSession* session = &kv.second;
2891 if (session->consumer_uid == uid && session->detach_key == key) {
2892 PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
2893 return session;
2894 }
2895 }
2896 return nullptr;
2897 }
2898
GetTracingSession(TracingSessionID tsid)2899 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2900 TracingSessionID tsid) {
2901 PERFETTO_DCHECK_THREAD(thread_checker_);
2902 auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2903 if (it == tracing_sessions_.end())
2904 return nullptr;
2905 return &it->second;
2906 }
2907
GetNextProducerID()2908 ProducerID TracingServiceImpl::GetNextProducerID() {
2909 PERFETTO_DCHECK_THREAD(thread_checker_);
2910 PERFETTO_CHECK(producers_.size() < kMaxProducerID);
2911 do {
2912 ++last_producer_id_;
2913 } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2914 PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2915 return last_producer_id_;
2916 }
2917
GetBufferByID(BufferID buffer_id)2918 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2919 auto buf_iter = buffers_.find(buffer_id);
2920 if (buf_iter == buffers_.end())
2921 return nullptr;
2922 return &*buf_iter->second;
2923 }
2924
OnStartTriggersTimeout(TracingSessionID tsid)2925 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2926 // Skip entirely the flush if the trace session doesn't exist anymore.
2927 // This is to prevent misleading error messages to be logged.
2928 //
2929 // if the trace has started from the trigger we rely on
2930 // the |stop_delay_ms| from the trigger so don't flush and
2931 // disable if we've moved beyond a CONFIGURED state
2932 auto* tracing_session_ptr = GetTracingSession(tsid);
2933 if (tracing_session_ptr &&
2934 tracing_session_ptr->state == TracingSession::CONFIGURED) {
2935 PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2936 " since no triggers activated.",
2937 tsid);
2938 // No data should be returned from ReadBuffers() regardless of if we
2939 // call FreeBuffers() or DisableTracing(). This is because in
2940 // STOP_TRACING we need this promise in either case, and using
2941 // DisableTracing() allows a graceful shutdown. Consumers can follow
2942 // their normal path and check the buffers through ReadBuffers() and
2943 // the code won't hang because the tracing session will still be
2944 // alive just disabled.
2945 DisableTracing(tsid);
2946 }
2947 }
2948
UpdateMemoryGuardrail()2949 void TracingServiceImpl::UpdateMemoryGuardrail() {
2950 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
2951 uint64_t total_buffer_bytes = 0;
2952
2953 // Sum up all the shared memory buffers.
2954 for (const auto& id_to_producer : producers_) {
2955 if (id_to_producer.second->shared_memory())
2956 total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2957 }
2958
2959 // Sum up all the trace buffers.
2960 for (const auto& id_to_buffer : buffers_) {
2961 total_buffer_bytes += id_to_buffer.second->size();
2962 }
2963
2964 // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2965 // interval.
2966 uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
2967 base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2968 #endif
2969 }
2970
PeriodicSnapshotTask(TracingSessionID tsid)2971 void TracingServiceImpl::PeriodicSnapshotTask(TracingSessionID tsid) {
2972 auto* tracing_session = GetTracingSession(tsid);
2973 if (!tracing_session)
2974 return;
2975 if (tracing_session->state != TracingSession::STARTED)
2976 return;
2977 tracing_session->should_emit_sync_marker = true;
2978 tracing_session->should_emit_stats = true;
2979 MaybeSnapshotClocksIntoRingBuffer(tracing_session);
2980 }
2981
SnapshotLifecyleEvent(TracingSession * tracing_session,uint32_t field_id,bool snapshot_clocks)2982 void TracingServiceImpl::SnapshotLifecyleEvent(TracingSession* tracing_session,
2983 uint32_t field_id,
2984 bool snapshot_clocks) {
2985 // field_id should be an id of a field in TracingServiceEvent.
2986 auto& lifecycle_events = tracing_session->lifecycle_events;
2987 auto event_it =
2988 std::find_if(lifecycle_events.begin(), lifecycle_events.end(),
2989 [field_id](const TracingSession::LifecycleEvent& event) {
2990 return event.field_id == field_id;
2991 });
2992
2993 TracingSession::LifecycleEvent* event;
2994 if (event_it == lifecycle_events.end()) {
2995 lifecycle_events.emplace_back(field_id);
2996 event = &lifecycle_events.back();
2997 } else {
2998 event = &*event_it;
2999 }
3000
3001 // Snapshot the clocks before capturing the timestamp for the event so we can
3002 // use this snapshot to resolve the event timestamp if necessary.
3003 if (snapshot_clocks)
3004 MaybeSnapshotClocksIntoRingBuffer(tracing_session);
3005
3006 // Erase before emplacing to prevent a unncessary doubling of memory if
3007 // not needed.
3008 if (event->timestamps.size() >= event->max_size) {
3009 event->timestamps.erase_front(1 + event->timestamps.size() -
3010 event->max_size);
3011 }
3012 event->timestamps.emplace_back(base::GetBootTimeNs().count());
3013 }
3014
MaybeSnapshotClocksIntoRingBuffer(TracingSession * tracing_session)3015 void TracingServiceImpl::MaybeSnapshotClocksIntoRingBuffer(
3016 TracingSession* tracing_session) {
3017 if (tracing_session->config.builtin_data_sources()
3018 .disable_clock_snapshotting()) {
3019 return;
3020 }
3021
3022 // We are making an explicit copy of the latest snapshot (if it exists)
3023 // because SnapshotClocks reads this data and computes the drift based on its
3024 // content. If the clock drift is high enough, it will update the contents of
3025 // |snapshot| and return true. Otherwise, it will return false.
3026 TracingSession::ClockSnapshotData snapshot =
3027 tracing_session->clock_snapshot_ring_buffer.empty()
3028 ? TracingSession::ClockSnapshotData()
3029 : tracing_session->clock_snapshot_ring_buffer.back();
3030 bool did_update = SnapshotClocks(&snapshot);
3031 if (did_update) {
3032 // This means clocks drifted enough since last snapshot. See the comment
3033 // in SnapshotClocks.
3034 auto* snapshot_buffer = &tracing_session->clock_snapshot_ring_buffer;
3035
3036 // Erase before emplacing to prevent a unncessary doubling of memory if
3037 // not needed.
3038 static constexpr uint32_t kClockSnapshotRingBufferSize = 16;
3039 if (snapshot_buffer->size() >= kClockSnapshotRingBufferSize) {
3040 snapshot_buffer->erase_front(1 + snapshot_buffer->size() -
3041 kClockSnapshotRingBufferSize);
3042 }
3043 snapshot_buffer->emplace_back(std::move(snapshot));
3044 }
3045 }
3046
3047 // Returns true when the data in |snapshot_data| is updated with the new state
3048 // of the clocks and false otherwise.
SnapshotClocks(TracingSession::ClockSnapshotData * snapshot_data)3049 bool TracingServiceImpl::SnapshotClocks(
3050 TracingSession::ClockSnapshotData* snapshot_data) {
3051 // Minimum drift that justifies replacing a prior clock snapshot that hasn't
3052 // been emitted into the trace yet (see comment below).
3053 static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000; // 10 ms
3054
3055 TracingSession::ClockSnapshotData new_snapshot_data;
3056
3057 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE) && \
3058 !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
3059 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3060 struct {
3061 clockid_t id;
3062 protos::pbzero::BuiltinClock type;
3063 struct timespec ts;
3064 } clocks[] = {
3065 {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
3066 {CLOCK_REALTIME_COARSE,
3067 protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
3068 {0, 0}},
3069 {CLOCK_MONOTONIC_COARSE,
3070 protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
3071 {0, 0}},
3072 {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
3073 {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
3074 {CLOCK_MONOTONIC_RAW,
3075 protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
3076 {0, 0}},
3077 };
3078 // First snapshot all the clocks as atomically as we can.
3079 for (auto& clock : clocks) {
3080 if (clock_gettime(clock.id, &clock.ts) == -1)
3081 PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
3082 }
3083 for (auto& clock : clocks) {
3084 new_snapshot_data.push_back(std::make_pair(
3085 static_cast<uint32_t>(clock.type),
3086 static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
3087 }
3088 #else // OS_APPLE || OS_WIN && OS_NACL
3089 auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
3090 // The default trace clock is boot time, so we always need to emit a path to
3091 // it. However since we don't actually have a boot time source on these
3092 // platforms, pretend that wall time equals boot time.
3093 new_snapshot_data.push_back(
3094 std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
3095 new_snapshot_data.push_back(
3096 std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
3097 #endif
3098
3099 // If we're about to update a session's latest clock snapshot that hasn't been
3100 // emitted into the trace yet, check whether the clocks have drifted enough to
3101 // warrant overriding the current snapshot values. The older snapshot would be
3102 // valid for a larger part of the currently buffered trace data because the
3103 // clock sync protocol in trace processor uses the latest clock <= timestamp
3104 // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
3105 // we try to keep it if we can.
3106 if (!snapshot_data->empty()) {
3107 PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
3108 PERFETTO_DCHECK((*snapshot_data)[0].first ==
3109 protos::gen::BUILTIN_CLOCK_BOOTTIME);
3110
3111 bool update_snapshot = false;
3112 uint64_t old_boot_ns = (*snapshot_data)[0].second;
3113 uint64_t new_boot_ns = new_snapshot_data[0].second;
3114 int64_t boot_diff =
3115 static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
3116
3117 for (size_t i = 1; i < snapshot_data->size(); i++) {
3118 uint64_t old_ns = (*snapshot_data)[i].second;
3119 uint64_t new_ns = new_snapshot_data[i].second;
3120
3121 int64_t diff =
3122 static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
3123
3124 // Compare the boottime delta against the delta of this clock.
3125 if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
3126 update_snapshot = true;
3127 break;
3128 }
3129 }
3130 if (!update_snapshot)
3131 return false;
3132 snapshot_data->clear();
3133 }
3134
3135 *snapshot_data = std::move(new_snapshot_data);
3136 return true;
3137 }
3138
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,std::vector<TracePacket> * packets)3139 void TracingServiceImpl::EmitClockSnapshot(
3140 TracingSession* tracing_session,
3141 TracingSession::ClockSnapshotData snapshot_data,
3142 std::vector<TracePacket>* packets) {
3143 PERFETTO_DCHECK(!tracing_session->config.builtin_data_sources()
3144 .disable_clock_snapshotting());
3145
3146 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3147 auto* snapshot = packet->set_clock_snapshot();
3148
3149 protos::gen::BuiltinClock trace_clock =
3150 tracing_session->config.builtin_data_sources().primary_trace_clock();
3151 if (!trace_clock)
3152 trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
3153 snapshot->set_primary_trace_clock(
3154 static_cast<protos::pbzero::BuiltinClock>(trace_clock));
3155
3156 for (auto& clock_id_and_ts : snapshot_data) {
3157 auto* c = snapshot->add_clocks();
3158 c->set_clock_id(clock_id_and_ts.first);
3159 c->set_timestamp(clock_id_and_ts.second);
3160 }
3161
3162 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3163 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3164 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3165 }
3166
EmitSyncMarker(std::vector<TracePacket> * packets)3167 void TracingServiceImpl::EmitSyncMarker(std::vector<TracePacket>* packets) {
3168 // The sync marks are used to tokenize large traces efficiently.
3169 // See description in trace_packet.proto.
3170 if (sync_marker_packet_size_ == 0) {
3171 // The marker ABI expects that the marker is written after the uid.
3172 // Protozero guarantees that fields are written in the same order of the
3173 // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
3174 protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
3175 &sync_marker_packet_[0], sizeof(sync_marker_packet_));
3176 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3177 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3178
3179 // Keep this last.
3180 packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
3181 sync_marker_packet_size_ = packet.Finalize();
3182 }
3183 packets->emplace_back();
3184 packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
3185 }
3186
EmitStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)3187 void TracingServiceImpl::EmitStats(TracingSession* tracing_session,
3188 std::vector<TracePacket>* packets) {
3189 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3190 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3191 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3192 GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
3193 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3194 }
3195
GetTraceStats(TracingSession * tracing_session)3196 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
3197 TraceStats trace_stats;
3198 trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
3199 trace_stats.set_producers_seen(last_producer_id_);
3200 trace_stats.set_data_sources_registered(
3201 static_cast<uint32_t>(data_sources_.size()));
3202 trace_stats.set_data_sources_seen(last_data_source_instance_id_);
3203 trace_stats.set_tracing_sessions(
3204 static_cast<uint32_t>(tracing_sessions_.size()));
3205 trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
3206 trace_stats.set_chunks_discarded(chunks_discarded_);
3207 trace_stats.set_patches_discarded(patches_discarded_);
3208 trace_stats.set_invalid_packets(tracing_session->invalid_packets);
3209 trace_stats.set_flushes_requested(tracing_session->flushes_requested);
3210 trace_stats.set_flushes_succeeded(tracing_session->flushes_succeeded);
3211 trace_stats.set_flushes_failed(tracing_session->flushes_failed);
3212 trace_stats.set_final_flush_outcome(tracing_session->final_flush_outcome);
3213
3214 if (tracing_session->trace_filter) {
3215 auto* filt_stats = trace_stats.mutable_filter_stats();
3216 filt_stats->set_input_packets(tracing_session->filter_input_packets);
3217 filt_stats->set_input_bytes(tracing_session->filter_input_bytes);
3218 filt_stats->set_output_bytes(tracing_session->filter_output_bytes);
3219 filt_stats->set_errors(tracing_session->filter_errors);
3220 }
3221
3222 for (BufferID buf_id : tracing_session->buffers_index) {
3223 TraceBuffer* buf = GetBufferByID(buf_id);
3224 if (!buf) {
3225 PERFETTO_DFATAL("Buffer not found.");
3226 continue;
3227 }
3228 *trace_stats.add_buffer_stats() = buf->stats();
3229 } // for (buf in session).
3230 return trace_stats;
3231 }
3232
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)3233 void TracingServiceImpl::MaybeEmitTraceConfig(
3234 TracingSession* tracing_session,
3235 std::vector<TracePacket>* packets) {
3236 if (tracing_session->did_emit_config)
3237 return;
3238 tracing_session->did_emit_config = true;
3239 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3240 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3241 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3242 tracing_session->config.Serialize(packet->set_trace_config());
3243 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3244 }
3245
MaybeEmitSystemInfo(TracingSession * tracing_session,std::vector<TracePacket> * packets)3246 void TracingServiceImpl::MaybeEmitSystemInfo(
3247 TracingSession* tracing_session,
3248 std::vector<TracePacket>* packets) {
3249 if (tracing_session->did_emit_system_info)
3250 return;
3251 tracing_session->did_emit_system_info = true;
3252 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3253 auto* info = packet->set_system_info();
3254 info->set_tracing_service_version(base::GetVersionString());
3255 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
3256 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
3257 struct utsname uname_info;
3258 if (uname(&uname_info) == 0) {
3259 auto* utsname_info = info->set_utsname();
3260 utsname_info->set_sysname(uname_info.sysname);
3261 utsname_info->set_version(uname_info.version);
3262 utsname_info->set_machine(uname_info.machine);
3263 utsname_info->set_release(uname_info.release);
3264 }
3265 #endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
3266 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3267 std::string fingerprint_value = base::GetAndroidProp("ro.build.fingerprint");
3268 if (!fingerprint_value.empty()) {
3269 info->set_android_build_fingerprint(fingerprint_value);
3270 } else {
3271 PERFETTO_ELOG("Unable to read ro.build.fingerprint");
3272 }
3273
3274 std::string sdk_str_value = base::GetAndroidProp("ro.build.version.sdk");
3275 base::Optional<uint64_t> sdk_value = base::StringToUInt64(sdk_str_value);
3276 if (sdk_value.has_value()) {
3277 info->set_android_sdk_version(*sdk_value);
3278 } else {
3279 PERFETTO_ELOG("Unable to read ro.build.version.sdk");
3280 }
3281 info->set_hz(sysconf(_SC_CLK_TCK));
3282 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
3283 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3284 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3285 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3286 }
3287
EmitLifecycleEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)3288 void TracingServiceImpl::EmitLifecycleEvents(
3289 TracingSession* tracing_session,
3290 std::vector<TracePacket>* packets) {
3291 using TimestampedPacket =
3292 std::pair<int64_t /* ts */, std::vector<uint8_t> /* serialized packet */>;
3293
3294 std::vector<TimestampedPacket> timestamped_packets;
3295 for (auto& event : tracing_session->lifecycle_events) {
3296 for (int64_t ts : event.timestamps) {
3297 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3298 packet->set_timestamp(static_cast<uint64_t>(ts));
3299 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3300 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3301
3302 auto* service_event = packet->set_service_event();
3303 service_event->AppendVarInt(event.field_id, 1);
3304 timestamped_packets.emplace_back(ts, packet.SerializeAsArray());
3305 }
3306 event.timestamps.clear();
3307 }
3308
3309 // We sort by timestamp here to ensure that the "sequence" of lifecycle
3310 // packets has monotonic timestamps like other sequences in the trace.
3311 // Note that these events could still be out of order with respect to other
3312 // events on the service packet sequence (e.g. trigger received packets).
3313 std::sort(timestamped_packets.begin(), timestamped_packets.end(),
3314 [](const TimestampedPacket& a, const TimestampedPacket& b) {
3315 return a.first < b.first;
3316 });
3317
3318 for (const auto& pair : timestamped_packets)
3319 SerializeAndAppendPacket(packets, std::move(pair.second));
3320 }
3321
EmitSeizedForBugreportLifecycleEvent(std::vector<TracePacket> * packets)3322 void TracingServiceImpl::EmitSeizedForBugreportLifecycleEvent(
3323 std::vector<TracePacket>* packets) {
3324 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3325 packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
3326 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3327 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3328 auto* service_event = packet->set_service_event();
3329 service_event->AppendVarInt(
3330 protos::pbzero::TracingServiceEvent::kSeizedForBugreportFieldNumber, 1);
3331 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3332 }
3333
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)3334 void TracingServiceImpl::MaybeEmitReceivedTriggers(
3335 TracingSession* tracing_session,
3336 std::vector<TracePacket>* packets) {
3337 PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
3338 tracing_session->received_triggers.size());
3339 for (size_t i = tracing_session->num_triggers_emitted_into_trace;
3340 i < tracing_session->received_triggers.size(); ++i) {
3341 const auto& info = tracing_session->received_triggers[i];
3342 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
3343 auto* trigger = packet->set_trigger();
3344 trigger->set_trigger_name(info.trigger_name);
3345 trigger->set_producer_name(info.producer_name);
3346 trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
3347
3348 packet->set_timestamp(info.boot_time_ns);
3349 packet->set_trusted_uid(static_cast<int32_t>(uid_));
3350 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
3351 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
3352 ++tracing_session->num_triggers_emitted_into_trace;
3353 }
3354 }
3355
MaybeSaveTraceForBugreport(std::function<void ()> callback)3356 bool TracingServiceImpl::MaybeSaveTraceForBugreport(
3357 std::function<void()> callback) {
3358 TracingSession* max_session = nullptr;
3359 TracingSessionID max_tsid = 0;
3360 for (auto& session_id_and_session : tracing_sessions_) {
3361 auto& session = session_id_and_session.second;
3362 const int32_t score = session.config.bugreport_score();
3363 // Exclude sessions with 0 (or below) score. By default tracing sessions
3364 // should NOT be eligible to be attached to bugreports.
3365 if (score <= 0 || session.state != TracingSession::STARTED)
3366 continue;
3367
3368 // Also don't try to steal long traces with write_into_file if their content
3369 // has been already partially written into a file, as we would get partial
3370 // traces on both sides. We can't just copy the original file into the
3371 // bugreport because the file could be too big (GBs) for bugreports.
3372 // The only case where it's legit to steal traces with write_into_file, is
3373 // when the consumer specified a very large write_period_ms (e.g. 24h),
3374 // meaning that this is effectively a ring-buffer trace. Traceur (the
3375 // Android System Tracing app), which uses --detach, does this to have a
3376 // consistent invocation path for long-traces and ring-buffer-mode traces.
3377 if (session.write_into_file && session.bytes_written_into_file > 0)
3378 continue;
3379
3380 // If we are already in the process of finalizing another trace for
3381 // bugreport, don't even start another one, as they would try to write onto
3382 // the same file.
3383 if (session.on_disable_callback_for_bugreport)
3384 return false;
3385
3386 if (!max_session || score > max_session->config.bugreport_score()) {
3387 max_session = &session;
3388 max_tsid = session_id_and_session.first;
3389 }
3390 }
3391
3392 // No eligible trace found.
3393 if (!max_session)
3394 return false;
3395
3396 PERFETTO_LOG("Seizing trace for bugreport. tsid:%" PRIu64
3397 " state:%d wf:%d score:%d name:\"%s\"",
3398 max_tsid, max_session->state, !!max_session->write_into_file,
3399 max_session->config.bugreport_score(),
3400 max_session->config.unique_session_name().c_str());
3401
3402 auto br_fd = CreateTraceFile(GetBugreportTmpPath(), /*overwrite=*/true);
3403 if (!br_fd)
3404 return false;
3405
3406 if (max_session->write_into_file) {
3407 auto fd = *max_session->write_into_file;
3408 // If we are stealing a write_into_file session, add a marker that explains
3409 // why the trace has been stolen rather than creating an empty file. This is
3410 // only for write_into_file traces. A similar code path deals with the case
3411 // of reading-back a seized trace from IPC in ReadBuffersIntoConsumer().
3412 if (!max_session->config.builtin_data_sources().disable_service_events()) {
3413 std::vector<TracePacket> packets;
3414 EmitSeizedForBugreportLifecycleEvent(&packets);
3415 for (auto& packet : packets) {
3416 char* preamble;
3417 size_t preamble_size = 0;
3418 std::tie(preamble, preamble_size) = packet.GetProtoPreamble();
3419 base::WriteAll(fd, preamble, preamble_size);
3420 for (const Slice& slice : packet.slices()) {
3421 base::WriteAll(fd, slice.start, slice.size);
3422 }
3423 } // for (packets)
3424 } // if (!disable_service_events())
3425 } // if (max_session->write_into_file)
3426 max_session->write_into_file = std::move(br_fd);
3427 max_session->on_disable_callback_for_bugreport = std::move(callback);
3428 max_session->seized_for_bugreport = true;
3429
3430 // Post a task to avoid that early FlushAndDisableTracing() failures invoke
3431 // the callback before we return. That would re-enter in a weird way the
3432 // callstack of the calling ConsumerEndpointImpl::SaveTraceForBugreport().
3433 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3434 task_runner_->PostTask([weak_this, max_tsid] {
3435 if (weak_this)
3436 weak_this->FlushAndDisableTracing(max_tsid);
3437 });
3438 return true;
3439 }
3440
MaybeLogUploadEvent(const TraceConfig & cfg,PerfettoStatsdAtom atom,const std::string & trigger_name)3441 void TracingServiceImpl::MaybeLogUploadEvent(const TraceConfig& cfg,
3442 PerfettoStatsdAtom atom,
3443 const std::string& trigger_name) {
3444 if (!ShouldLogEvent(cfg))
3445 return;
3446
3447 // If the UUID is not set for some reason, don't log anything.
3448 if (cfg.trace_uuid_lsb() == 0 && cfg.trace_uuid_msb() == 0)
3449 return;
3450
3451 android_stats::MaybeLogUploadEvent(atom, cfg.trace_uuid_lsb(),
3452 cfg.trace_uuid_msb(), trigger_name);
3453 }
3454
MaybeLogTriggerEvent(const TraceConfig & cfg,PerfettoTriggerAtom atom,const std::string & trigger_name)3455 void TracingServiceImpl::MaybeLogTriggerEvent(const TraceConfig& cfg,
3456 PerfettoTriggerAtom atom,
3457 const std::string& trigger_name) {
3458 if (!ShouldLogEvent(cfg))
3459 return;
3460 android_stats::MaybeLogTriggerEvent(atom, trigger_name);
3461 }
3462
PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,uint64_t trigger_name_hash)3463 size_t TracingServiceImpl::PurgeExpiredAndCountTriggerInWindow(
3464 int64_t now_ns,
3465 uint64_t trigger_name_hash) {
3466 PERFETTO_DCHECK(
3467 std::is_sorted(trigger_history_.begin(), trigger_history_.end()));
3468 size_t remove_count = 0;
3469 size_t trigger_count = 0;
3470 for (const TriggerHistory& h : trigger_history_) {
3471 if (h.timestamp_ns < now_ns - trigger_window_ns_) {
3472 remove_count++;
3473 } else if (h.name_hash == trigger_name_hash) {
3474 trigger_count++;
3475 }
3476 }
3477 trigger_history_.erase_front(remove_count);
3478 return trigger_count;
3479 }
3480
3481 ////////////////////////////////////////////////////////////////////////////////
3482 // TracingServiceImpl::ConsumerEndpointImpl implementation
3483 ////////////////////////////////////////////////////////////////////////////////
3484
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)3485 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
3486 TracingServiceImpl* service,
3487 base::TaskRunner* task_runner,
3488 Consumer* consumer,
3489 uid_t uid)
3490 : task_runner_(task_runner),
3491 service_(service),
3492 consumer_(consumer),
3493 uid_(uid),
3494 weak_ptr_factory_(this) {}
3495
~ConsumerEndpointImpl()3496 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
3497 service_->DisconnectConsumer(this);
3498 consumer_->OnDisconnect();
3499 }
3500
NotifyOnTracingDisabled(const std::string & error)3501 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled(
3502 const std::string& error) {
3503 PERFETTO_DCHECK_THREAD(thread_checker_);
3504 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3505 task_runner_->PostTask([weak_this, error /* deliberate copy */] {
3506 if (weak_this)
3507 weak_this->consumer_->OnTracingDisabled(error);
3508 });
3509 }
3510
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)3511 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
3512 const TraceConfig& cfg,
3513 base::ScopedFile fd) {
3514 PERFETTO_DCHECK_THREAD(thread_checker_);
3515 auto status = service_->EnableTracing(this, cfg, std::move(fd));
3516 if (!status.ok())
3517 NotifyOnTracingDisabled(status.message());
3518 }
3519
ChangeTraceConfig(const TraceConfig & cfg)3520 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
3521 const TraceConfig& cfg) {
3522 if (!tracing_session_id_) {
3523 PERFETTO_LOG(
3524 "Consumer called ChangeTraceConfig() but tracing was "
3525 "not active");
3526 return;
3527 }
3528 service_->ChangeTraceConfig(this, cfg);
3529 }
3530
StartTracing()3531 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
3532 PERFETTO_DCHECK_THREAD(thread_checker_);
3533 if (!tracing_session_id_) {
3534 PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
3535 return;
3536 }
3537 service_->StartTracing(tracing_session_id_);
3538 }
3539
DisableTracing()3540 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
3541 PERFETTO_DCHECK_THREAD(thread_checker_);
3542 if (!tracing_session_id_) {
3543 PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
3544 return;
3545 }
3546 service_->DisableTracing(tracing_session_id_);
3547 }
3548
ReadBuffers()3549 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
3550 PERFETTO_DCHECK_THREAD(thread_checker_);
3551 if (!tracing_session_id_) {
3552 PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
3553 consumer_->OnTraceData({}, /* has_more = */ false);
3554 return;
3555 }
3556 if (!service_->ReadBuffersIntoConsumer(tracing_session_id_, this)) {
3557 consumer_->OnTraceData({}, /* has_more = */ false);
3558 }
3559 }
3560
FreeBuffers()3561 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
3562 PERFETTO_DCHECK_THREAD(thread_checker_);
3563 if (!tracing_session_id_) {
3564 PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
3565 return;
3566 }
3567 service_->FreeBuffers(tracing_session_id_);
3568 tracing_session_id_ = 0;
3569 }
3570
Flush(uint32_t timeout_ms,FlushCallback callback)3571 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
3572 FlushCallback callback) {
3573 PERFETTO_DCHECK_THREAD(thread_checker_);
3574 if (!tracing_session_id_) {
3575 PERFETTO_LOG("Consumer called Flush() but tracing was not active");
3576 return;
3577 }
3578 service_->Flush(tracing_session_id_, timeout_ms, callback);
3579 }
3580
Detach(const std::string & key)3581 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
3582 PERFETTO_DCHECK_THREAD(thread_checker_);
3583 bool success = service_->DetachConsumer(this, key);
3584 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3585 task_runner_->PostTask([weak_this, success] {
3586 if (weak_this)
3587 weak_this->consumer_->OnDetach(success);
3588 });
3589 }
3590
Attach(const std::string & key)3591 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
3592 PERFETTO_DCHECK_THREAD(thread_checker_);
3593 bool success = service_->AttachConsumer(this, key);
3594 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3595 task_runner_->PostTask([weak_this, success] {
3596 if (!weak_this)
3597 return;
3598 Consumer* consumer = weak_this->consumer_;
3599 TracingSession* session =
3600 weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
3601 if (!session) {
3602 consumer->OnAttach(false, TraceConfig());
3603 return;
3604 }
3605 consumer->OnAttach(success, session->config);
3606 });
3607 }
3608
GetTraceStats()3609 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
3610 PERFETTO_DCHECK_THREAD(thread_checker_);
3611 bool success = false;
3612 TraceStats stats;
3613 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3614 if (session) {
3615 success = true;
3616 stats = service_->GetTraceStats(session);
3617 }
3618 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3619 task_runner_->PostTask([weak_this, success, stats] {
3620 if (weak_this)
3621 weak_this->consumer_->OnTraceStats(success, stats);
3622 });
3623 }
3624
ObserveEvents(uint32_t events_mask)3625 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
3626 uint32_t events_mask) {
3627 PERFETTO_DCHECK_THREAD(thread_checker_);
3628 observable_events_mask_ = events_mask;
3629 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
3630 if (!session)
3631 return;
3632
3633 if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
3634 // Issue initial states.
3635 for (const auto& kv : session->data_source_instances) {
3636 ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
3637 PERFETTO_DCHECK(producer);
3638 OnDataSourceInstanceStateChange(*producer, kv.second);
3639 }
3640 }
3641
3642 // If the ObserveEvents() call happens after data sources have acked already
3643 // notify immediately.
3644 if (observable_events_mask_ &
3645 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
3646 service_->MaybeNotifyAllDataSourcesStarted(session);
3647 }
3648 }
3649
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)3650 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
3651 const ProducerEndpointImpl& producer,
3652 const DataSourceInstance& instance) {
3653 if (!(observable_events_mask_ &
3654 ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
3655 return;
3656 }
3657
3658 if (instance.state != DataSourceInstance::CONFIGURED &&
3659 instance.state != DataSourceInstance::STARTED &&
3660 instance.state != DataSourceInstance::STOPPED) {
3661 return;
3662 }
3663
3664 auto* observable_events = AddObservableEvents();
3665 auto* change = observable_events->add_instance_state_changes();
3666 change->set_producer_name(producer.name_);
3667 change->set_data_source_name(instance.data_source_name);
3668 if (instance.state == DataSourceInstance::STARTED) {
3669 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
3670 } else {
3671 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
3672 }
3673 }
3674
OnAllDataSourcesStarted()3675 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
3676 if (!(observable_events_mask_ &
3677 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
3678 return;
3679 }
3680 auto* observable_events = AddObservableEvents();
3681 observable_events->set_all_data_sources_started(true);
3682 }
3683
3684 ObservableEvents*
AddObservableEvents()3685 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
3686 PERFETTO_DCHECK_THREAD(thread_checker_);
3687 if (!observable_events_) {
3688 observable_events_.reset(new ObservableEvents());
3689 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3690 task_runner_->PostTask([weak_this] {
3691 if (!weak_this)
3692 return;
3693
3694 // Move into a temporary to allow reentrancy in OnObservableEvents.
3695 auto observable_events = std::move(weak_this->observable_events_);
3696 weak_this->consumer_->OnObservableEvents(*observable_events);
3697 });
3698 }
3699 return observable_events_.get();
3700 }
3701
QueryServiceState(QueryServiceStateCallback callback)3702 void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
3703 QueryServiceStateCallback callback) {
3704 PERFETTO_DCHECK_THREAD(thread_checker_);
3705 TracingServiceState svc_state;
3706
3707 const auto& sessions = service_->tracing_sessions_;
3708 svc_state.set_tracing_service_version(base::GetVersionString());
3709 svc_state.set_num_sessions(static_cast<int>(sessions.size()));
3710
3711 int num_started = 0;
3712 for (const auto& kv : sessions)
3713 num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
3714 svc_state.set_num_sessions_started(static_cast<int>(num_started));
3715
3716 for (const auto& kv : service_->producers_) {
3717 auto* producer = svc_state.add_producers();
3718 producer->set_id(static_cast<int>(kv.first));
3719 producer->set_name(kv.second->name_);
3720 producer->set_sdk_version(kv.second->sdk_version_);
3721 producer->set_uid(static_cast<int32_t>(kv.second->uid()));
3722 producer->set_pid(static_cast<int32_t>(kv.second->pid()));
3723 }
3724
3725 for (const auto& kv : service_->data_sources_) {
3726 const auto& registered_data_source = kv.second;
3727 auto* data_source = svc_state.add_data_sources();
3728 *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
3729 data_source->set_producer_id(
3730 static_cast<int>(registered_data_source.producer_id));
3731 }
3732
3733 svc_state.set_supports_tracing_sessions(true);
3734 for (const auto& kv : service_->tracing_sessions_) {
3735 const TracingSession& s = kv.second;
3736 // List only tracing sessions for the calling UID (or everything for root).
3737 if (uid_ != 0 && uid_ != s.consumer_uid)
3738 continue;
3739 auto* session = svc_state.add_tracing_sessions();
3740 session->set_id(s.id);
3741 session->set_consumer_uid(static_cast<int>(s.consumer_uid));
3742 session->set_duration_ms(s.config.duration_ms());
3743 session->set_num_data_sources(
3744 static_cast<uint32_t>(s.data_source_instances.size()));
3745 session->set_unique_session_name(s.config.unique_session_name());
3746 for (const auto& snap_kv : s.initial_clock_snapshot) {
3747 if (snap_kv.first == protos::pbzero::BUILTIN_CLOCK_REALTIME)
3748 session->set_start_realtime_ns(static_cast<int64_t>(snap_kv.second));
3749 }
3750 for (const auto& buf : s.config.buffers())
3751 session->add_buffer_size_kb(buf.size_kb());
3752
3753 switch (s.state) {
3754 case TracingSession::State::DISABLED:
3755 session->set_state("DISABLED");
3756 break;
3757 case TracingSession::State::CONFIGURED:
3758 session->set_state("CONFIGURED");
3759 break;
3760 case TracingSession::State::STARTED:
3761 session->set_state("STARTED");
3762 break;
3763 case TracingSession::State::DISABLING_WAITING_STOP_ACKS:
3764 session->set_state("STOP_WAIT");
3765 break;
3766 }
3767 }
3768 callback(/*success=*/true, svc_state);
3769 }
3770
QueryCapabilities(QueryCapabilitiesCallback callback)3771 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
3772 QueryCapabilitiesCallback callback) {
3773 PERFETTO_DCHECK_THREAD(thread_checker_);
3774 TracingServiceCapabilities caps;
3775 caps.set_has_query_capabilities(true);
3776 caps.set_has_trace_config_output_path(true);
3777 caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
3778 caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
3779 static_assert(ObservableEvents::Type_MAX ==
3780 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED,
3781 "");
3782 callback(caps);
3783 }
3784
SaveTraceForBugreport(SaveTraceForBugreportCallback consumer_callback)3785 void TracingServiceImpl::ConsumerEndpointImpl::SaveTraceForBugreport(
3786 SaveTraceForBugreportCallback consumer_callback) {
3787 PERFETTO_DCHECK_THREAD(thread_checker_);
3788 auto on_complete_callback = [consumer_callback] {
3789 if (rename(GetBugreportTmpPath().c_str(), GetBugreportPath().c_str())) {
3790 consumer_callback(false, "rename(" + GetBugreportTmpPath() + ", " +
3791 GetBugreportPath() + ") failed (" +
3792 strerror(errno) + ")");
3793 } else {
3794 consumer_callback(true, GetBugreportPath());
3795 }
3796 };
3797 if (!service_->MaybeSaveTraceForBugreport(std::move(on_complete_callback))) {
3798 consumer_callback(false,
3799 "No trace with TraceConfig.bugreport_score > 0 eligible "
3800 "for bug reporting was found");
3801 }
3802 }
3803
3804 ////////////////////////////////////////////////////////////////////////////////
3805 // TracingServiceImpl::ProducerEndpointImpl implementation
3806 ////////////////////////////////////////////////////////////////////////////////
3807
ProducerEndpointImpl(ProducerID id,uid_t uid,pid_t pid,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,const std::string & sdk_version,bool in_process,bool smb_scraping_enabled)3808 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
3809 ProducerID id,
3810 uid_t uid,
3811 pid_t pid,
3812 TracingServiceImpl* service,
3813 base::TaskRunner* task_runner,
3814 Producer* producer,
3815 const std::string& producer_name,
3816 const std::string& sdk_version,
3817 bool in_process,
3818 bool smb_scraping_enabled)
3819 : id_(id),
3820 uid_(uid),
3821 pid_(pid),
3822 service_(service),
3823 task_runner_(task_runner),
3824 producer_(producer),
3825 name_(producer_name),
3826 sdk_version_(sdk_version),
3827 in_process_(in_process),
3828 smb_scraping_enabled_(smb_scraping_enabled),
3829 weak_ptr_factory_(this) {}
3830
~ProducerEndpointImpl()3831 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
3832 service_->DisconnectProducer(id_);
3833 producer_->OnDisconnect();
3834 }
3835
RegisterDataSource(const DataSourceDescriptor & desc)3836 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
3837 const DataSourceDescriptor& desc) {
3838 PERFETTO_DCHECK_THREAD(thread_checker_);
3839 service_->RegisterDataSource(id_, desc);
3840 }
3841
UpdateDataSource(const DataSourceDescriptor & desc)3842 void TracingServiceImpl::ProducerEndpointImpl::UpdateDataSource(
3843 const DataSourceDescriptor& desc) {
3844 PERFETTO_DCHECK_THREAD(thread_checker_);
3845 service_->UpdateDataSource(id_, desc);
3846 }
3847
UnregisterDataSource(const std::string & name)3848 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
3849 const std::string& name) {
3850 PERFETTO_DCHECK_THREAD(thread_checker_);
3851 service_->UnregisterDataSource(id_, name);
3852 }
3853
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)3854 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
3855 uint32_t writer_id,
3856 uint32_t target_buffer) {
3857 PERFETTO_DCHECK_THREAD(thread_checker_);
3858 writers_[static_cast<WriterID>(writer_id)] =
3859 static_cast<BufferID>(target_buffer);
3860 }
3861
UnregisterTraceWriter(uint32_t writer_id)3862 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
3863 uint32_t writer_id) {
3864 PERFETTO_DCHECK_THREAD(thread_checker_);
3865 writers_.erase(static_cast<WriterID>(writer_id));
3866 }
3867
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)3868 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
3869 const CommitDataRequest& req_untrusted,
3870 CommitDataCallback callback) {
3871 PERFETTO_DCHECK_THREAD(thread_checker_);
3872
3873 if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
3874 PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
3875 EncodeCommitDataRequest(id_, req_untrusted));
3876 }
3877
3878 if (!shared_memory_) {
3879 PERFETTO_DLOG(
3880 "Attempted to commit data before the shared memory was allocated.");
3881 return;
3882 }
3883 PERFETTO_DCHECK(shmem_abi_.is_valid());
3884 for (const auto& entry : req_untrusted.chunks_to_move()) {
3885 const uint32_t page_idx = entry.page();
3886 if (page_idx >= shmem_abi_.num_pages())
3887 continue; // A buggy or malicious producer.
3888
3889 SharedMemoryABI::Chunk chunk =
3890 shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
3891 if (!chunk.is_valid()) {
3892 PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
3893 entry.page(), entry.chunk());
3894 continue;
3895 }
3896
3897 // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
3898 // the ABI contract expects the producer to not touch the chunk anymore
3899 // (until the service marks that as free). This is why all the reads below
3900 // are just memory_order_relaxed. Also, the code here assumes that all this
3901 // data can be malicious and just gives up if anything is malformed.
3902 BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
3903 const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
3904 WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
3905 ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
3906 auto packets = chunk_header.packets.load(std::memory_order_relaxed);
3907 uint16_t num_fragments = packets.count;
3908 uint8_t chunk_flags = packets.flags;
3909
3910 service_->CopyProducerPageIntoLogBuffer(
3911 id_, uid_, pid_, writer_id, chunk_id, buffer_id, num_fragments,
3912 chunk_flags,
3913 /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
3914
3915 // This one has release-store semantics.
3916 shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
3917 } // for(chunks_to_move)
3918
3919 service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
3920
3921 if (req_untrusted.flush_request_id()) {
3922 service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
3923 }
3924
3925 // Keep this invocation last. ProducerIPCService::CommitData() relies on this
3926 // callback being invoked within the same callstack and not posted. If this
3927 // changes, the code there needs to be changed accordingly.
3928 if (callback)
3929 callback();
3930 }
3931
SetupSharedMemory(std::unique_ptr<SharedMemory> shared_memory,size_t page_size_bytes,bool provided_by_producer)3932 void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
3933 std::unique_ptr<SharedMemory> shared_memory,
3934 size_t page_size_bytes,
3935 bool provided_by_producer) {
3936 PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
3937 PERFETTO_DCHECK(page_size_bytes % 1024 == 0);
3938
3939 shared_memory_ = std::move(shared_memory);
3940 shared_buffer_page_size_kb_ = page_size_bytes / 1024;
3941 is_shmem_provided_by_producer_ = provided_by_producer;
3942
3943 shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
3944 shared_memory_->size(),
3945 shared_buffer_page_size_kb() * 1024);
3946 if (in_process_) {
3947 inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
3948 shared_memory_->start(), shared_memory_->size(),
3949 shared_buffer_page_size_kb_ * 1024, this, task_runner_));
3950 inproc_shmem_arbiter_->SetDirectSMBPatchingSupportedByService();
3951 }
3952
3953 OnTracingSetup();
3954 service_->UpdateMemoryGuardrail();
3955 }
3956
shared_memory() const3957 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
3958 PERFETTO_DCHECK_THREAD(thread_checker_);
3959 return shared_memory_.get();
3960 }
3961
shared_buffer_page_size_kb() const3962 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
3963 const {
3964 return shared_buffer_page_size_kb_;
3965 }
3966
ActivateTriggers(const std::vector<std::string> & triggers)3967 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
3968 const std::vector<std::string>& triggers) {
3969 service_->ActivateTriggers(id_, triggers);
3970 }
3971
StopDataSource(DataSourceInstanceID ds_inst_id)3972 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
3973 DataSourceInstanceID ds_inst_id) {
3974 // TODO(primiano): When we'll support tearing down the SMB, at this point we
3975 // should send the Producer a TearDownTracing if all its data sources have
3976 // been disabled (see b/77532839 and aosp/655179 PS1).
3977 PERFETTO_DCHECK_THREAD(thread_checker_);
3978 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3979 task_runner_->PostTask([weak_this, ds_inst_id] {
3980 if (weak_this)
3981 weak_this->producer_->StopDataSource(ds_inst_id);
3982 });
3983 }
3984
3985 SharedMemoryArbiter*
MaybeSharedMemoryArbiter()3986 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
3987 if (!inproc_shmem_arbiter_) {
3988 PERFETTO_FATAL(
3989 "The in-process SharedMemoryArbiter can only be used when "
3990 "CreateProducer has been called with in_process=true and after tracing "
3991 "has started.");
3992 }
3993
3994 PERFETTO_DCHECK(in_process_);
3995 return inproc_shmem_arbiter_.get();
3996 }
3997
IsShmemProvidedByProducer() const3998 bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
3999 const {
4000 return is_shmem_provided_by_producer_;
4001 }
4002
4003 // Can be called on any thread.
4004 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id,BufferExhaustedPolicy buffer_exhausted_policy)4005 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
4006 BufferID buf_id,
4007 BufferExhaustedPolicy buffer_exhausted_policy) {
4008 PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4009 return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
4010 buffer_exhausted_policy);
4011 }
4012
NotifyFlushComplete(FlushRequestID id)4013 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
4014 FlushRequestID id) {
4015 PERFETTO_DCHECK_THREAD(thread_checker_);
4016 PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
4017 return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
4018 }
4019
OnTracingSetup()4020 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
4021 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4022 task_runner_->PostTask([weak_this] {
4023 if (weak_this)
4024 weak_this->producer_->OnTracingSetup();
4025 });
4026 }
4027
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources)4028 void TracingServiceImpl::ProducerEndpointImpl::Flush(
4029 FlushRequestID flush_request_id,
4030 const std::vector<DataSourceInstanceID>& data_sources) {
4031 PERFETTO_DCHECK_THREAD(thread_checker_);
4032 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4033 task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
4034 if (weak_this) {
4035 weak_this->producer_->Flush(flush_request_id, data_sources.data(),
4036 data_sources.size());
4037 }
4038 });
4039 }
4040
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4041 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
4042 DataSourceInstanceID ds_id,
4043 const DataSourceConfig& config) {
4044 PERFETTO_DCHECK_THREAD(thread_checker_);
4045 allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
4046 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4047 task_runner_->PostTask([weak_this, ds_id, config] {
4048 if (weak_this)
4049 weak_this->producer_->SetupDataSource(ds_id, std::move(config));
4050 });
4051 }
4052
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)4053 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
4054 DataSourceInstanceID ds_id,
4055 const DataSourceConfig& config) {
4056 PERFETTO_DCHECK_THREAD(thread_checker_);
4057 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4058 task_runner_->PostTask([weak_this, ds_id, config] {
4059 if (weak_this)
4060 weak_this->producer_->StartDataSource(ds_id, std::move(config));
4061 });
4062 }
4063
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)4064 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
4065 DataSourceInstanceID data_source_id) {
4066 PERFETTO_DCHECK_THREAD(thread_checker_);
4067 service_->NotifyDataSourceStarted(id_, data_source_id);
4068 }
4069
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)4070 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
4071 DataSourceInstanceID data_source_id) {
4072 PERFETTO_DCHECK_THREAD(thread_checker_);
4073 service_->NotifyDataSourceStopped(id_, data_source_id);
4074 }
4075
OnFreeBuffers(const std::vector<BufferID> & target_buffers)4076 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
4077 const std::vector<BufferID>& target_buffers) {
4078 if (allowed_target_buffers_.empty())
4079 return;
4080 for (BufferID buffer : target_buffers)
4081 allowed_target_buffers_.erase(buffer);
4082 }
4083
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)4084 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
4085 const std::vector<DataSourceInstanceID>& data_sources) {
4086 PERFETTO_DCHECK_THREAD(thread_checker_);
4087 auto weak_this = weak_ptr_factory_.GetWeakPtr();
4088 task_runner_->PostTask([weak_this, data_sources] {
4089 if (weak_this) {
4090 base::StringView producer_name(weak_this->name_);
4091 weak_this->producer_->ClearIncrementalState(data_sources.data(),
4092 data_sources.size());
4093 }
4094 });
4095 }
4096
Sync(std::function<void ()> callback)4097 void TracingServiceImpl::ProducerEndpointImpl::Sync(
4098 std::function<void()> callback) {
4099 task_runner_->PostTask(callback);
4100 }
4101
4102 ////////////////////////////////////////////////////////////////////////////////
4103 // TracingServiceImpl::TracingSession implementation
4104 ////////////////////////////////////////////////////////////////////////////////
4105
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config,base::TaskRunner * task_runner)4106 TracingServiceImpl::TracingSession::TracingSession(
4107 TracingSessionID session_id,
4108 ConsumerEndpointImpl* consumer,
4109 const TraceConfig& new_config,
4110 base::TaskRunner* task_runner)
4111 : id(session_id),
4112 consumer_maybe_null(consumer),
4113 consumer_uid(consumer->uid_),
4114 config(new_config),
4115 snapshot_periodic_task(task_runner) {
4116 // all_data_sources_flushed is special because we store up to 64 events of
4117 // this type. Other events will go through the default case in
4118 // SnapshotLifecycleEvent() where they will be given a max history of 1.
4119 lifecycle_events.emplace_back(
4120 protos::pbzero::TracingServiceEvent::kAllDataSourcesFlushedFieldNumber,
4121 64 /* max_size */);
4122 }
4123
4124 } // namespace perfetto
4125