1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/tracing_service_impl.h"
18
19 #include "perfetto/base/build_config.h"
20
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <string.h>
25 #include <regex>
26 #include <unordered_set>
27
28 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
29 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
30 #include <sys/uio.h>
31 #include <sys/utsname.h>
32 #include <unistd.h>
33 #endif
34
35 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
36 #include <sys/system_properties.h>
37 #endif
38
39 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
40 PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
41 PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
42 #define PERFETTO_HAS_CHMOD
43 #include <sys/stat.h>
44 #endif
45
46 #include <algorithm>
47
48 #include "perfetto/base/build_config.h"
49 #include "perfetto/base/task_runner.h"
50 #include "perfetto/ext/base/file_utils.h"
51 #include "perfetto/ext/base/metatrace.h"
52 #include "perfetto/ext/base/string_utils.h"
53 #include "perfetto/ext/base/utils.h"
54 #include "perfetto/ext/base/watchdog.h"
55 #include "perfetto/ext/tracing/core/consumer.h"
56 #include "perfetto/ext/tracing/core/observable_events.h"
57 #include "perfetto/ext/tracing/core/producer.h"
58 #include "perfetto/ext/tracing/core/shared_memory.h"
59 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
60 #include "perfetto/ext/tracing/core/trace_packet.h"
61 #include "perfetto/ext/tracing/core/trace_writer.h"
62 #include "perfetto/protozero/scattered_heap_buffer.h"
63 #include "perfetto/protozero/static_buffer.h"
64 #include "perfetto/tracing/core/data_source_descriptor.h"
65 #include "perfetto/tracing/core/tracing_service_capabilities.h"
66 #include "perfetto/tracing/core/tracing_service_state.h"
67 #include "src/tracing/core/packet_stream_validator.h"
68 #include "src/tracing/core/shared_memory_arbiter_impl.h"
69 #include "src/tracing/core/trace_buffer.h"
70
71 #include "protos/perfetto/common/builtin_clock.gen.h"
72 #include "protos/perfetto/common/builtin_clock.pbzero.h"
73 #include "protos/perfetto/common/trace_stats.pbzero.h"
74 #include "protos/perfetto/config/trace_config.pbzero.h"
75 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
76 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
77 #include "protos/perfetto/trace/system_info.pbzero.h"
78 #include "protos/perfetto/trace/trace_packet.pbzero.h"
79 #include "protos/perfetto/trace/trigger.pbzero.h"
80
// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or their arguments are sane / within bounds.
85
86 namespace perfetto {
87
88 namespace {
// Upper bounds on buffers and concurrently existing tracing sessions.
constexpr int kMaxBuffersPerConsumer = 128;
constexpr int kMaxConcurrentTracingSessions = 15;
constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;

// Default periods, in milliseconds.
constexpr uint32_t kDefaultSnapshotsIntervalMs = 10000;
constexpr int kDefaultWriteIntoFilePeriodMs = 5000;

// Minimum spacing, in seconds, enforced between traces with the same unique
// session name when extra guardrails are enabled (5 minutes).
constexpr int64_t kMinSecondsBetweenTracesGuardrail = 300;

constexpr uint32_t kMillisPerHour = 60 * 60 * 1000;
// 7 days.
constexpr uint32_t kMaxTracingDurationMillis = 168 * kMillisPerHour;

// These apply only if enable_extra_guardrails is true.
constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
103
104 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
// Local stand-in for the POSIX iovec struct. It is only compiled on the
// platforms selected by the enclosing #if (Windows / NaCl), where <sys/uio.h>
// is not included by this file.
struct iovec {
  void* iov_base;  // Address
  size_t iov_len;  // Block size
};
109
110 // Simple implementation of writev. Note that this does not give the atomicity
111 // guarantees of a real writev, but we don't depend on these (we aren't writing
112 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)113 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
114 ssize_t total_size = 0;
115 for (int i = 0; i < iovcnt; ++i) {
116 ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
117 if (current_size != static_cast<ssize_t>(iov[i].iov_len))
118 return -1;
119 total_size += current_size;
120 }
121 return total_size;
122 }
123
124 #define IOV_MAX 1024 // Linux compatible limit.
125
// Stub for platforms without a real getuid() (see the enclosing #if): uid
// checking is a no-op there, so the reported uid is always 0.
uid_t getuid() {
  return static_cast<uid_t>(0);
}
// Stub counterpart of geteuid() for the same platforms: always reports an
// effective uid of 0.
uid_t geteuid() {
  return static_cast<uid_t>(0);
}
133 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
134 // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
135
136 // Partially encodes a CommitDataRequest in an int32 for the purposes of
137 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
138 // (which is technically 16 bits wide).
139 //
140 // Format (by bit range):
141 // [ 31 ][ 30 ][ 29:20 ][ 19:10 ][ 9:0]
142 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)143 static int32_t EncodeCommitDataRequest(ProducerID producer_id,
144 const CommitDataRequest& req_untrusted) {
145 uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
146 uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
147 uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
148
149 uint32_t mask = (1 << 10) - 1;
150 uint32_t acc = 0;
151 acc |= has_flush_id << 30;
152 acc |= (cpatch & mask) << 20;
153 acc |= (cmov & mask) << 10;
154 acc |= (producer_id & mask);
155 return static_cast<int32_t>(acc);
156 }
157
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)158 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
159 std::vector<uint8_t> packet) {
160 Slice slice = Slice::Allocate(packet.size());
161 memcpy(slice.own_data(), packet.data(), packet.size());
162 packets->emplace_back();
163 packets->back().AddSlice(std::move(slice));
164 }
165
EnsureValidShmSizes(size_t shm_size,size_t page_size)166 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
167 size_t shm_size,
168 size_t page_size) {
169 // Theoretically the max page size supported by the ABI is 64KB.
170 // However, the current implementation of TraceBuffer (the non-shared
171 // userspace buffer where the service copies data) supports at most
172 // 32K. Setting 64K "works" from the producer<>consumer viewpoint
173 // but then causes the data to be discarded when copying it into
174 // TraceBuffer.
175 constexpr size_t kMaxPageSize = 32 * 1024;
176 static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
177
178 if (page_size == 0)
179 page_size = TracingServiceImpl::kDefaultShmPageSize;
180 if (shm_size == 0)
181 shm_size = TracingServiceImpl::kDefaultShmSize;
182
183 page_size = std::min<size_t>(page_size, kMaxPageSize);
184 shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
185
186 // Page size has to be multiple of system's page size.
187 bool page_size_is_valid = page_size >= base::kPageSize;
188 page_size_is_valid &= page_size % base::kPageSize == 0;
189
190 // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
191 size_t num_pages = page_size / base::kPageSize;
192 page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
193
194 if (!page_size_is_valid || shm_size < page_size ||
195 shm_size % page_size != 0) {
196 return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
197 TracingServiceImpl::kDefaultShmPageSize);
198 }
199 return std::make_tuple(shm_size, page_size);
200 }
201
// Decides whether |name| passes a pair of name filters.
//
// If both filters are empty, no filtering is configured and every name
// matches. Otherwise |name| matches if it is equal to an entry of
// |name_filter| or if it is fully matched (std::regex_match) by one of the
// POSIX extended regular expressions in |name_regex_filter|.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  if (name_filter.empty() && name_regex_filter.empty())
    return true;

  const bool listed_exactly =
      std::any_of(name_filter.begin(), name_filter.end(),
                  [&name](const std::string& entry) { return entry == name; });

  // The regex filter is evaluated even when an exact match was already found,
  // so the patterns get compiled in both cases.
  const bool matched_by_regex = std::any_of(
      name_regex_filter.begin(), name_regex_filter.end(),
      [&name](const std::string& pattern) {
        const std::regex compiled(pattern, std::regex::extended);
        return std::regex_match(name, compiled);
      });

  return listed_exactly || matched_by_regex;
}
218
219 // Used when write_into_file == true and output_path is not empty.
CreateTraceFile(const std::string & path)220 base::ScopedFile CreateTraceFile(const std::string& path) {
221 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
222 static const char kBase[] = "/data/misc/perfetto-traces/";
223 if (!base::StartsWith(path, kBase) || path.rfind('/') != strlen(kBase) - 1) {
224 PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
225 path.c_str(), kBase);
226 return base::ScopedFile();
227 }
228 #endif
229 // O_CREAT | O_EXCL will fail if the file exists already.
230 auto fd = base::OpenFile(path, O_RDWR | O_CREAT | O_EXCL, 0600);
231 if (!fd)
232 PERFETTO_PLOG("Failed to create %s", path.c_str());
233 #if defined(PERFETTO_HAS_CHMOD)
234 // Passing 0644 directly above won't work because of umask.
235 PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
236 #endif
237 return fd;
238 }
239
240 } // namespace
241
// Unlike the constants in the anonymous namespace above, these are declared in
// the header because they are also used by tests. The lines below are their
// out-of-line definitions (presumably needed because they are odr-used and
// the code targets a pre-C++17 standard -- TODO confirm).
constexpr size_t TracingServiceImpl::kDefaultShmSize;
constexpr size_t TracingServiceImpl::kDefaultShmPageSize;

constexpr size_t TracingServiceImpl::kMaxShmSize;
constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
constexpr uint8_t TracingServiceImpl::kSyncMarker[];
249
250 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)251 std::unique_ptr<TracingService> TracingService::CreateInstance(
252 std::unique_ptr<SharedMemory::Factory> shm_factory,
253 base::TaskRunner* task_runner) {
254 return std::unique_ptr<TracingService>(
255 new TracingServiceImpl(std::move(shm_factory), task_runner));
256 }
257
// Stores |task_runner| and takes ownership of |shm_factory|, records the
// result of getuid() in uid_, and initializes buffer_ids_ with
// kMaxTraceBufferID. |task_runner| is expected to be non-null (DCHECKed).
TracingServiceImpl::TracingServiceImpl(
    std::unique_ptr<SharedMemory::Factory> shm_factory,
    base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      shm_factory_(std::move(shm_factory)),
      uid_(getuid()),
      buffer_ids_(kMaxTraceBufferID),
      weak_ptr_factory_(this) {
  PERFETTO_DCHECK(task_runner_);
}
268
~TracingServiceImpl()269 TracingServiceImpl::~TracingServiceImpl() {
270 // TODO(fmayer): handle teardown of all Producer.
271 }
272
273 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,uid_t uid,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm)274 TracingServiceImpl::ConnectProducer(Producer* producer,
275 uid_t uid,
276 const std::string& producer_name,
277 size_t shared_memory_size_hint_bytes,
278 bool in_process,
279 ProducerSMBScrapingMode smb_scraping_mode,
280 size_t shared_memory_page_size_hint_bytes,
281 std::unique_ptr<SharedMemory> shm) {
282 PERFETTO_DCHECK_THREAD(thread_checker_);
283
284 if (lockdown_mode_ && uid != geteuid()) {
285 PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
286 static_cast<unsigned long>(uid));
287 return nullptr;
288 }
289
290 if (producers_.size() >= kMaxProducerID) {
291 PERFETTO_DFATAL("Too many producers.");
292 return nullptr;
293 }
294 const ProducerID id = GetNextProducerID();
295 PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
296
297 bool smb_scraping_enabled = smb_scraping_enabled_;
298 switch (smb_scraping_mode) {
299 case ProducerSMBScrapingMode::kDefault:
300 break;
301 case ProducerSMBScrapingMode::kEnabled:
302 smb_scraping_enabled = true;
303 break;
304 case ProducerSMBScrapingMode::kDisabled:
305 smb_scraping_enabled = false;
306 break;
307 }
308
309 std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
310 id, uid, this, task_runner_, producer, producer_name, in_process,
311 smb_scraping_enabled));
312 auto it_and_inserted = producers_.emplace(id, endpoint.get());
313 PERFETTO_DCHECK(it_and_inserted.second);
314 endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
315 endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
316
317 // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
318 // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
319 task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
320
321 if (shm) {
322 // The producer supplied an SMB. This is used only by Chrome; in the most
323 // common cases the SMB is created by the service and passed via
324 // OnTracingSetup(). Verify that it is correctly sized before we attempt to
325 // use it. The transport layer has to verify the integrity of the SMB (e.g.
326 // ensure that the producer can't resize if after the fact).
327 size_t shm_size, page_size;
328 std::tie(shm_size, page_size) =
329 EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
330 if (shm_size == shm->size() &&
331 page_size == endpoint->shmem_page_size_hint_bytes_) {
332 PERFETTO_DLOG(
333 "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
334 shm_size / 1024, endpoint->name_.c_str());
335 endpoint->SetupSharedMemory(std::move(shm), page_size,
336 /*provided_by_producer=*/true);
337 } else {
338 PERFETTO_LOG(
339 "Discarding incorrectly sized producer-provided SMB for producer "
340 "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
341 "B total, %zu B page size; suggested corrected sizes: %zu B total, "
342 "%zu B page size",
343 endpoint->name_.c_str(), shm->size(),
344 endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
345 shm.reset();
346 }
347 }
348
349 return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
350 }
351
DisconnectProducer(ProducerID id)352 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
353 PERFETTO_DCHECK_THREAD(thread_checker_);
354 PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
355 PERFETTO_DCHECK(producers_.count(id));
356
357 // Scrape remaining chunks for this producer to ensure we don't lose data.
358 if (auto* producer = GetProducer(id)) {
359 for (auto& session_id_and_session : tracing_sessions_)
360 ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
361 }
362
363 for (auto it = data_sources_.begin(); it != data_sources_.end();) {
364 auto next = it;
365 next++;
366 if (it->second.producer_id == id)
367 UnregisterDataSource(id, it->second.descriptor.name());
368 it = next;
369 }
370
371 producers_.erase(id);
372 UpdateMemoryGuardrail();
373 }
374
GetProducer(ProducerID id) const375 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
376 ProducerID id) const {
377 PERFETTO_DCHECK_THREAD(thread_checker_);
378 auto it = producers_.find(id);
379 if (it == producers_.end())
380 return nullptr;
381 return it->second;
382 }
383
384 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)385 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
386 PERFETTO_DCHECK_THREAD(thread_checker_);
387 PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
388 std::unique_ptr<ConsumerEndpointImpl> endpoint(
389 new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
390 auto it_and_inserted = consumers_.emplace(endpoint.get());
391 PERFETTO_DCHECK(it_and_inserted.second);
392 // Consumer might go away before we're able to send the connect notification,
393 // if that is the case just bail out.
394 auto weak_ptr = endpoint->GetWeakPtr();
395 task_runner_->PostTask([weak_ptr] {
396 if (!weak_ptr) {
397 return;
398 }
399 weak_ptr->consumer_->OnConnect();
400 });
401 return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
402 }
403
DisconnectConsumer(ConsumerEndpointImpl * consumer)404 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
405 PERFETTO_DCHECK_THREAD(thread_checker_);
406 PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
407 PERFETTO_DCHECK(consumers_.count(consumer));
408
409 // TODO(primiano) : Check that this is safe (what happens if there are
410 // ReadBuffers() calls posted in the meantime? They need to become noop).
411 if (consumer->tracing_session_id_)
412 FreeBuffers(consumer->tracing_session_id_); // Will also DisableTracing().
413 consumers_.erase(consumer);
414
415 // At this point no more pointers to |consumer| should be around.
416 PERFETTO_DCHECK(!std::any_of(
417 tracing_sessions_.begin(), tracing_sessions_.end(),
418 [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
419 return kv.second.consumer_maybe_null == consumer;
420 }));
421 }
422
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)423 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
424 const std::string& key) {
425 PERFETTO_DCHECK_THREAD(thread_checker_);
426 PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
427 PERFETTO_DCHECK(consumers_.count(consumer));
428
429 TracingSessionID tsid = consumer->tracing_session_id_;
430 TracingSession* tracing_session;
431 if (!tsid || !(tracing_session = GetTracingSession(tsid)))
432 return false;
433
434 if (GetDetachedSession(consumer->uid_, key)) {
435 PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
436 key.c_str());
437 return false;
438 }
439
440 PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
441 tracing_session->consumer_maybe_null = nullptr;
442 tracing_session->detach_key = key;
443 consumer->tracing_session_id_ = 0;
444 return true;
445 }
446
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)447 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
448 const std::string& key) {
449 PERFETTO_DCHECK_THREAD(thread_checker_);
450 PERFETTO_DLOG("Consumer %p attaching to session %s",
451 reinterpret_cast<void*>(consumer), key.c_str());
452 PERFETTO_DCHECK(consumers_.count(consumer));
453
454 if (consumer->tracing_session_id_) {
455 PERFETTO_ELOG(
456 "Cannot reattach consumer to session %s"
457 " while it already attached tracing session ID %" PRIu64,
458 key.c_str(), consumer->tracing_session_id_);
459 return false;
460 }
461
462 auto* tracing_session = GetDetachedSession(consumer->uid_, key);
463 if (!tracing_session) {
464 PERFETTO_ELOG(
465 "Failed to attach consumer, session '%s' not found for uid %d",
466 key.c_str(), static_cast<int>(consumer->uid_));
467 return false;
468 }
469
470 consumer->tracing_session_id_ = tracing_session->id;
471 tracing_session->consumer_maybe_null = consumer;
472 tracing_session->detach_key.clear();
473 return true;
474 }
475
// Validates |cfg| and, if it is accepted, creates a new tracing session for
// |consumer|: sets up the optional output file, allocates the trace buffers,
// sets up the matching registered data sources and, unless the start is
// deferred (cfg.deferred_start() or a START_TRACING trigger), starts tracing.
//
// |fd| is an output file handed in by the consumer. It is only looked at when
// cfg.write_into_file() is set, in which case exactly one of |fd| and
// cfg.output_path() must be provided.
//
// Returns false if the config is rejected or a resource cannot be allocated.
// Otherwise returns true, or the result of StartTracing() when tracing is
// started right away.
bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
                                       const TraceConfig& cfg,
                                       base::ScopedFile fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Enabling tracing for consumer %p",
                reinterpret_cast<void*>(consumer));
  // Note: lockdown mode is updated before any validation below, so it takes
  // effect even if this call ends up returning false.
  if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
    lockdown_mode_ = true;
  if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
    lockdown_mode_ = false;
  // A consumer can be attached to at most one tracing session at a time.
  TracingSession* tracing_session =
      GetTracingSession(consumer->tracing_session_id_);
  if (tracing_session) {
    PERFETTO_DLOG(
        "A Consumer is trying to EnableTracing() but another tracing session "
        "is already active (forgot a call to FreeBuffers() ?)");
    return false;
  }

  // The duration cap is stricter when extra guardrails are requested.
  const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
                                       ? kGuardrailsMaxTracingDurationMillis
                                       : kMaxTracingDurationMillis;
  if (cfg.duration_ms() > max_duration_ms) {
    PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms > %" PRIu32 " ms)",
                  cfg.duration_ms(), max_duration_ms);
    return false;
  }

  // NOTE(review): this check applies to every trigger mode (not only
  // START_TRACING) and caps the timeout at kGuardrailsMaxTracingDurationMillis
  // (24 hours) regardless of enable_extra_guardrails, while the message below
  // says "< 7 days" -- confirm which limit is intended.
  const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
                                  TraceConfig::TriggerConfig::UNSPECIFIED;
  if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
                             cfg.trigger_config().trigger_timeout_ms() >
                                 kGuardrailsMaxTracingDurationMillis)) {
    PERFETTO_ELOG(
        "Traces with START_TRACING triggers must provide a positive "
        "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
        cfg.trigger_config().trigger_timeout_ms());
    return false;
  }

  if (has_trigger_config && cfg.duration_ms() != 0) {
    PERFETTO_ELOG(
        "duration_ms was set, this must not be set for traces with triggers.");
    return false;
  }

  // Trigger names must be unique within a config.
  std::unordered_set<std::string> triggers;
  for (const auto& trigger : cfg.trigger_config().triggers()) {
    if (!triggers.insert(trigger.name()).second) {
      PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
      return false;
    }
  }

  if (cfg.enable_extra_guardrails()) {
    if (cfg.deferred_start()) {
      PERFETTO_ELOG(
          "deferred_start=true is not supported in unsupervised traces");
      return false;
    }
    // Cap the sum of all requested buffer sizes.
    uint64_t buf_size_sum = 0;
    for (const auto& buf : cfg.buffers())
      buf_size_sum += buf.size_kb();
    if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
      PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
                    "kB > %" PRIu32 " kB)",
                    buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
      return false;
    }
  }

  if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
    PERFETTO_ELOG("Too many buffers configured (%d)", cfg.buffers_size());
    return false;
  }

  // A non-empty unique_session_name must not clash with any existing session.
  if (!cfg.unique_session_name().empty()) {
    const std::string& name = cfg.unique_session_name();
    for (auto& kv : tracing_sessions_) {
      if (kv.second.config.unique_session_name() == name) {
        PERFETTO_ELOG(
            "A trace with this unique session name (%s) already exists",
            name.c_str());
        return false;
      }
    }
  }

  // Rate limit: with extra guardrails, refuse a trace whose session name (the
  // empty name included) was last accepted at most
  // kMinSecondsBetweenTracesGuardrail seconds ago.
  if (cfg.enable_extra_guardrails()) {
    // unique_session_name can be empty
    const std::string& name = cfg.unique_session_name();
    int64_t now_s = base::GetBootTimeS().count();

    // Remove any entries where the time limit has passed so this map doesn't
    // grow indefinitely:
    std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
    for (auto it = sessions.cbegin(); it != sessions.cend();) {
      if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
        it = sessions.erase(it);
      } else {
        ++it;
      }
    }

    // operator[] inserts a zero entry when |name| is absent; zero therefore
    // means "no recent trace with this name".
    int64_t& previous_s = session_to_last_trace_s_[name];
    if (previous_s == 0) {
      previous_s = now_s;
    } else {
      PERFETTO_ELOG(
          "A trace with unique session name \"%s\" began less than %" PRId64
          "s ago (%" PRId64 "s)",
          name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
      return false;
    }
  }

  // Enforce the per-uid limit on concurrent sessions (statsd's uid gets a
  // higher limit).
  const long sessions_for_uid = std::count_if(
      tracing_sessions_.begin(), tracing_sessions_.end(),
      [consumer](const decltype(tracing_sessions_)::value_type& s) {
        return s.second.consumer_uid == consumer->uid_;
      });

  int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
  if (consumer->uid_ == 1066 /* AID_STATSD*/) {
    per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
  }
  if (sessions_for_uid >= per_uid_limit) {
    PERFETTO_ELOG(
        "Too many concurrent tracing sesions (%ld) for uid %d limit is %d",
        sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
    return false;
  }

  // TODO(primiano): This is a workaround to prevent that a producer gets stuck
  // in a state where it stalls by design by having more TraceWriterImpl
  // instances than free pages in the buffer. This is really a bug in
  // trace_probes and the way it handles stalls in the shmem buffer.
  if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
    PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
                  tracing_sessions_.size());
    return false;
  }

  // All config checks passed: create the session. From here on, every failure
  // path has to erase it from tracing_sessions_ again.
  const TracingSessionID tsid = ++last_tracing_session_id_;
  tracing_session =
      &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
           .first->second;

  if (cfg.write_into_file()) {
    // Error out if both or neither of |fd| and output_path were provided.
    if (!fd ^ !cfg.output_path().empty()) {
      PERFETTO_ELOG(
          "When write_into_file==true either a FD needs to be passed or "
          "output_path must be populated (but not both)");
      tracing_sessions_.erase(tsid);
      return false;
    }
    if (!cfg.output_path().empty()) {
      fd = CreateTraceFile(cfg.output_path());
      if (!fd) {
        tracing_sessions_.erase(tsid);
        return false;
      }
    }
    tracing_session->write_into_file = std::move(fd);
    // A period of 0 selects the default; the result is then raised to at
    // least min_write_period_ms_.
    uint32_t write_period_ms = cfg.file_write_period_ms();
    if (write_period_ms == 0)
      write_period_ms = kDefaultWriteIntoFilePeriodMs;
    if (write_period_ms < min_write_period_ms_)
      write_period_ms = min_write_period_ms_;
    tracing_session->write_period_ms = write_period_ms;
    tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
    tracing_session->bytes_written_into_file = 0;
  }

  // Initialize the log buffers.
  bool did_allocate_all_buffers = true;

  // Allocate the trace buffers. Also create a map to translate a consumer
  // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
  // corresponding BufferID, which is a global ID namespace for the service and
  // all producers.
  size_t total_buf_size_kb = 0;
  const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
  tracing_session->buffers_index.reserve(num_buffers);
  for (size_t i = 0; i < num_buffers; i++) {
    const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
    BufferID global_id = buffer_ids_.Allocate();
    if (!global_id) {
      did_allocate_all_buffers = false;  // We ran out of IDs.
      break;
    }
    tracing_session->buffers_index.push_back(global_id);
    // NOTE(review): if size_kb() is a 32-bit value (as the PRIu32 guardrail
    // constants suggest) this multiplication is done in 32 bits before being
    // widened to size_t -- confirm very large sizes cannot wrap.
    const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
    total_buf_size_kb += buffer_cfg.size_kb();
    TraceBuffer::OverwritePolicy policy =
        buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
            ? TraceBuffer::kDiscard
            : TraceBuffer::kOverwrite;
    auto it_and_inserted = buffers_.emplace(
        global_id, TraceBuffer::Create(buf_size_bytes, policy));
    PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
    // A null buffer means TraceBuffer::Create() failed. Its id is already in
    // buffers_index, so the cleanup loop below releases it as well.
    std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
    if (!trace_buffer) {
      did_allocate_all_buffers = false;
      break;
    }
  }

  UpdateMemoryGuardrail();

  // This can happen if either:
  // - All the kMaxTraceBufferID slots are taken.
  // - OOM, or, more realistically, we exhausted virtual memory.
  // In any case, free all the previously allocated buffers and abort.
  // TODO(fmayer): add a test to cover this case, this is quite subtle.
  if (!did_allocate_all_buffers) {
    for (BufferID global_id : tracing_session->buffers_index) {
      buffer_ids_.Free(global_id);
      buffers_.erase(global_id);
    }
    tracing_sessions_.erase(tsid);
    return false;
  }

  // Only now is the consumer linked to the new session.
  consumer->tracing_session_id_ = tsid;

  // Setup the data sources on the producers without starting them.
  for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
    // Scan all the registered data sources with a matching name.
    auto range = data_sources_.equal_range(cfg_data_source.config().name());
    for (auto it = range.first; it != range.second; it++) {
      // Use the per-producer config whose producer_name matches this data
      // source's producer, or a default-constructed one if none matches.
      TraceConfig::ProducerConfig producer_config;
      for (auto& config : cfg.producers()) {
        if (GetProducer(it->second.producer_id)->name_ ==
            config.producer_name()) {
          producer_config = config;
          break;
        }
      }
      SetupDataSource(cfg_data_source, producer_config, it->second,
                      tracing_session);
    }
  }

  bool has_start_trigger = false;
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  switch (cfg.trigger_config().trigger_mode()) {
    case TraceConfig::TriggerConfig::UNSPECIFIED:
      // no triggers are specified so this isn't a trace that is using triggers.
      PERFETTO_DCHECK(!has_trigger_config);
      break;
    case TraceConfig::TriggerConfig::START_TRACING:
      // For traces which use START_TRACE triggers we need to ensure that the
      // tracing session will be cleaned up when it times out.
      has_start_trigger = true;
      task_runner_->PostDelayedTask(
          [weak_this, tsid]() {
            if (weak_this)
              weak_this->OnStartTriggersTimeout(tsid);
          },
          cfg.trigger_config().trigger_timeout_ms());
      break;
    case TraceConfig::TriggerConfig::STOP_TRACING:
      // Update the tracing_session's duration_ms to ensure that if no trigger
      // is received the session will end and be cleaned up equal to the
      // timeout.
      //
      // TODO(nuskos): Refactor this so that rather then modifying the config we
      // have a field we look at on the tracing_session.
      tracing_session->config.set_duration_ms(
          cfg.trigger_config().trigger_timeout_ms());
      break;
  }

  tracing_session->state = TracingSession::CONFIGURED;
  PERFETTO_LOG(
      "Configured tracing session %" PRIu64
      ", #sources:%zu, duration:%d ms, #buffers:%d, total "
      "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
      tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
      cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
      static_cast<unsigned int>(consumer->uid_),
      cfg.unique_session_name().c_str());

  // Start the data sources, unless this is a case of early setup + fast
  // triggering, either through TraceConfig.deferred_start or
  // TraceConfig.trigger_config(). If both are specified which ever one occurs
  // first will initiate the trace.
  if (!cfg.deferred_start() && !has_start_trigger)
    return StartTracing(tsid);

  return true;
}
769
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)770 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
771 const TraceConfig& updated_cfg) {
772 PERFETTO_DCHECK_THREAD(thread_checker_);
773 TracingSession* tracing_session =
774 GetTracingSession(consumer->tracing_session_id_);
775 PERFETTO_DCHECK(tracing_session);
776
777 if ((tracing_session->state != TracingSession::STARTED) &&
778 (tracing_session->state != TracingSession::CONFIGURED)) {
779 PERFETTO_ELOG(
780 "ChangeTraceConfig() was called for a tracing session which isn't "
781 "running.");
782 return;
783 }
784
785 // We only support updating producer_name_{,regex}_filter (and pass-through
786 // configs) for now; null out any changeable fields and make sure the rest are
787 // identical.
788 TraceConfig new_config_copy(updated_cfg);
789 for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
790 ds_cfg.clear_producer_name_filter();
791 ds_cfg.clear_producer_name_regex_filter();
792 }
793
794 TraceConfig current_config_copy(tracing_session->config);
795 for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
796 ds_cfg.clear_producer_name_filter();
797 ds_cfg.clear_producer_name_regex_filter();
798 }
799
800 if (new_config_copy != current_config_copy) {
801 PERFETTO_LOG(
802 "ChangeTraceConfig() was called with a config containing unsupported "
803 "changes; only adding to the producer_name_{,regex}_filter is "
804 "currently supported and will have an effect.");
805 }
806
807 for (TraceConfig::DataSource& cfg_data_source :
808 *tracing_session->config.mutable_data_sources()) {
809 // Find the updated producer_filter in the new config.
810 std::vector<std::string> new_producer_name_filter;
811 std::vector<std::string> new_producer_name_regex_filter;
812 bool found_data_source = false;
813 for (auto it : updated_cfg.data_sources()) {
814 if (cfg_data_source.config().name() == it.config().name()) {
815 new_producer_name_filter = it.producer_name_filter();
816 new_producer_name_regex_filter = it.producer_name_regex_filter();
817 found_data_source = true;
818 break;
819 }
820 }
821
822 // Bail out if data source not present in the new config.
823 if (!found_data_source) {
824 PERFETTO_ELOG(
825 "ChangeTraceConfig() called without a current data source also "
826 "present in the new config: %s",
827 cfg_data_source.config().name().c_str());
828 continue;
829 }
830
831 // TODO(oysteine): Just replacing the filter means that if
832 // there are any filter entries which were present in the original config,
833 // but removed from the config passed to ChangeTraceConfig, any matching
834 // producers will keep producing but newly added producers after this
835 // point will never start.
836 *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
837 *cfg_data_source.mutable_producer_name_regex_filter() =
838 new_producer_name_regex_filter;
839
840 // Scan all the registered data sources with a matching name.
841 auto range = data_sources_.equal_range(cfg_data_source.config().name());
842 for (auto it = range.first; it != range.second; it++) {
843 ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
844 PERFETTO_DCHECK(producer);
845
846 // Check if the producer name of this data source is present
847 // in the name filters. We currently only support new filters, not
848 // removing old ones.
849 if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
850 new_producer_name_regex_filter)) {
851 continue;
852 }
853
854 bool already_setup = false;
855 auto& ds_instances = tracing_session->data_source_instances;
856 for (auto instance_it = ds_instances.begin();
857 instance_it != ds_instances.end(); ++instance_it) {
858 if (instance_it->first == it->second.producer_id &&
859 instance_it->second.data_source_name ==
860 cfg_data_source.config().name()) {
861 already_setup = true;
862 break;
863 }
864 }
865
866 if (already_setup)
867 continue;
868
869 // If it wasn't previously setup, set it up now.
870 // (The per-producer config is optional).
871 TraceConfig::ProducerConfig producer_config;
872 for (auto& config : tracing_session->config.producers()) {
873 if (producer->name_ == config.producer_name()) {
874 producer_config = config;
875 break;
876 }
877 }
878
879 DataSourceInstance* ds_inst = SetupDataSource(
880 cfg_data_source, producer_config, it->second, tracing_session);
881
882 if (ds_inst && tracing_session->state == TracingSession::STARTED)
883 StartDataSourceInstance(producer, tracing_session, ds_inst);
884 }
885 }
886 }
887
StartTracing(TracingSessionID tsid)888 bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
889 PERFETTO_DCHECK_THREAD(thread_checker_);
890 TracingSession* tracing_session = GetTracingSession(tsid);
891 if (!tracing_session) {
892 PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
893 return false;
894 }
895
896 if (tracing_session->state != TracingSession::CONFIGURED) {
897 PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
898 tracing_session->state);
899 return false;
900 }
901
902 tracing_session->state = TracingSession::STARTED;
903
904 // Periodically snapshot clocks, stats, sync markers while the trace is
905 // active. The snapshots are emitted on the future ReadBuffers() calls, which
906 // means that:
907 // (a) If we're streaming to a file (or to a consumer) while tracing, we
908 // write snapshots periodically into the trace.
909 // (b) If ReadBuffers() is only called after tracing ends, we emit the latest
910 // snapshot into the trace. For clock snapshots, we keep track of a
911 // snapshot recorded at the beginning of the session as well as the most
912 // recent snapshot that showed significant new drift between different
913 // clocks. This way, we emit useful snapshots for both stop-when-full and
914 // ring-buffer tracing modes.
915 PeriodicSnapshotTask(tracing_session, /*is_initial_snapshot=*/true);
916
917 // Trigger delayed task if the trace is time limited.
918 const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
919 if (trace_duration_ms > 0) {
920 auto weak_this = weak_ptr_factory_.GetWeakPtr();
921 task_runner_->PostDelayedTask(
922 [weak_this, tsid] {
923 // Skip entirely the flush if the trace session doesn't exist anymore.
924 // This is to prevent misleading error messages to be logged.
925 if (!weak_this)
926 return;
927 auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
928 if (!tracing_session_ptr)
929 return;
930 // If this trace was using STOP_TRACING triggers and we've seen
931 // one, then the trigger overrides the normal timeout. In this
932 // case we just return and let the other task clean up this trace.
933 if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
934 TraceConfig::TriggerConfig::STOP_TRACING &&
935 !tracing_session_ptr->received_triggers.empty())
936 return;
937 // In all other cases (START_TRACING or no triggers) we flush
938 // after |trace_duration_ms| unconditionally.
939 weak_this->FlushAndDisableTracing(tsid);
940 },
941 trace_duration_ms);
942 }
943
944 // Start the periodic drain tasks if we should to save the trace into a file.
945 if (tracing_session->config.write_into_file()) {
946 auto weak_this = weak_ptr_factory_.GetWeakPtr();
947 task_runner_->PostDelayedTask(
948 [weak_this, tsid] {
949 if (weak_this)
950 weak_this->ReadBuffers(tsid, nullptr);
951 },
952 tracing_session->delay_to_next_write_period_ms());
953 }
954
955 // Start the periodic flush tasks if the config specified a flush period.
956 if (tracing_session->config.flush_period_ms())
957 PeriodicFlushTask(tsid, /*post_next_only=*/true);
958
959 // Start the periodic incremental state clear tasks if the config specified a
960 // period.
961 if (tracing_session->config.incremental_state_config().clear_period_ms()) {
962 PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
963 }
964
965 for (auto& kv : tracing_session->data_source_instances) {
966 ProducerID producer_id = kv.first;
967 DataSourceInstance& data_source = kv.second;
968 ProducerEndpointImpl* producer = GetProducer(producer_id);
969 if (!producer) {
970 PERFETTO_DFATAL("Producer does not exist.");
971 continue;
972 }
973 StartDataSourceInstance(producer, tracing_session, &data_source);
974 }
975 return true;
976 }
977
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)978 void TracingServiceImpl::StartDataSourceInstance(
979 ProducerEndpointImpl* producer,
980 TracingSession* tracing_session,
981 TracingServiceImpl::DataSourceInstance* instance) {
982 PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
983 if (instance->will_notify_on_start) {
984 instance->state = DataSourceInstance::STARTING;
985 } else {
986 instance->state = DataSourceInstance::STARTED;
987 }
988 if (tracing_session->consumer_maybe_null) {
989 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
990 *producer, *instance);
991 }
992 producer->StartDataSource(instance->instance_id, instance->config);
993
994 // If all data sources are started, notify the consumer.
995 if (instance->state == DataSourceInstance::STARTED)
996 MaybeNotifyAllDataSourcesStarted(tracing_session);
997 }
998
999 // DisableTracing just stops the data sources but doesn't free up any buffer.
1000 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1001 // and then drain the buffers. The actual teardown of the TracingSession happens
1002 // in FreeBuffers().
DisableTracing(TracingSessionID tsid,bool disable_immediately)1003 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
1004 bool disable_immediately) {
1005 PERFETTO_DCHECK_THREAD(thread_checker_);
1006 TracingSession* tracing_session = GetTracingSession(tsid);
1007 if (!tracing_session) {
1008 // Can happen if the consumer calls this before EnableTracing() or after
1009 // FreeBuffers().
1010 PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
1011 return;
1012 }
1013
1014 switch (tracing_session->state) {
1015 // Spurious call to DisableTracing() while already disabled, nothing to do.
1016 case TracingSession::DISABLED:
1017 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1018 return;
1019
1020 // This is either:
1021 // A) The case of a graceful DisableTracing() call followed by a call to
1022 // FreeBuffers(), iff |disable_immediately| == true. In this case we want
1023 // to forcefully transition in the disabled state without waiting for the
1024 // outstanding acks because the buffers are going to be destroyed soon.
1025 // B) A spurious call, iff |disable_immediately| == false, in which case
1026 // there is nothing to do.
1027 case TracingSession::DISABLING_WAITING_STOP_ACKS:
1028 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1029 if (disable_immediately)
1030 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1031 return;
1032
1033 // Continues below.
1034 case TracingSession::CONFIGURED:
1035 // If the session didn't even start there is no need to orchestrate a
1036 // graceful stop of data sources.
1037 disable_immediately = true;
1038 break;
1039
1040 // This is the nominal case, continues below.
1041 case TracingSession::STARTED:
1042 break;
1043 }
1044
1045 for (auto& data_source_inst : tracing_session->data_source_instances) {
1046 const ProducerID producer_id = data_source_inst.first;
1047 DataSourceInstance& instance = data_source_inst.second;
1048 ProducerEndpointImpl* producer = GetProducer(producer_id);
1049 PERFETTO_DCHECK(producer);
1050 PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
1051 instance.state == DataSourceInstance::STARTING ||
1052 instance.state == DataSourceInstance::STARTED);
1053 StopDataSourceInstance(producer, tracing_session, &instance,
1054 disable_immediately);
1055 }
1056
1057 // Either this request is flagged with |disable_immediately| or there are no
1058 // data sources that are requesting a final handshake. In both cases just mark
1059 // the session as disabled immediately, notify the consumer and flush the
1060 // trace file (if used).
1061 if (tracing_session->AllDataSourceInstancesStopped())
1062 return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1063
1064 tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
1065 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1066 task_runner_->PostDelayedTask(
1067 [weak_this, tsid] {
1068 if (weak_this)
1069 weak_this->OnDisableTracingTimeout(tsid);
1070 },
1071 tracing_session->data_source_stop_timeout_ms());
1072
1073 // Deliberately NOT removing the session from |tracing_session_|, it's still
1074 // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
1075 }
1076
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1077 void TracingServiceImpl::NotifyDataSourceStarted(
1078 ProducerID producer_id,
1079 DataSourceInstanceID instance_id) {
1080 PERFETTO_DCHECK_THREAD(thread_checker_);
1081 for (auto& kv : tracing_sessions_) {
1082 TracingSession& tracing_session = kv.second;
1083 DataSourceInstance* instance =
1084 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1085
1086 if (!instance)
1087 continue;
1088
1089 // If the tracing session was already stopped, ignore this notification.
1090 if (tracing_session.state != TracingSession::STARTED)
1091 continue;
1092
1093 if (instance->state != DataSourceInstance::STARTING) {
1094 PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1095 instance->state);
1096 continue;
1097 }
1098
1099 instance->state = DataSourceInstance::STARTED;
1100
1101 ProducerEndpointImpl* producer = GetProducer(producer_id);
1102 PERFETTO_DCHECK(producer);
1103 if (tracing_session.consumer_maybe_null) {
1104 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1105 *producer, *instance);
1106 }
1107
1108 // If all data sources are started, notify the consumer.
1109 MaybeNotifyAllDataSourcesStarted(&tracing_session);
1110 } // for (tracing_session)
1111 }
1112
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1113 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1114 TracingSession* tracing_session) {
1115 if (!tracing_session->consumer_maybe_null)
1116 return;
1117
1118 if (!tracing_session->AllDataSourceInstancesStarted())
1119 return;
1120
1121 // In some rare cases, we can get in this state more than once. Consider the
1122 // following scenario: 3 data sources are registered -> trace starts ->
1123 // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1124 // Imagine now that a 4th data source registers while the trace is ongoing.
1125 // This would hit the AllDataSourceInstancesStarted() condition again.
1126 // In this case, however, we don't want to re-notify the consumer again.
1127 // That would be unexpected (even if, perhaps, technically correct) and
1128 // trigger bugs in the consumer.
1129 if (tracing_session->did_notify_all_data_source_started)
1130 return;
1131
1132 PERFETTO_DLOG("All data sources started");
1133 tracing_session->did_notify_all_data_source_started = true;
1134 tracing_session->time_all_data_source_started = base::GetBootTimeNs();
1135 tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1136 }
1137
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1138 void TracingServiceImpl::NotifyDataSourceStopped(
1139 ProducerID producer_id,
1140 DataSourceInstanceID instance_id) {
1141 PERFETTO_DCHECK_THREAD(thread_checker_);
1142 for (auto& kv : tracing_sessions_) {
1143 TracingSession& tracing_session = kv.second;
1144 DataSourceInstance* instance =
1145 tracing_session.GetDataSourceInstance(producer_id, instance_id);
1146
1147 if (!instance)
1148 continue;
1149
1150 if (instance->state != DataSourceInstance::STOPPING) {
1151 PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1152 instance->state);
1153 continue;
1154 }
1155
1156 instance->state = DataSourceInstance::STOPPED;
1157
1158 ProducerEndpointImpl* producer = GetProducer(producer_id);
1159 PERFETTO_DCHECK(producer);
1160 if (tracing_session.consumer_maybe_null) {
1161 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1162 *producer, *instance);
1163 }
1164
1165 if (!tracing_session.AllDataSourceInstancesStopped())
1166 continue;
1167
1168 if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1169 continue;
1170
1171 // All data sources acked the termination.
1172 DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1173 } // for (tracing_session)
1174 }
1175
ActivateTriggers(ProducerID producer_id,const std::vector<std::string> & triggers)1176 void TracingServiceImpl::ActivateTriggers(
1177 ProducerID producer_id,
1178 const std::vector<std::string>& triggers) {
1179 PERFETTO_DCHECK_THREAD(thread_checker_);
1180 auto* producer = GetProducer(producer_id);
1181 PERFETTO_DCHECK(producer);
1182 for (const auto& trigger_name : triggers) {
1183 for (auto& id_and_tracing_session : tracing_sessions_) {
1184 auto& tracing_session = id_and_tracing_session.second;
1185 TracingSessionID tsid = id_and_tracing_session.first;
1186 auto iter = std::find_if(
1187 tracing_session.config.trigger_config().triggers().begin(),
1188 tracing_session.config.trigger_config().triggers().end(),
1189 [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
1190 return trigger.name() == trigger_name;
1191 });
1192 if (iter == tracing_session.config.trigger_config().triggers().end()) {
1193 continue;
1194 }
1195
1196 // If this trigger requires a certain producer to have sent it
1197 // (non-empty producer_name()) ensure the producer who sent this trigger
1198 // matches.
1199 if (!iter->producer_name_regex().empty() &&
1200 !std::regex_match(
1201 producer->name_,
1202 std::regex(iter->producer_name_regex(), std::regex::extended))) {
1203 continue;
1204 }
1205
1206 const bool triggers_already_received =
1207 !tracing_session.received_triggers.empty();
1208 tracing_session.received_triggers.push_back(
1209 {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
1210 producer->name_, producer->uid_});
1211 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1212 switch (tracing_session.config.trigger_config().trigger_mode()) {
1213 case TraceConfig::TriggerConfig::START_TRACING:
1214 // If the session has already been triggered and moved past
1215 // CONFIGURED then we don't need to repeat StartTracing. This would
1216 // work fine (StartTracing would return false) but would add error
1217 // logs.
1218 if (tracing_session.state != TracingSession::CONFIGURED)
1219 break;
1220
1221 PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
1222 " with duration of %" PRIu32 "ms.",
1223 iter->name().c_str(), tsid, iter->stop_delay_ms());
1224 // We override the trace duration to be the trigger's requested
1225 // value, this ensures that the trace will end after this amount
1226 // of time has passed.
1227 tracing_session.config.set_duration_ms(iter->stop_delay_ms());
1228 StartTracing(tsid);
1229 break;
1230 case TraceConfig::TriggerConfig::STOP_TRACING:
1231 // Only stop the trace once to avoid confusing log messages. I.E.
1232 // when we've already hit the first trigger we've already Posted the
1233 // task to FlushAndDisable. So all future triggers will just break
1234 // out.
1235 if (triggers_already_received)
1236 break;
1237
1238 PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
1239 " with duration of %" PRIu32 "ms.",
1240 iter->name().c_str(), tsid, iter->stop_delay_ms());
1241 // Now that we've seen a trigger we need to stop, flush, and disable
1242 // this session after the configured |stop_delay_ms|.
1243 task_runner_->PostDelayedTask(
1244 [weak_this, tsid] {
1245 // Skip entirely the flush if the trace session doesn't exist
1246 // anymore. This is to prevent misleading error messages to be
1247 // logged.
1248 if (weak_this && weak_this->GetTracingSession(tsid))
1249 weak_this->FlushAndDisableTracing(tsid);
1250 },
1251 // If this trigger is zero this will immediately executable and
1252 // will happen shortly.
1253 iter->stop_delay_ms());
1254 break;
1255 case TraceConfig::TriggerConfig::UNSPECIFIED:
1256 PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
1257 break;
1258 }
1259 }
1260 }
1261 }
1262
1263 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
1264 // conditions all data sources should have acked the stop and this will early
1265 // out.
OnDisableTracingTimeout(TracingSessionID tsid)1266 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1267 PERFETTO_DCHECK_THREAD(thread_checker_);
1268 TracingSession* tracing_session = GetTracingSession(tsid);
1269 if (!tracing_session ||
1270 tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1271 return; // Tracing session was successfully disabled.
1272 }
1273
1274 PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1275 tsid);
1276 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1277 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1278 }
1279
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1280 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1281 TracingSession* tracing_session) {
1282 PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1283 for (auto& inst_kv : tracing_session->data_source_instances) {
1284 if (inst_kv.second.state == DataSourceInstance::STOPPED)
1285 continue;
1286 inst_kv.second.state = DataSourceInstance::STOPPED;
1287 ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1288 PERFETTO_DCHECK(producer);
1289 if (tracing_session->consumer_maybe_null) {
1290 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1291 *producer, inst_kv.second);
1292 }
1293 }
1294 tracing_session->state = TracingSession::DISABLED;
1295
1296 // Scrape any remaining chunks that weren't flushed by the producers.
1297 for (auto& producer_id_and_producer : producers_)
1298 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1299
1300 if (tracing_session->write_into_file) {
1301 tracing_session->write_period_ms = 0;
1302 ReadBuffers(tracing_session->id, nullptr);
1303 }
1304
1305 if (tracing_session->consumer_maybe_null)
1306 tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
1307 }
1308
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback)1309 void TracingServiceImpl::Flush(TracingSessionID tsid,
1310 uint32_t timeout_ms,
1311 ConsumerEndpoint::FlushCallback callback) {
1312 PERFETTO_DCHECK_THREAD(thread_checker_);
1313 TracingSession* tracing_session = GetTracingSession(tsid);
1314 if (!tracing_session) {
1315 PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1316 return;
1317 }
1318
1319 if (!timeout_ms)
1320 timeout_ms = tracing_session->flush_timeout_ms();
1321
1322 if (tracing_session->pending_flushes.size() > 1000) {
1323 PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1324 tracing_session->pending_flushes.size());
1325 callback(false);
1326 return;
1327 }
1328
1329 FlushRequestID flush_request_id = ++last_flush_request_id_;
1330 PendingFlush& pending_flush =
1331 tracing_session->pending_flushes
1332 .emplace_hint(tracing_session->pending_flushes.end(),
1333 flush_request_id, PendingFlush(std::move(callback)))
1334 ->second;
1335
1336 // Send a flush request to each producer involved in the tracing session. In
1337 // order to issue a flush request we have to build a map of all data source
1338 // instance ids enabled for each producer.
1339 std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1340 for (const auto& data_source_inst : tracing_session->data_source_instances) {
1341 const ProducerID producer_id = data_source_inst.first;
1342 const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1343 flush_map[producer_id].push_back(ds_inst_id);
1344 }
1345
1346 for (const auto& kv : flush_map) {
1347 ProducerID producer_id = kv.first;
1348 ProducerEndpointImpl* producer = GetProducer(producer_id);
1349 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1350 producer->Flush(flush_request_id, data_sources);
1351 pending_flush.producers.insert(producer_id);
1352 }
1353
1354 // If there are no producers to flush (realistically this happens only in
1355 // some tests) fire OnFlushTimeout() straight away, without waiting.
1356 if (flush_map.empty())
1357 timeout_ms = 0;
1358
1359 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1360 task_runner_->PostDelayedTask(
1361 [weak_this, tsid, flush_request_id] {
1362 if (weak_this)
1363 weak_this->OnFlushTimeout(tsid, flush_request_id);
1364 },
1365 timeout_ms);
1366 }
1367
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1368 void TracingServiceImpl::NotifyFlushDoneForProducer(
1369 ProducerID producer_id,
1370 FlushRequestID flush_request_id) {
1371 for (auto& kv : tracing_sessions_) {
1372 // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1373 auto& pending_flushes = kv.second.pending_flushes;
1374 auto end_it = pending_flushes.upper_bound(flush_request_id);
1375 for (auto it = pending_flushes.begin(); it != end_it;) {
1376 PendingFlush& pending_flush = it->second;
1377 pending_flush.producers.erase(producer_id);
1378 if (pending_flush.producers.empty()) {
1379 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1380 TracingSessionID tsid = kv.first;
1381 auto callback = std::move(pending_flush.callback);
1382 task_runner_->PostTask([weak_this, tsid, callback]() {
1383 if (weak_this) {
1384 weak_this->CompleteFlush(tsid, std::move(callback),
1385 /*success=*/true);
1386 }
1387 });
1388 it = pending_flushes.erase(it);
1389 } else {
1390 it++;
1391 }
1392 } // for (pending_flushes)
1393 } // for (tracing_session)
1394 }
1395
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1396 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1397 FlushRequestID flush_request_id) {
1398 TracingSession* tracing_session = GetTracingSession(tsid);
1399 if (!tracing_session)
1400 return;
1401 auto it = tracing_session->pending_flushes.find(flush_request_id);
1402 if (it == tracing_session->pending_flushes.end())
1403 return; // Nominal case: flush was completed and acked on time.
1404
1405 // If there were no producers to flush, consider it a success.
1406 bool success = it->second.producers.empty();
1407
1408 auto callback = std::move(it->second.callback);
1409 tracing_session->pending_flushes.erase(it);
1410 CompleteFlush(tsid, std::move(callback), success);
1411 }
1412
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1413 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1414 ConsumerEndpoint::FlushCallback callback,
1415 bool success) {
1416 TracingSession* tracing_session = GetTracingSession(tsid);
1417 if (tracing_session) {
1418 // Producers may not have been able to flush all their data, even if they
1419 // indicated flush completion. If possible, also collect uncommitted chunks
1420 // to make sure we have everything they wrote so far.
1421 for (auto& producer_id_and_producer : producers_) {
1422 ScrapeSharedMemoryBuffers(tracing_session,
1423 producer_id_and_producer.second);
1424 }
1425 }
1426 callback(success);
1427 }
1428
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)1429 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1430 TracingSession* tracing_session,
1431 ProducerEndpointImpl* producer) {
1432 if (!producer->smb_scraping_enabled_)
1433 return;
1434
1435 // Can't copy chunks if we don't know about any trace writers.
1436 if (producer->writers_.empty())
1437 return;
1438
1439 // Performance optimization: On flush or session disconnect, this method is
1440 // called for each producer. If the producer doesn't participate in the
1441 // session, there's no need to scape its chunks right now. We can tell if a
1442 // producer participates in the session by checking if the producer is allowed
1443 // to write into the session's log buffers.
1444 const auto& session_buffers = tracing_session->buffers_index;
1445 bool producer_in_session =
1446 std::any_of(session_buffers.begin(), session_buffers.end(),
1447 [producer](BufferID buffer_id) {
1448 return producer->allowed_target_buffers_.count(buffer_id);
1449 });
1450 if (!producer_in_session)
1451 return;
1452
1453 PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1454
1455 // Find and copy any uncommitted chunks from the SMB.
1456 //
1457 // In nominal conditions, the page layout of the used SMB pages should never
1458 // change because the service is the only one who is supposed to modify used
1459 // pages (to make them free again).
1460 //
1461 // However, the code here needs to deal with the case of a malicious producer
1462 // altering the SMB in unpredictable ways. Thankfully the SMB size is
1463 // immutable, so a chunk will always point to some valid memory, even if the
1464 // producer alters the intended layout and chunk header concurrently.
1465 // Ultimately a malicious producer altering the SMB's chunk layout while we
1466 // are iterating in this function is not any different from the case of a
1467 // malicious producer asking to commit a chunk made of random data, which is
1468 // something this class has to deal with regardless.
1469 //
1470 // The only legitimate mutations that can happen from sane producers,
1471 // concurrently to this function, are:
1472 // A. free pages being partitioned,
1473 // B. free chunks being migrated to kChunkBeingWritten,
1474 // C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
1475
1476 SharedMemoryABI* abi = &producer->shmem_abi_;
1477 // num_pages() is immutable after the SMB is initialized and cannot be changed
1478 // even by a producer even if malicious.
1479 for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1480 uint32_t layout = abi->GetPageLayout(page_idx);
1481
1482 uint32_t used_chunks = abi->GetUsedChunks(layout); // Returns a bitmap.
1483 // Skip empty pages.
1484 if (used_chunks == 0)
1485 continue;
1486
1487 // Scrape the chunks that are currently used. These should be either in
1488 // state kChunkBeingWritten or kChunkComplete.
1489 for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1490 if (!(used_chunks & 1))
1491 continue;
1492
1493 SharedMemoryABI::ChunkState state =
1494 SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1495 PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1496 state == SharedMemoryABI::kChunkComplete);
1497 bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1498
1499 SharedMemoryABI::Chunk chunk =
1500 abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1501
1502 uint16_t packet_count;
1503 uint8_t flags;
1504 // GetPacketCountAndFlags has acquire_load semantics.
1505 std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1506
1507 // It only makes sense to copy an incomplete chunk if there's at least
1508 // one full packet available. (The producer may not have completed the
1509 // last packet in it yet, so we need at least 2.)
1510 if (!chunk_complete && packet_count < 2)
1511 continue;
1512
1513 // At this point, it is safe to access the remaining header fields of
1514 // the chunk. Even if the chunk was only just transferred from
1515 // kChunkFree into kChunkBeingWritten state, the header should be
1516 // written completely once the packet count increased above 1 (it was
1517 // reset to 0 by the service when the chunk was freed).
1518
1519 WriterID writer_id = chunk.writer_id();
1520 base::Optional<BufferID> target_buffer_id =
1521 producer->buffer_id_for_writer(writer_id);
1522
1523 // We can only scrape this chunk if we know which log buffer to copy it
1524 // into.
1525 if (!target_buffer_id)
1526 continue;
1527
1528 // Skip chunks that don't belong to the requested tracing session.
1529 bool target_buffer_belongs_to_session =
1530 std::find(session_buffers.begin(), session_buffers.end(),
1531 *target_buffer_id) != session_buffers.end();
1532 if (!target_buffer_belongs_to_session)
1533 continue;
1534
1535 uint32_t chunk_id =
1536 chunk.header()->chunk_id.load(std::memory_order_relaxed);
1537
1538 CopyProducerPageIntoLogBuffer(
1539 producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
1540 packet_count, flags, chunk_complete, chunk.payload_begin(),
1541 chunk.payload_size());
1542 }
1543 }
1544 }
1545
FlushAndDisableTracing(TracingSessionID tsid)1546 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1547 PERFETTO_DCHECK_THREAD(thread_checker_);
1548 PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1549 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1550 Flush(tsid, 0, [weak_this, tsid](bool success) {
1551 PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
1552 success, tsid);
1553 if (!weak_this)
1554 return;
1555 TracingSession* session = weak_this->GetTracingSession(tsid);
1556 if (session->consumer_maybe_null) {
1557 // If the consumer is still attached, just disable the session but give it
1558 // a chance to read the contents.
1559 weak_this->DisableTracing(tsid);
1560 } else {
1561 // If the consumer detached, destroy the session. If the consumer did
1562 // start the session in long-tracing mode, the service will have saved
1563 // the contents to the passed file. If not, the contents will be
1564 // destroyed.
1565 weak_this->FreeBuffers(tsid);
1566 }
1567 });
1568 }
1569
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)1570 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1571 bool post_next_only) {
1572 PERFETTO_DCHECK_THREAD(thread_checker_);
1573 TracingSession* tracing_session = GetTracingSession(tsid);
1574 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1575 return;
1576
1577 uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1578 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1579 task_runner_->PostDelayedTask(
1580 [weak_this, tsid] {
1581 if (weak_this)
1582 weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1583 },
1584 flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
1585
1586 if (post_next_only)
1587 return;
1588
1589 PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1590 Flush(tsid, 0, [](bool success) {
1591 if (!success)
1592 PERFETTO_ELOG("Periodic flush timed out");
1593 });
1594 }
1595
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)1596 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1597 TracingSessionID tsid,
1598 bool post_next_only) {
1599 PERFETTO_DCHECK_THREAD(thread_checker_);
1600 TracingSession* tracing_session = GetTracingSession(tsid);
1601 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1602 return;
1603
1604 uint32_t clear_period_ms =
1605 tracing_session->config.incremental_state_config().clear_period_ms();
1606 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1607 task_runner_->PostDelayedTask(
1608 [weak_this, tsid] {
1609 if (weak_this)
1610 weak_this->PeriodicClearIncrementalStateTask(
1611 tsid, /*post_next_only=*/false);
1612 },
1613 clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));
1614
1615 if (post_next_only)
1616 return;
1617
1618 PERFETTO_DLOG(
1619 "Performing periodic incremental state clear for trace session %" PRIu64,
1620 tsid);
1621
1622 // Queue the IPCs to producers with active data sources that opted in.
1623 std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1624 for (const auto& kv : tracing_session->data_source_instances) {
1625 ProducerID producer_id = kv.first;
1626 const DataSourceInstance& data_source = kv.second;
1627 if (data_source.handles_incremental_state_clear)
1628 clear_map[producer_id].push_back(data_source.instance_id);
1629 }
1630
1631 for (const auto& kv : clear_map) {
1632 ProducerID producer_id = kv.first;
1633 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1634 ProducerEndpointImpl* producer = GetProducer(producer_id);
1635 if (!producer) {
1636 PERFETTO_DFATAL("Producer does not exist.");
1637 continue;
1638 }
1639 producer->ClearIncrementalState(data_sources);
1640 }
1641 }
1642
// Reads packets out of the trace buffers of the session |tsid| and delivers
// them either to |consumer| (via OnTraceData()) or to the file descriptor held
// in the session's |write_into_file|.
//
// Note: when this is called to write into a file passed when starting tracing
// |consumer| will be == nullptr (as opposed to the case of a consumer asking
// to send the trace data back over IPC).
//
// Returns false, without delivering anything, when:
//  - the session does not exist;
//  - the config declares triggers but none has been received yet;
//  - |consumer| is null and the session has no |write_into_file|;
//  - |consumer| is non-null but the session is a write_into_file one.
// Returns true in all other cases.
bool TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
                                     ConsumerEndpointImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  TracingSession* tracing_session = GetTracingSession(tsid);
  if (!tracing_session) {
    // This will be hit systematically from the PostDelayedTask when directly
    // writing into the file (in which case consumer == nullptr). Suppress the
    // log in this case as it's just spam.
    if (consumer) {
      PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
    }
    return false;
  }

  // When a tracing session is waiting for a trigger it is considered empty. If
  // a tracing session finishes and moves into DISABLED without ever receiving a
  // trigger the trace should never return any data. This includes the synthetic
  // packets like TraceConfig and Clock snapshots. So we bail out early and let
  // the consumer know there is no data.
  if (!tracing_session->config.trigger_config().triggers().empty() &&
      tracing_session->received_triggers.empty()) {
    PERFETTO_DLOG(
        "ReadBuffers(): tracing session has not received a trigger yet.");
    return false;
  }

  // This can happen if the file is closed by a previous task because it reaches
  // |max_file_size_bytes|.
  if (!tracing_session->write_into_file && !consumer)
    return false;

  if (tracing_session->write_into_file && consumer) {
    // If the consumer enabled tracing and asked to save the contents into the
    // passed file makes little sense to also try to read the buffers over IPC,
    // as that would just steal data from the periodic draining task.
    PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
    return false;
  }

  // From here on exactly one of |consumer| and |write_into_file| is set.

  std::vector<TracePacket> packets;
  packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.

  // The snapshots are moved out of the session when emitted.
  // NOTE(review): this presumably leaves them empty so that they are not
  // emitted again by a later call - confirm against EmitClockSnapshot().
  if (!tracing_session->initial_clock_snapshot_.empty()) {
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->initial_clock_snapshot_),
                      /*set_root_timestamp=*/true, &packets);
  }

  if (!tracing_session->last_clock_snapshot_.empty()) {
    // We don't want to put a root timestamp in periodic clock snapshot packets
    // as they may be emitted very out of order with respect to the actual trace
    // packets, since consuming the trace may happen at any point after it
    // starts.
    EmitClockSnapshot(tracing_session,
                      std::move(tracing_session->last_clock_snapshot_),
                      /*set_root_timestamp=*/false, &packets);
  }

  if (tracing_session->should_emit_sync_marker) {
    SnapshotSyncMarker(&packets);
    tracing_session->should_emit_sync_marker = false;
  }

  // Service-generated packets; each group can be disabled from the trace
  // config's |builtin_data_sources| section.
  if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
    MaybeEmitTraceConfig(tracing_session, &packets);
    MaybeEmitReceivedTriggers(tracing_session, &packets);
  }
  if (!tracing_session->config.builtin_data_sources().disable_system_info())
    MaybeEmitSystemInfo(tracing_session, &packets);
  if (!tracing_session->config.builtin_data_sources().disable_service_events())
    MaybeEmitServiceEvents(tracing_session, &packets);

  size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
  size_t total_slices = 0;   // SUM(#slices in |packets|).

  // Add up size for packets added by the Maybe* calls above.
  for (const TracePacket& packet : packets) {
    packets_bytes += packet.size();
    total_slices += packet.slices().size();
  }

  // This is a rough threshold to determine how much to read from the buffer in
  // each task. This is to avoid executing a single huge sending task for too
  // long and risk to hit the watchdog. This is *not* an upper bound: we just
  // stop accumulating new packets and PostTask *after* we cross this threshold.
  // This constant essentially balances the PostTask and IPC overhead vs the
  // responsiveness of the service. An extremely small value will cause one IPC
  // and one PostTask for each slice but will keep the service extremely
  // responsive. An extremely large value will batch the send for the full
  // buffer in one large task, will hit the blocking send() once the socket
  // buffers are full and hang the service for a bit (until the consumer
  // catches up).
  static constexpr size_t kApproxBytesPerTask = 32768;
  bool did_hit_threshold = false;

  // TODO(primiano): Extend the ReadBuffers API to allow reading only some
  // buffers, not all of them in one go.
  for (size_t buf_idx = 0;
       buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
       buf_idx++) {
    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    if (tbuf_iter == buffers_.end()) {
      PERFETTO_DFATAL("Buffer not found.");
      continue;
    }
    TraceBuffer& tbuf = *tbuf_iter->second;
    tbuf.BeginRead();
    while (!did_hit_threshold) {
      TracePacket packet;
      TraceBuffer::PacketSequenceProperties sequence_properties{};
      bool previous_packet_dropped;
      if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
                                    &previous_packet_dropped)) {
        break;
      }
      PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
      PERFETTO_DCHECK(sequence_properties.writer_id != 0);
      PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
      PERFETTO_DCHECK(packet.size() > 0);
      // Packets that fail validation are counted and skipped; they are never
      // forwarded to the consumer or to the file.
      if (!PacketStreamValidator::Validate(packet.slices())) {
        tracing_session->invalid_packets++;
        PERFETTO_DLOG("Dropping invalid packet");
        continue;
      }

      // Append a slice with the trusted field data. This can't be spoofed
      // because above we validated that the existing slices don't contain any
      // trusted fields. For added safety we append instead of prepending
      // because according to protobuf semantics, if the same field is
      // encountered multiple times the last instance takes priority. Note that
      // truncated packets are also rejected, so the producer can't give us a
      // partial packet (e.g., a truncated string) which only becomes valid when
      // the trusted data is appended here.
      Slice slice = Slice::Allocate(32);
      protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
          slice.own_data(), slice.size);
      trusted_packet->set_trusted_uid(
          static_cast<int32_t>(sequence_properties.producer_uid_trusted));
      trusted_packet->set_trusted_packet_sequence_id(
          tracing_session->GetPacketSequenceID(
              sequence_properties.producer_id_trusted,
              sequence_properties.writer_id));
      if (previous_packet_dropped)
        trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
      // Shrink the slice to the number of bytes actually written.
      slice.size = trusted_packet.Finalize();
      packet.AddSlice(std::move(slice));

      // Append the packet (inclusive of the trusted uid) to |packets|.
      packets_bytes += packet.size();
      total_slices += packet.slices().size();
      // The threshold only applies to the IPC path: when writing into a file
      // the buffers are always drained completely in one go.
      did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
                          !tracing_session->write_into_file;
      packets.emplace_back(std::move(packet));
    }  // while(packets...)
  }    // for(buffers...)

  const bool has_more = did_hit_threshold;

  // Only emit the stats when no more trace data is available to read.
  // That way, any problems that occur while reading from the buffers are
  // reflected in the emitted stats. This is particularly important for use
  // cases where ReadBuffers is only ever called after the tracing session is
  // stopped.
  if (!has_more && tracing_session->should_emit_stats) {
    size_t prev_packets_size = packets.size();
    SnapshotStats(tracing_session, &packets);
    tracing_session->should_emit_stats = false;

    // Add sizes of packets emitted by SnapshotStats.
    for (size_t i = prev_packets_size; i < packets.size(); ++i) {
      packets_bytes += packets[i].size();
      total_slices += packets[i].slices().size();
    }
  }

  // If the caller asked us to write into a file by setting
  // |write_into_file| == true in the trace config, drain the packets read
  // (if any) into the given file descriptor.
  if (tracing_session->write_into_file) {
    // A |max_file_size_bytes| of 0 is treated as "no limit".
    const uint64_t max_size = tracing_session->max_file_size_bytes
                                  ? tracing_session->max_file_size_bytes
                                  : std::numeric_limits<size_t>::max();

    // When writing into a file, the file should look like a root trace.proto
    // message. Each packet should be prepended with a proto preamble stating
    // its field id (within trace.proto) and size. Hence the addition below.
    const size_t max_iovecs = total_slices + packets.size();

    size_t num_iovecs = 0;
    // A zero |write_period_ms| means this is the only write: the file is
    // closed at the end of this call.
    bool stop_writing_into_file = tracing_session->write_period_ms == 0;
    std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
    size_t num_iovecs_at_last_packet = 0;
    uint64_t bytes_about_to_be_written = 0;
    for (TracePacket& packet : packets) {
      // One iovec for the preamble, then one per slice of the packet.
      std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
          packet.GetProtoPreamble();
      bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
      num_iovecs++;
      for (const Slice& slice : packet.slices()) {
        // writev() doesn't change the passed pointer. However, struct iovec
        // take a non-const ptr because it's the same struct used by readv().
        // Hence the const_cast here.
        char* start = static_cast<char*>(const_cast<void*>(slice.start));
        bytes_about_to_be_written += slice.size;
        iovecs[num_iovecs++] = {start, slice.size};
      }

      // If this packet would reach the size limit, roll back to the end of the
      // previous packet (so that no partial packet is written) and stop.
      if (tracing_session->bytes_written_into_file +
              bytes_about_to_be_written >=
          max_size) {
        stop_writing_into_file = true;
        num_iovecs = num_iovecs_at_last_packet;
        break;
      }

      num_iovecs_at_last_packet = num_iovecs;
    }
    PERFETTO_DCHECK(num_iovecs <= max_iovecs);
    int fd = *tracing_session->write_into_file;

    uint64_t total_wr_size = 0;

    // writev() can take at most IOV_MAX entries per call. Batch them.
    // NOTE(review): a short write (wr_size smaller than the batch payload) is
    // accounted as-is and the remaining bytes of the batch are not retried -
    // confirm this is acceptable for the file descriptors used here.
    constexpr size_t kIOVMax = IOV_MAX;
    for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
      int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
      ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
      if (wr_size <= 0) {
        PERFETTO_PLOG("writev() failed");
        stop_writing_into_file = true;
        break;
      }
      total_wr_size += static_cast<size_t>(wr_size);
    }

    tracing_session->bytes_written_into_file += total_wr_size;

    PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
                  (total_wr_size + 1023) / 1024, stop_writing_into_file);
    if (stop_writing_into_file) {
      // Ensure all data was written to the file before we close it.
      base::FlushFile(fd);
      tracing_session->write_into_file.reset();
      tracing_session->write_period_ms = 0;
      if (tracing_session->state == TracingSession::STARTED)
        DisableTracing(tsid);
      return true;
    }

    // Schedule the next periodic drain. The task re-resolves the session by id
    // and bails out (see the top of this function) if it is gone by then.
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostDelayedTask(
        [weak_this, tsid] {
          if (weak_this)
            weak_this->ReadBuffers(tsid, nullptr);
        },
        tracing_session->delay_to_next_write_period_ms());
    return true;
  }  // if (tracing_session->write_into_file)

  // IPC path: |consumer| is non-null here (checked above). If the threshold
  // was hit, post a task to continue reading before handing over this batch.
  if (has_more) {
    auto weak_consumer = consumer->GetWeakPtr();
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, weak_consumer, tsid] {
      if (!weak_this || !weak_consumer)
        return;
      weak_this->ReadBuffers(tsid, weak_consumer.get());
    });
  }

  // Keep this as tail call, just in case the consumer re-enters.
  consumer->consumer_->OnTraceData(std::move(packets), has_more);
  return true;
}
1919
FreeBuffers(TracingSessionID tsid)1920 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
1921 PERFETTO_DCHECK_THREAD(thread_checker_);
1922 PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
1923 TracingSession* tracing_session = GetTracingSession(tsid);
1924 if (!tracing_session) {
1925 PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
1926 return; // TODO(primiano): signal failure?
1927 }
1928 DisableTracing(tsid, /*disable_immediately=*/true);
1929
1930 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1931 tracing_session->data_source_instances.clear();
1932
1933 for (auto& producer_entry : producers_) {
1934 ProducerEndpointImpl* producer = producer_entry.second;
1935 producer->OnFreeBuffers(tracing_session->buffers_index);
1936 }
1937
1938 for (BufferID buffer_id : tracing_session->buffers_index) {
1939 buffer_ids_.Free(buffer_id);
1940 PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
1941 buffers_.erase(buffer_id);
1942 }
1943 bool notify_traceur = tracing_session->config.notify_traceur();
1944 tracing_sessions_.erase(tsid);
1945 UpdateMemoryGuardrail();
1946
1947 PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
1948 tracing_sessions_.size());
1949
1950 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
1951 static const char kTraceurProp[] = "sys.trace.trace_end_signal";
1952 if (notify_traceur && __system_property_set(kTraceurProp, "1"))
1953 PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
1954 #else
1955 base::ignore_result(notify_traceur);
1956 #endif
1957 }
1958
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)1959 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
1960 const DataSourceDescriptor& desc) {
1961 PERFETTO_DCHECK_THREAD(thread_checker_);
1962 PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
1963 producer_id, desc.name().c_str());
1964
1965 PERFETTO_DCHECK(!desc.name().empty());
1966 auto reg_ds = data_sources_.emplace(desc.name(),
1967 RegisteredDataSource{producer_id, desc});
1968
1969 // If there are existing tracing sessions, we need to check if the new
1970 // data source is enabled by any of them.
1971 if (tracing_sessions_.empty())
1972 return;
1973
1974 ProducerEndpointImpl* producer = GetProducer(producer_id);
1975 if (!producer) {
1976 PERFETTO_DFATAL("Producer not found.");
1977 return;
1978 }
1979
1980 for (auto& iter : tracing_sessions_) {
1981 TracingSession& tracing_session = iter.second;
1982 if (tracing_session.state != TracingSession::STARTED &&
1983 tracing_session.state != TracingSession::CONFIGURED) {
1984 continue;
1985 }
1986
1987 TraceConfig::ProducerConfig producer_config;
1988 for (auto& config : tracing_session.config.producers()) {
1989 if (producer->name_ == config.producer_name()) {
1990 producer_config = config;
1991 break;
1992 }
1993 }
1994 for (const TraceConfig::DataSource& cfg_data_source :
1995 tracing_session.config.data_sources()) {
1996 if (cfg_data_source.config().name() != desc.name())
1997 continue;
1998 DataSourceInstance* ds_inst = SetupDataSource(
1999 cfg_data_source, producer_config, reg_ds->second, &tracing_session);
2000 if (ds_inst && tracing_session.state == TracingSession::STARTED)
2001 StartDataSourceInstance(producer, &tracing_session, ds_inst);
2002 }
2003 }
2004 }
2005
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2006 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2007 TracingSession* tracing_session,
2008 DataSourceInstance* instance,
2009 bool disable_immediately) {
2010 const DataSourceInstanceID ds_inst_id = instance->instance_id;
2011 if (instance->will_notify_on_stop && !disable_immediately) {
2012 instance->state = DataSourceInstance::STOPPING;
2013 } else {
2014 instance->state = DataSourceInstance::STOPPED;
2015 }
2016 if (tracing_session->consumer_maybe_null) {
2017 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2018 *producer, *instance);
2019 }
2020 producer->StopDataSource(ds_inst_id);
2021 }
2022
UnregisterDataSource(ProducerID producer_id,const std::string & name)2023 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
2024 const std::string& name) {
2025 PERFETTO_DCHECK_THREAD(thread_checker_);
2026 PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
2027 producer_id, name.c_str());
2028 PERFETTO_CHECK(producer_id);
2029 ProducerEndpointImpl* producer = GetProducer(producer_id);
2030 PERFETTO_DCHECK(producer);
2031 for (auto& kv : tracing_sessions_) {
2032 auto& ds_instances = kv.second.data_source_instances;
2033 bool removed = false;
2034 for (auto it = ds_instances.begin(); it != ds_instances.end();) {
2035 if (it->first == producer_id && it->second.data_source_name == name) {
2036 DataSourceInstanceID ds_inst_id = it->second.instance_id;
2037 if (it->second.state != DataSourceInstance::STOPPED) {
2038 if (it->second.state != DataSourceInstance::STOPPING)
2039 StopDataSourceInstance(producer, &kv.second, &it->second,
2040 /* disable_immediately = */ false);
2041 // Mark the instance as stopped immediately, since we are
2042 // unregistering it below.
2043 if (it->second.state == DataSourceInstance::STOPPING)
2044 NotifyDataSourceStopped(producer_id, ds_inst_id);
2045 }
2046 it = ds_instances.erase(it);
2047 removed = true;
2048 } else {
2049 ++it;
2050 }
2051 } // for (data_source_instances)
2052 if (removed)
2053 MaybeNotifyAllDataSourcesStarted(&kv.second);
2054 } // for (tracing_session)
2055
2056 for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
2057 if (it->second.producer_id == producer_id &&
2058 it->second.descriptor.name() == name) {
2059 data_sources_.erase(it);
2060 return;
2061 }
2062 }
2063
2064 PERFETTO_DFATAL(
2065 "Tried to unregister a non-existent data source \"%s\" for "
2066 "producer %" PRIu16,
2067 name.c_str(), producer_id);
2068 }
2069
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)2070 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
2071 const TraceConfig::DataSource& cfg_data_source,
2072 const TraceConfig::ProducerConfig& producer_config,
2073 const RegisteredDataSource& data_source,
2074 TracingSession* tracing_session) {
2075 PERFETTO_DCHECK_THREAD(thread_checker_);
2076 ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
2077 PERFETTO_DCHECK(producer);
2078 // An existing producer that is not ftrace could have registered itself as
2079 // ftrace, we must not enable it in that case.
2080 if (lockdown_mode_ && producer->uid_ != uid_) {
2081 PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
2082 return nullptr;
2083 }
2084 // TODO(primiano): Add tests for registration ordering (data sources vs
2085 // consumers).
2086 if (!NameMatchesFilter(producer->name_,
2087 cfg_data_source.producer_name_filter(),
2088 cfg_data_source.producer_name_regex_filter())) {
2089 PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
2090 cfg_data_source.config().name().c_str(),
2091 producer->name_.c_str());
2092 return nullptr;
2093 }
2094
2095 auto relative_buffer_id = cfg_data_source.config().target_buffer();
2096 if (relative_buffer_id >= tracing_session->num_buffers()) {
2097 PERFETTO_LOG(
2098 "The TraceConfig for DataSource %s specified a target_buffer out of "
2099 "bound (%d). Skipping it.",
2100 cfg_data_source.config().name().c_str(), relative_buffer_id);
2101 return nullptr;
2102 }
2103
2104 // Create a copy of the DataSourceConfig specified in the trace config. This
2105 // will be passed to the producer after translating the |target_buffer| id.
2106 // The |target_buffer| parameter passed by the consumer in the trace config is
2107 // relative to the buffers declared in the same trace config. This has to be
2108 // translated to the global BufferID before passing it to the producers, which
2109 // don't know anything about tracing sessions and consumers.
2110
2111 DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
2112 auto insert_iter = tracing_session->data_source_instances.emplace(
2113 std::piecewise_construct, //
2114 std::forward_as_tuple(producer->id_),
2115 std::forward_as_tuple(
2116 inst_id,
2117 cfg_data_source.config(), // Deliberate copy.
2118 data_source.descriptor.name(),
2119 data_source.descriptor.will_notify_on_start(),
2120 data_source.descriptor.will_notify_on_stop(),
2121 data_source.descriptor.handles_incremental_state_clear()));
2122 DataSourceInstance* ds_instance = &insert_iter->second;
2123
2124 // New data source instance starts out in CONFIGURED state.
2125 if (tracing_session->consumer_maybe_null) {
2126 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2127 *producer, *ds_instance);
2128 }
2129
2130 DataSourceConfig& ds_config = ds_instance->config;
2131 ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
2132 ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
2133 ds_config.set_enable_extra_guardrails(
2134 tracing_session->config.enable_extra_guardrails());
2135 ds_config.set_tracing_session_id(tracing_session->id);
2136 BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
2137 PERFETTO_DCHECK(global_id);
2138 ds_config.set_target_buffer(global_id);
2139
2140 PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
2141 ds_config.name().c_str(), global_id);
2142 if (!producer->shared_memory()) {
2143 // Determine the SMB page size. Must be an integer multiple of 4k.
2144 // As for the SMB size below, the decision tree is as follows:
2145 // 1. Give priority to what is defined in the trace config.
2146 // 2. If unset give priority to the hint passed by the producer.
2147 // 3. Keep within bounds and ensure it's a multiple of 4k.
2148 size_t page_size = producer_config.page_size_kb() * 1024;
2149 if (page_size == 0)
2150 page_size = producer->shmem_page_size_hint_bytes_;
2151
2152 // Determine the SMB size. Must be an integer multiple of the SMB page size.
2153 // The decision tree is as follows:
2154 // 1. Give priority to what defined in the trace config.
2155 // 2. If unset give priority to the hint passed by the producer.
2156 // 3. Keep within bounds and ensure it's a multiple of the page size.
2157 size_t shm_size = producer_config.shm_size_kb() * 1024;
2158 if (shm_size == 0)
2159 shm_size = producer->shmem_size_hint_bytes_;
2160
2161 auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
2162 if (valid_sizes != std::tie(shm_size, page_size)) {
2163 PERFETTO_DLOG(
2164 "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
2165 "back to shm_size %zu page_size %zu.",
2166 shm_size, page_size, std::get<0>(valid_sizes),
2167 std::get<1>(valid_sizes));
2168 }
2169 std::tie(shm_size, page_size) = valid_sizes;
2170
2171 // TODO(primiano): right now Create() will suicide in case of OOM if the
2172 // mmap fails. We should instead gracefully fail the request and tell the
2173 // client to go away.
2174 PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
2175 producer->name_.c_str());
2176 auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
2177 producer->SetupSharedMemory(std::move(shared_memory), page_size,
2178 /*provided_by_producer=*/false);
2179 }
2180 producer->SetupDataSource(inst_id, ds_config);
2181 return ds_instance;
2182 }
2183
2184 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
2185 // might be lying / returning garbage contents. |src| and |size| can be trusted
2186 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,uid_t producer_uid_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)2187 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
2188 ProducerID producer_id_trusted,
2189 uid_t producer_uid_trusted,
2190 WriterID writer_id,
2191 ChunkID chunk_id,
2192 BufferID buffer_id,
2193 uint16_t num_fragments,
2194 uint8_t chunk_flags,
2195 bool chunk_complete,
2196 const uint8_t* src,
2197 size_t size) {
2198 PERFETTO_DCHECK_THREAD(thread_checker_);
2199
2200 ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
2201 if (!producer) {
2202 PERFETTO_DFATAL("Producer not found.");
2203 chunks_discarded_++;
2204 return;
2205 }
2206
2207 TraceBuffer* buf = GetBufferByID(buffer_id);
2208 if (!buf) {
2209 PERFETTO_DLOG("Could not find target buffer %" PRIu16
2210 " for producer %" PRIu16,
2211 buffer_id, producer_id_trusted);
2212 chunks_discarded_++;
2213 return;
2214 }
2215
2216 // Verify that the producer is actually allowed to write into the target
2217 // buffer specified in the request. This prevents a malicious producer from
2218 // injecting data into a log buffer that belongs to a tracing session the
2219 // producer is not part of.
2220 if (!producer->is_allowed_target_buffer(buffer_id)) {
2221 PERFETTO_ELOG("Producer %" PRIu16
2222 " tried to write into forbidden target buffer %" PRIu16,
2223 producer_id_trusted, buffer_id);
2224 PERFETTO_DFATAL("Forbidden target buffer");
2225 chunks_discarded_++;
2226 return;
2227 }
2228
2229 // If the writer was registered by the producer, it should only write into the
2230 // buffer it was registered with.
2231 base::Optional<BufferID> associated_buffer =
2232 producer->buffer_id_for_writer(writer_id);
2233 if (associated_buffer && *associated_buffer != buffer_id) {
2234 PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
2235 " was registered to write into target buffer %" PRIu16
2236 ", but tried to write into buffer %" PRIu16,
2237 writer_id, producer_id_trusted, *associated_buffer,
2238 buffer_id);
2239 PERFETTO_DFATAL("Wrong target buffer");
2240 chunks_discarded_++;
2241 return;
2242 }
2243
2244 buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
2245 chunk_id, num_fragments, chunk_flags, chunk_complete,
2246 src, size);
2247 }
2248
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)2249 void TracingServiceImpl::ApplyChunkPatches(
2250 ProducerID producer_id_trusted,
2251 const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
2252 PERFETTO_DCHECK_THREAD(thread_checker_);
2253
2254 for (const auto& chunk : chunks_to_patch) {
2255 const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
2256 const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
2257 TraceBuffer* buf =
2258 GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
2259 static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
2260 "Add a '|| chunk_id > kMaxChunkID' below if this fails");
2261 if (!writer_id || writer_id > kMaxWriterID || !buf) {
2262 // This can genuinely happen when the trace is stopped. The producers
2263 // might see the stop signal with some delay and try to keep sending
2264 // patches left soon after.
2265 PERFETTO_DLOG(
2266 "Received invalid chunks_to_patch request from Producer: %" PRIu16
2267 ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
2268 producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
2269 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2270 continue;
2271 }
2272
2273 // Note, there's no need to validate that the producer is allowed to write
2274 // to the specified buffer ID (or that it's the correct buffer ID for a
2275 // registered TraceWriter). That's because TraceBuffer uses the producer ID
2276 // and writer ID to look up the chunk to patch. If the producer specifies an
2277 // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
2278 // patches. Because the producer ID is trusted, there's also no way for a
2279 // malicious producer to patch another producer's data.
2280
2281 // Speculate on the fact that there are going to be a limited amount of
2282 // patches per request, so we can allocate the |patches| array on the stack.
2283 std::array<TraceBuffer::Patch, 1024> patches; // Uninitialized.
2284 if (chunk.patches().size() > patches.size()) {
2285 PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
2286 patches.size());
2287 PERFETTO_DFATAL("Too many patches");
2288 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2289 continue;
2290 }
2291
2292 size_t i = 0;
2293 for (const auto& patch : chunk.patches()) {
2294 const std::string& patch_data = patch.data();
2295 if (patch_data.size() != patches[i].data.size()) {
2296 PERFETTO_ELOG("Received patch from producer: %" PRIu16
2297 " of unexpected size %zu",
2298 producer_id_trusted, patch_data.size());
2299 patches_discarded_++;
2300 continue;
2301 }
2302 patches[i].offset_untrusted = patch.offset();
2303 memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
2304 i++;
2305 }
2306 buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
2307 &patches[0], i, chunk.has_more_patches());
2308 }
2309 }
2310
GetDetachedSession(uid_t uid,const std::string & key)2311 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
2312 uid_t uid,
2313 const std::string& key) {
2314 PERFETTO_DCHECK_THREAD(thread_checker_);
2315 for (auto& kv : tracing_sessions_) {
2316 TracingSession* session = &kv.second;
2317 if (session->consumer_uid == uid && session->detach_key == key) {
2318 PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
2319 return session;
2320 }
2321 }
2322 return nullptr;
2323 }
2324
GetTracingSession(TracingSessionID tsid)2325 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2326 TracingSessionID tsid) {
2327 PERFETTO_DCHECK_THREAD(thread_checker_);
2328 auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2329 if (it == tracing_sessions_.end())
2330 return nullptr;
2331 return &it->second;
2332 }
2333
GetNextProducerID()2334 ProducerID TracingServiceImpl::GetNextProducerID() {
2335 PERFETTO_DCHECK_THREAD(thread_checker_);
2336 PERFETTO_CHECK(producers_.size() < kMaxProducerID);
2337 do {
2338 ++last_producer_id_;
2339 } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2340 PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2341 return last_producer_id_;
2342 }
2343
GetBufferByID(BufferID buffer_id)2344 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2345 auto buf_iter = buffers_.find(buffer_id);
2346 if (buf_iter == buffers_.end())
2347 return nullptr;
2348 return &*buf_iter->second;
2349 }
2350
OnStartTriggersTimeout(TracingSessionID tsid)2351 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2352 // Skip entirely the flush if the trace session doesn't exist anymore.
2353 // This is to prevent misleading error messages to be logged.
2354 //
2355 // if the trace has started from the trigger we rely on
2356 // the |stop_delay_ms| from the trigger so don't flush and
2357 // disable if we've moved beyond a CONFIGURED state
2358 auto* tracing_session_ptr = GetTracingSession(tsid);
2359 if (tracing_session_ptr &&
2360 tracing_session_ptr->state == TracingSession::CONFIGURED) {
2361 PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2362 " since no triggers activated.",
2363 tsid);
2364 // No data should be returned from ReadBuffers() regardless of if we
2365 // call FreeBuffers() or DisableTracing(). This is because in
2366 // STOP_TRACING we need this promise in either case, and using
2367 // DisableTracing() allows a graceful shutdown. Consumers can follow
2368 // their normal path and check the buffers through ReadBuffers() and
2369 // the code won't hang because the tracing session will still be
2370 // alive just disabled.
2371 DisableTracing(tsid);
2372 }
2373 }
2374
UpdateMemoryGuardrail()2375 void TracingServiceImpl::UpdateMemoryGuardrail() {
2376 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
2377 uint64_t total_buffer_bytes = 0;
2378
2379 // Sum up all the shared memory buffers.
2380 for (const auto& id_to_producer : producers_) {
2381 if (id_to_producer.second->shared_memory())
2382 total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2383 }
2384
2385 // Sum up all the trace buffers.
2386 for (const auto& id_to_buffer : buffers_) {
2387 total_buffer_bytes += id_to_buffer.second->size();
2388 }
2389
2390 // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2391 // interval.
2392 uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
2393 base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2394 #endif
2395 }
2396
PeriodicSnapshotTask(TracingSession * tracing_session,bool is_initial_snapshot)2397 void TracingServiceImpl::PeriodicSnapshotTask(TracingSession* tracing_session,
2398 bool is_initial_snapshot) {
2399 tracing_session->should_emit_sync_marker = true;
2400 tracing_session->should_emit_stats = true;
2401
2402 if (!tracing_session->config.builtin_data_sources()
2403 .disable_clock_snapshotting()) {
2404 if (is_initial_snapshot)
2405 SnapshotClocks(&tracing_session->initial_clock_snapshot_);
2406 SnapshotClocks(&tracing_session->last_clock_snapshot_);
2407 }
2408
2409 uint32_t interval_ms =
2410 tracing_session->config.builtin_data_sources().snapshot_interval_ms();
2411 if (!interval_ms)
2412 interval_ms = kDefaultSnapshotsIntervalMs;
2413
2414 TracingSessionID tsid = tracing_session->id;
2415 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2416 task_runner_->PostDelayedTask(
2417 [weak_this, tsid] {
2418 if (!weak_this)
2419 return;
2420 auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
2421 if (!tracing_session_ptr)
2422 return;
2423 if (tracing_session_ptr->state != TracingSession::STARTED)
2424 return;
2425 weak_this->PeriodicSnapshotTask(tracing_session_ptr,
2426 /*is_initial_snapshot=*/false);
2427 },
2428 interval_ms - (base::GetWallTimeMs().count() % interval_ms));
2429 }
2430
SnapshotSyncMarker(std::vector<TracePacket> * packets)2431 void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
2432 // The sync marks are used to tokenize large traces efficiently.
2433 // See description in trace_packet.proto.
2434 if (sync_marker_packet_size_ == 0) {
2435 // The marker ABI expects that the marker is written after the uid.
2436 // Protozero guarantees that fields are written in the same order of the
2437 // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
2438 protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
2439 &sync_marker_packet_[0], sizeof(sync_marker_packet_));
2440 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2441 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2442
2443 // Keep this last.
2444 packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
2445 sync_marker_packet_size_ = packet.Finalize();
2446 }
2447 packets->emplace_back();
2448 packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
2449 }
2450
SnapshotClocks(TracingSession::ClockSnapshotData * snapshot_data)2451 void TracingServiceImpl::SnapshotClocks(
2452 TracingSession::ClockSnapshotData* snapshot_data) {
2453 // Minimum drift that justifies replacing a prior clock snapshot that hasn't
2454 // been emitted into the trace yet (see comment below).
2455 static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000; // 10 ms
2456
2457 TracingSession::ClockSnapshotData new_snapshot_data;
2458
2459 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
2460 !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
2461 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2462 struct {
2463 clockid_t id;
2464 protos::pbzero::BuiltinClock type;
2465 struct timespec ts;
2466 } clocks[] = {
2467 {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
2468 {CLOCK_REALTIME_COARSE,
2469 protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
2470 {0, 0}},
2471 {CLOCK_MONOTONIC_COARSE,
2472 protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
2473 {0, 0}},
2474 {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
2475 {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
2476 {CLOCK_MONOTONIC_RAW,
2477 protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
2478 {0, 0}},
2479 };
2480 // First snapshot all the clocks as atomically as we can.
2481 for (auto& clock : clocks) {
2482 if (clock_gettime(clock.id, &clock.ts) == -1)
2483 PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
2484 }
2485 for (auto& clock : clocks) {
2486 new_snapshot_data.push_back(std::make_pair(
2487 static_cast<uint32_t>(clock.type),
2488 static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
2489 }
2490 #else // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
2491 // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
2492 // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2493 auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
2494 // The default trace clock is boot time, so we always need to emit a path to
2495 // it. However since we don't actually have a boot time source on these
2496 // platforms, pretend that wall time equals boot time.
2497 new_snapshot_data.push_back(
2498 std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
2499 new_snapshot_data.push_back(
2500 std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
2501 #endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
2502 // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
2503 // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2504
2505 // If we're about to update a session's latest clock snapshot that hasn't been
2506 // emitted into the trace yet, check whether the clocks have drifted enough to
2507 // warrant overriding the current snapshot values. The older snapshot would be
2508 // valid for a larger part of the currently buffered trace data because the
2509 // clock sync protocol in trace processor uses the latest clock <= timestamp
2510 // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
2511 // we try to keep it if we can.
2512 if (!snapshot_data->empty()) {
2513 PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
2514 PERFETTO_DCHECK((*snapshot_data)[0].first ==
2515 protos::gen::BUILTIN_CLOCK_BOOTTIME);
2516
2517 bool update_snapshot = false;
2518 uint64_t old_boot_ns = (*snapshot_data)[0].second;
2519 uint64_t new_boot_ns = new_snapshot_data[0].second;
2520 int64_t boot_diff =
2521 static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
2522
2523 for (size_t i = 1; i < snapshot_data->size(); i++) {
2524 uint64_t old_ns = (*snapshot_data)[i].second;
2525 uint64_t new_ns = new_snapshot_data[i].second;
2526
2527 int64_t diff =
2528 static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
2529
2530 // Compare the boottime delta against the delta of this clock.
2531 if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
2532 update_snapshot = true;
2533 break;
2534 }
2535 }
2536 if (!update_snapshot)
2537 return;
2538 snapshot_data->clear();
2539 }
2540
2541 *snapshot_data = std::move(new_snapshot_data);
2542 }
2543
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,bool set_root_timestamp,std::vector<TracePacket> * packets)2544 void TracingServiceImpl::EmitClockSnapshot(
2545 TracingSession* tracing_session,
2546 TracingSession::ClockSnapshotData snapshot_data,
2547 bool set_root_timestamp,
2548 std::vector<TracePacket>* packets) {
2549 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2550 if (set_root_timestamp) {
2551 // First timestamp from the snapshot is the default time domain (BOOTTIME on
2552 // systems that support it, or walltime otherwise).
2553 PERFETTO_DCHECK(snapshot_data[0].first ==
2554 protos::gen::BUILTIN_CLOCK_BOOTTIME);
2555 packet->set_timestamp(snapshot_data[0].second);
2556 }
2557
2558 auto* snapshot = packet->set_clock_snapshot();
2559
2560 protos::gen::BuiltinClock trace_clock =
2561 tracing_session->config.builtin_data_sources().primary_trace_clock();
2562 if (!trace_clock)
2563 trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
2564 snapshot->set_primary_trace_clock(
2565 static_cast<protos::pbzero::BuiltinClock>(trace_clock));
2566
2567 for (auto& clock_id_and_ts : snapshot_data) {
2568 auto* c = snapshot->add_clocks();
2569 c->set_clock_id(clock_id_and_ts.first);
2570 c->set_timestamp(clock_id_and_ts.second);
2571 }
2572
2573 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2574 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2575 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2576 }
2577
SnapshotStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)2578 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
2579 std::vector<TracePacket>* packets) {
2580 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2581 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2582 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2583 GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
2584 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2585 }
2586
GetTraceStats(TracingSession * tracing_session)2587 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
2588 TraceStats trace_stats;
2589 trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
2590 trace_stats.set_producers_seen(last_producer_id_);
2591 trace_stats.set_data_sources_registered(
2592 static_cast<uint32_t>(data_sources_.size()));
2593 trace_stats.set_data_sources_seen(last_data_source_instance_id_);
2594 trace_stats.set_tracing_sessions(
2595 static_cast<uint32_t>(tracing_sessions_.size()));
2596 trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
2597 trace_stats.set_chunks_discarded(chunks_discarded_);
2598 trace_stats.set_patches_discarded(patches_discarded_);
2599 trace_stats.set_invalid_packets(tracing_session->invalid_packets);
2600
2601 for (BufferID buf_id : tracing_session->buffers_index) {
2602 TraceBuffer* buf = GetBufferByID(buf_id);
2603 if (!buf) {
2604 PERFETTO_DFATAL("Buffer not found.");
2605 continue;
2606 }
2607 *trace_stats.add_buffer_stats() = buf->stats();
2608 } // for (buf in session).
2609 return trace_stats;
2610 }
2611
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)2612 void TracingServiceImpl::MaybeEmitTraceConfig(
2613 TracingSession* tracing_session,
2614 std::vector<TracePacket>* packets) {
2615 if (tracing_session->did_emit_config)
2616 return;
2617 tracing_session->did_emit_config = true;
2618 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2619 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2620 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2621 tracing_session->config.Serialize(packet->set_trace_config());
2622 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2623 }
2624
MaybeEmitSystemInfo(TracingSession * tracing_session,std::vector<TracePacket> * packets)2625 void TracingServiceImpl::MaybeEmitSystemInfo(
2626 TracingSession* tracing_session,
2627 std::vector<TracePacket>* packets) {
2628 if (tracing_session->did_emit_system_info)
2629 return;
2630 tracing_session->did_emit_system_info = true;
2631 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2632 auto* info = packet->set_system_info();
2633 base::ignore_result(info); // For PERFETTO_OS_WIN.
2634 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
2635 !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2636 struct utsname uname_info;
2637 if (uname(&uname_info) == 0) {
2638 auto* utsname_info = info->set_utsname();
2639 utsname_info->set_sysname(uname_info.sysname);
2640 utsname_info->set_version(uname_info.version);
2641 utsname_info->set_machine(uname_info.machine);
2642 utsname_info->set_release(uname_info.release);
2643 }
2644 #endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2645 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2646 char value[PROP_VALUE_MAX];
2647 if (__system_property_get("ro.build.fingerprint", value)) {
2648 info->set_android_build_fingerprint(value);
2649 } else {
2650 PERFETTO_ELOG("Unable to read ro.build.fingerprint");
2651 }
2652 info->set_hz(sysconf(_SC_CLK_TCK));
2653 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2654 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2655 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2656 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2657 }
2658
MaybeEmitServiceEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)2659 void TracingServiceImpl::MaybeEmitServiceEvents(
2660 TracingSession* tracing_session,
2661 std::vector<TracePacket>* packets) {
2662 int64_t all_start_ns = tracing_session->time_all_data_source_started.count();
2663 if (!tracing_session->did_emit_all_data_source_started && all_start_ns > 0) {
2664 tracing_session->did_emit_all_data_source_started = true;
2665 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2666 packet->set_timestamp(static_cast<uint64_t>(all_start_ns));
2667 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2668 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2669 packet->set_service_event()->set_all_data_sources_started(true);
2670 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2671 }
2672 }
2673
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)2674 void TracingServiceImpl::MaybeEmitReceivedTriggers(
2675 TracingSession* tracing_session,
2676 std::vector<TracePacket>* packets) {
2677 PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
2678 tracing_session->received_triggers.size());
2679 for (size_t i = tracing_session->num_triggers_emitted_into_trace;
2680 i < tracing_session->received_triggers.size(); ++i) {
2681 const auto& info = tracing_session->received_triggers[i];
2682 protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2683 auto* trigger = packet->set_trigger();
2684 trigger->set_trigger_name(info.trigger_name);
2685 trigger->set_producer_name(info.producer_name);
2686 trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
2687
2688 packet->set_timestamp(info.boot_time_ns);
2689 packet->set_trusted_uid(static_cast<int32_t>(uid_));
2690 packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2691 SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2692 ++tracing_session->num_triggers_emitted_into_trace;
2693 }
2694 }
2695
2696 ////////////////////////////////////////////////////////////////////////////////
2697 // TracingServiceImpl::ConsumerEndpointImpl implementation
2698 ////////////////////////////////////////////////////////////////////////////////
2699
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)2700 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
2701 TracingServiceImpl* service,
2702 base::TaskRunner* task_runner,
2703 Consumer* consumer,
2704 uid_t uid)
2705 : task_runner_(task_runner),
2706 service_(service),
2707 consumer_(consumer),
2708 uid_(uid),
2709 weak_ptr_factory_(this) {}
2710
~ConsumerEndpointImpl()2711 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
2712 service_->DisconnectConsumer(this);
2713 consumer_->OnDisconnect();
2714 }
2715
NotifyOnTracingDisabled()2716 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
2717 PERFETTO_DCHECK_THREAD(thread_checker_);
2718 auto weak_this = GetWeakPtr();
2719 task_runner_->PostTask([weak_this] {
2720 if (weak_this)
2721 weak_this->consumer_->OnTracingDisabled();
2722 });
2723 }
2724
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)2725 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
2726 const TraceConfig& cfg,
2727 base::ScopedFile fd) {
2728 PERFETTO_DCHECK_THREAD(thread_checker_);
2729 if (!service_->EnableTracing(this, cfg, std::move(fd)))
2730 NotifyOnTracingDisabled();
2731 }
2732
ChangeTraceConfig(const TraceConfig & cfg)2733 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
2734 const TraceConfig& cfg) {
2735 if (!tracing_session_id_) {
2736 PERFETTO_LOG(
2737 "Consumer called ChangeTraceConfig() but tracing was "
2738 "not active");
2739 return;
2740 }
2741 service_->ChangeTraceConfig(this, cfg);
2742 }
2743
StartTracing()2744 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
2745 PERFETTO_DCHECK_THREAD(thread_checker_);
2746 if (!tracing_session_id_) {
2747 PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
2748 return;
2749 }
2750 service_->StartTracing(tracing_session_id_);
2751 }
2752
DisableTracing()2753 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
2754 PERFETTO_DCHECK_THREAD(thread_checker_);
2755 if (!tracing_session_id_) {
2756 PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
2757 return;
2758 }
2759 service_->DisableTracing(tracing_session_id_);
2760 }
2761
ReadBuffers()2762 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
2763 PERFETTO_DCHECK_THREAD(thread_checker_);
2764 if (!tracing_session_id_) {
2765 PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
2766 consumer_->OnTraceData({}, /* has_more = */ false);
2767 return;
2768 }
2769 if (!service_->ReadBuffers(tracing_session_id_, this)) {
2770 consumer_->OnTraceData({}, /* has_more = */ false);
2771 }
2772 }
2773
FreeBuffers()2774 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
2775 PERFETTO_DCHECK_THREAD(thread_checker_);
2776 if (!tracing_session_id_) {
2777 PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
2778 return;
2779 }
2780 service_->FreeBuffers(tracing_session_id_);
2781 tracing_session_id_ = 0;
2782 }
2783
Flush(uint32_t timeout_ms,FlushCallback callback)2784 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
2785 FlushCallback callback) {
2786 PERFETTO_DCHECK_THREAD(thread_checker_);
2787 if (!tracing_session_id_) {
2788 PERFETTO_LOG("Consumer called Flush() but tracing was not active");
2789 return;
2790 }
2791 service_->Flush(tracing_session_id_, timeout_ms, callback);
2792 }
2793
Detach(const std::string & key)2794 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
2795 PERFETTO_DCHECK_THREAD(thread_checker_);
2796 bool success = service_->DetachConsumer(this, key);
2797 auto weak_this = GetWeakPtr();
2798 task_runner_->PostTask([weak_this, success] {
2799 if (weak_this)
2800 weak_this->consumer_->OnDetach(success);
2801 });
2802 }
2803
Attach(const std::string & key)2804 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
2805 PERFETTO_DCHECK_THREAD(thread_checker_);
2806 bool success = service_->AttachConsumer(this, key);
2807 auto weak_this = GetWeakPtr();
2808 task_runner_->PostTask([weak_this, success] {
2809 if (!weak_this)
2810 return;
2811 Consumer* consumer = weak_this->consumer_;
2812 TracingSession* session =
2813 weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
2814 if (!session) {
2815 consumer->OnAttach(false, TraceConfig());
2816 return;
2817 }
2818 consumer->OnAttach(success, session->config);
2819 });
2820 }
2821
GetTraceStats()2822 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
2823 PERFETTO_DCHECK_THREAD(thread_checker_);
2824 bool success = false;
2825 TraceStats stats;
2826 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2827 if (session) {
2828 success = true;
2829 stats = service_->GetTraceStats(session);
2830 }
2831 auto weak_this = GetWeakPtr();
2832 task_runner_->PostTask([weak_this, success, stats] {
2833 if (weak_this)
2834 weak_this->consumer_->OnTraceStats(success, stats);
2835 });
2836 }
2837
ObserveEvents(uint32_t events_mask)2838 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
2839 uint32_t events_mask) {
2840 PERFETTO_DCHECK_THREAD(thread_checker_);
2841 observable_events_mask_ = events_mask;
2842 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2843 if (!session)
2844 return;
2845
2846 if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
2847 // Issue initial states.
2848 for (const auto& kv : session->data_source_instances) {
2849 ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
2850 PERFETTO_DCHECK(producer);
2851 OnDataSourceInstanceStateChange(*producer, kv.second);
2852 }
2853 }
2854
2855 // If the ObserveEvents() call happens after data sources have acked already
2856 // notify immediately.
2857 if (observable_events_mask_ &
2858 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
2859 service_->MaybeNotifyAllDataSourcesStarted(session);
2860 }
2861 }
2862
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)2863 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
2864 const ProducerEndpointImpl& producer,
2865 const DataSourceInstance& instance) {
2866 if (!(observable_events_mask_ &
2867 ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
2868 return;
2869 }
2870
2871 if (instance.state != DataSourceInstance::CONFIGURED &&
2872 instance.state != DataSourceInstance::STARTED &&
2873 instance.state != DataSourceInstance::STOPPED) {
2874 return;
2875 }
2876
2877 auto* observable_events = AddObservableEvents();
2878 auto* change = observable_events->add_instance_state_changes();
2879 change->set_producer_name(producer.name_);
2880 change->set_data_source_name(instance.data_source_name);
2881 if (instance.state == DataSourceInstance::STARTED) {
2882 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
2883 } else {
2884 change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
2885 }
2886 }
2887
OnAllDataSourcesStarted()2888 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
2889 if (!(observable_events_mask_ &
2890 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
2891 return;
2892 }
2893 auto* observable_events = AddObservableEvents();
2894 observable_events->set_all_data_sources_started(true);
2895 }
2896
2897 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
GetWeakPtr()2898 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
2899 PERFETTO_DCHECK_THREAD(thread_checker_);
2900 return weak_ptr_factory_.GetWeakPtr();
2901 }
2902
2903 ObservableEvents*
AddObservableEvents()2904 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
2905 PERFETTO_DCHECK_THREAD(thread_checker_);
2906 if (!observable_events_) {
2907 observable_events_.reset(new ObservableEvents());
2908 auto weak_this = GetWeakPtr();
2909 task_runner_->PostTask([weak_this] {
2910 if (!weak_this)
2911 return;
2912
2913 // Move into a temporary to allow reentrancy in OnObservableEvents.
2914 auto observable_events = std::move(weak_this->observable_events_);
2915 weak_this->consumer_->OnObservableEvents(*observable_events);
2916 });
2917 }
2918 return observable_events_.get();
2919 }
2920
QueryServiceState(QueryServiceStateCallback callback)2921 void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
2922 QueryServiceStateCallback callback) {
2923 PERFETTO_DCHECK_THREAD(thread_checker_);
2924 TracingServiceState svc_state;
2925
2926 const auto& sessions = service_->tracing_sessions_;
2927 svc_state.set_num_sessions(static_cast<int>(sessions.size()));
2928
2929 int num_started = 0;
2930 for (const auto& kv : sessions)
2931 num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
2932 svc_state.set_num_sessions_started(static_cast<int>(num_started));
2933
2934 for (const auto& kv : service_->producers_) {
2935 auto* producer = svc_state.add_producers();
2936 producer->set_id(static_cast<int>(kv.first));
2937 producer->set_name(kv.second->name_);
2938 producer->set_uid(static_cast<int32_t>(producer->uid()));
2939 }
2940
2941 for (const auto& kv : service_->data_sources_) {
2942 const auto& registered_data_source = kv.second;
2943 auto* data_source = svc_state.add_data_sources();
2944 *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
2945 data_source->set_producer_id(
2946 static_cast<int>(registered_data_source.producer_id));
2947 }
2948 callback(/*success=*/true, svc_state);
2949 }
2950
QueryCapabilities(QueryCapabilitiesCallback callback)2951 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
2952 QueryCapabilitiesCallback callback) {
2953 PERFETTO_DCHECK_THREAD(thread_checker_);
2954 TracingServiceCapabilities caps;
2955 caps.set_has_query_capabilities(true);
2956 caps.set_has_trace_config_output_path(true);
2957 caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
2958 caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
2959 static_assert(ObservableEvents::Type_MAX ==
2960 ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED,
2961 "");
2962 callback(caps);
2963 }
2964
2965 ////////////////////////////////////////////////////////////////////////////////
2966 // TracingServiceImpl::ProducerEndpointImpl implementation
2967 ////////////////////////////////////////////////////////////////////////////////
2968
ProducerEndpointImpl(ProducerID id,uid_t uid,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,bool in_process,bool smb_scraping_enabled)2969 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
2970 ProducerID id,
2971 uid_t uid,
2972 TracingServiceImpl* service,
2973 base::TaskRunner* task_runner,
2974 Producer* producer,
2975 const std::string& producer_name,
2976 bool in_process,
2977 bool smb_scraping_enabled)
2978 : id_(id),
2979 uid_(uid),
2980 service_(service),
2981 task_runner_(task_runner),
2982 producer_(producer),
2983 name_(producer_name),
2984 in_process_(in_process),
2985 smb_scraping_enabled_(smb_scraping_enabled),
2986 weak_ptr_factory_(this) {}
2987
~ProducerEndpointImpl()2988 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
2989 service_->DisconnectProducer(id_);
2990 producer_->OnDisconnect();
2991 }
2992
RegisterDataSource(const DataSourceDescriptor & desc)2993 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
2994 const DataSourceDescriptor& desc) {
2995 PERFETTO_DCHECK_THREAD(thread_checker_);
2996 if (desc.name().empty()) {
2997 PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2998 return;
2999 }
3000
3001 service_->RegisterDataSource(id_, desc);
3002 }
3003
UnregisterDataSource(const std::string & name)3004 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
3005 const std::string& name) {
3006 PERFETTO_DCHECK_THREAD(thread_checker_);
3007 service_->UnregisterDataSource(id_, name);
3008 }
3009
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)3010 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
3011 uint32_t writer_id,
3012 uint32_t target_buffer) {
3013 PERFETTO_DCHECK_THREAD(thread_checker_);
3014 writers_[static_cast<WriterID>(writer_id)] =
3015 static_cast<BufferID>(target_buffer);
3016 }
3017
UnregisterTraceWriter(uint32_t writer_id)3018 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
3019 uint32_t writer_id) {
3020 PERFETTO_DCHECK_THREAD(thread_checker_);
3021 writers_.erase(static_cast<WriterID>(writer_id));
3022 }
3023
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)3024 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
3025 const CommitDataRequest& req_untrusted,
3026 CommitDataCallback callback) {
3027 PERFETTO_DCHECK_THREAD(thread_checker_);
3028
3029 if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
3030 PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
3031 EncodeCommitDataRequest(id_, req_untrusted));
3032 }
3033
3034 if (!shared_memory_) {
3035 PERFETTO_DLOG(
3036 "Attempted to commit data before the shared memory was allocated.");
3037 return;
3038 }
3039 PERFETTO_DCHECK(shmem_abi_.is_valid());
3040 for (const auto& entry : req_untrusted.chunks_to_move()) {
3041 const uint32_t page_idx = entry.page();
3042 if (page_idx >= shmem_abi_.num_pages())
3043 continue; // A buggy or malicious producer.
3044
3045 SharedMemoryABI::Chunk chunk =
3046 shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
3047 if (!chunk.is_valid()) {
3048 PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
3049 entry.page(), entry.chunk());
3050 continue;
3051 }
3052
3053 // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
3054 // the ABI contract expects the producer to not touch the chunk anymore
3055 // (until the service marks that as free). This is why all the reads below
3056 // are just memory_order_relaxed. Also, the code here assumes that all this
3057 // data can be malicious and just gives up if anything is malformed.
3058 BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
3059 const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
3060 WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
3061 ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
3062 auto packets = chunk_header.packets.load(std::memory_order_relaxed);
3063 uint16_t num_fragments = packets.count;
3064 uint8_t chunk_flags = packets.flags;
3065
3066 service_->CopyProducerPageIntoLogBuffer(
3067 id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
3068 /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
3069
3070 // This one has release-store semantics.
3071 shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
3072 } // for(chunks_to_move)
3073
3074 service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
3075
3076 if (req_untrusted.flush_request_id()) {
3077 service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
3078 }
3079
3080 // Keep this invocation last. ProducerIPCService::CommitData() relies on this
3081 // callback being invoked within the same callstack and not posted. If this
3082 // changes, the code there needs to be changed accordingly.
3083 if (callback)
3084 callback();
3085 }
3086
SetupSharedMemory(std::unique_ptr<SharedMemory> shared_memory,size_t page_size_bytes,bool provided_by_producer)3087 void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
3088 std::unique_ptr<SharedMemory> shared_memory,
3089 size_t page_size_bytes,
3090 bool provided_by_producer) {
3091 PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
3092 PERFETTO_DCHECK(page_size_bytes % 1024 == 0);
3093
3094 shared_memory_ = std::move(shared_memory);
3095 shared_buffer_page_size_kb_ = page_size_bytes / 1024;
3096 is_shmem_provided_by_producer_ = provided_by_producer;
3097
3098 shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
3099 shared_memory_->size(),
3100 shared_buffer_page_size_kb() * 1024);
3101 if (in_process_) {
3102 inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
3103 shared_memory_->start(), shared_memory_->size(),
3104 shared_buffer_page_size_kb_ * 1024, this, task_runner_));
3105 }
3106
3107 OnTracingSetup();
3108 service_->UpdateMemoryGuardrail();
3109 }
3110
shared_memory() const3111 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
3112 PERFETTO_DCHECK_THREAD(thread_checker_);
3113 return shared_memory_.get();
3114 }
3115
shared_buffer_page_size_kb() const3116 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
3117 const {
3118 return shared_buffer_page_size_kb_;
3119 }
3120
ActivateTriggers(const std::vector<std::string> & triggers)3121 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
3122 const std::vector<std::string>& triggers) {
3123 service_->ActivateTriggers(id_, triggers);
3124 }
3125
StopDataSource(DataSourceInstanceID ds_inst_id)3126 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
3127 DataSourceInstanceID ds_inst_id) {
3128 // TODO(primiano): When we'll support tearing down the SMB, at this point we
3129 // should send the Producer a TearDownTracing if all its data sources have
3130 // been disabled (see b/77532839 and aosp/655179 PS1).
3131 PERFETTO_DCHECK_THREAD(thread_checker_);
3132 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3133 task_runner_->PostTask([weak_this, ds_inst_id] {
3134 if (weak_this)
3135 weak_this->producer_->StopDataSource(ds_inst_id);
3136 });
3137 }
3138
3139 SharedMemoryArbiter*
MaybeSharedMemoryArbiter()3140 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
3141 if (!inproc_shmem_arbiter_) {
3142 PERFETTO_FATAL(
3143 "The in-process SharedMemoryArbiter can only be used when "
3144 "CreateProducer has been called with in_process=true and after tracing "
3145 "has started.");
3146 }
3147
3148 PERFETTO_DCHECK(in_process_);
3149 return inproc_shmem_arbiter_.get();
3150 }
3151
IsShmemProvidedByProducer() const3152 bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
3153 const {
3154 return is_shmem_provided_by_producer_;
3155 }
3156
3157 // Can be called on any thread.
3158 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id,BufferExhaustedPolicy buffer_exhausted_policy)3159 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
3160 BufferID buf_id,
3161 BufferExhaustedPolicy buffer_exhausted_policy) {
3162 PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
3163 return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
3164 buffer_exhausted_policy);
3165 }
3166
NotifyFlushComplete(FlushRequestID id)3167 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
3168 FlushRequestID id) {
3169 PERFETTO_DCHECK_THREAD(thread_checker_);
3170 PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
3171 return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
3172 }
3173
OnTracingSetup()3174 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
3175 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3176 task_runner_->PostTask([weak_this] {
3177 if (weak_this)
3178 weak_this->producer_->OnTracingSetup();
3179 });
3180 }
3181
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources)3182 void TracingServiceImpl::ProducerEndpointImpl::Flush(
3183 FlushRequestID flush_request_id,
3184 const std::vector<DataSourceInstanceID>& data_sources) {
3185 PERFETTO_DCHECK_THREAD(thread_checker_);
3186 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3187 task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
3188 if (weak_this) {
3189 weak_this->producer_->Flush(flush_request_id, data_sources.data(),
3190 data_sources.size());
3191 }
3192 });
3193 }
3194
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)3195 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
3196 DataSourceInstanceID ds_id,
3197 const DataSourceConfig& config) {
3198 PERFETTO_DCHECK_THREAD(thread_checker_);
3199 allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
3200 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3201 task_runner_->PostTask([weak_this, ds_id, config] {
3202 if (weak_this)
3203 weak_this->producer_->SetupDataSource(ds_id, std::move(config));
3204 });
3205 }
3206
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)3207 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
3208 DataSourceInstanceID ds_id,
3209 const DataSourceConfig& config) {
3210 PERFETTO_DCHECK_THREAD(thread_checker_);
3211 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3212 task_runner_->PostTask([weak_this, ds_id, config] {
3213 if (weak_this)
3214 weak_this->producer_->StartDataSource(ds_id, std::move(config));
3215 });
3216 }
3217
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)3218 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
3219 DataSourceInstanceID data_source_id) {
3220 PERFETTO_DCHECK_THREAD(thread_checker_);
3221 service_->NotifyDataSourceStarted(id_, data_source_id);
3222 }
3223
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)3224 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
3225 DataSourceInstanceID data_source_id) {
3226 PERFETTO_DCHECK_THREAD(thread_checker_);
3227 service_->NotifyDataSourceStopped(id_, data_source_id);
3228 }
3229
OnFreeBuffers(const std::vector<BufferID> & target_buffers)3230 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
3231 const std::vector<BufferID>& target_buffers) {
3232 if (allowed_target_buffers_.empty())
3233 return;
3234 for (BufferID buffer : target_buffers)
3235 allowed_target_buffers_.erase(buffer);
3236 }
3237
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)3238 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
3239 const std::vector<DataSourceInstanceID>& data_sources) {
3240 PERFETTO_DCHECK_THREAD(thread_checker_);
3241 auto weak_this = weak_ptr_factory_.GetWeakPtr();
3242 task_runner_->PostTask([weak_this, data_sources] {
3243 if (weak_this) {
3244 weak_this->producer_->ClearIncrementalState(data_sources.data(),
3245 data_sources.size());
3246 }
3247 });
3248 }
3249
Sync(std::function<void ()> callback)3250 void TracingServiceImpl::ProducerEndpointImpl::Sync(
3251 std::function<void()> callback) {
3252 task_runner_->PostTask(callback);
3253 }
3254
3255 ////////////////////////////////////////////////////////////////////////////////
3256 // TracingServiceImpl::TracingSession implementation
3257 ////////////////////////////////////////////////////////////////////////////////
3258
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config)3259 TracingServiceImpl::TracingSession::TracingSession(
3260 TracingSessionID session_id,
3261 ConsumerEndpointImpl* consumer,
3262 const TraceConfig& new_config)
3263 : id(session_id),
3264 consumer_maybe_null(consumer),
3265 consumer_uid(consumer->uid_),
3266 config(new_config) {}
3267
3268 } // namespace perfetto
3269