1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/tracing_service_impl.h"
18
19 #include "perfetto/base/build_config.h"
20
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <string.h>
25 #include <regex>
26 #include <unordered_set>
27
28 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
29 #include <sys/uio.h>
30 #include <sys/utsname.h>
31 #include <unistd.h>
32 #endif
33
34 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
35 #include <sys/system_properties.h>
36 #endif
37
38 #include <algorithm>
39
40 #include "perfetto/base/build_config.h"
41 #include "perfetto/base/file_utils.h"
42 #include "perfetto/base/task_runner.h"
43 #include "perfetto/base/utils.h"
44 #include "perfetto/tracing/core/consumer.h"
45 #include "perfetto/tracing/core/data_source_config.h"
46 #include "perfetto/tracing/core/producer.h"
47 #include "perfetto/tracing/core/shared_memory.h"
48 #include "perfetto/tracing/core/shared_memory_abi.h"
49 #include "perfetto/tracing/core/trace_packet.h"
50 #include "perfetto/tracing/core/trace_writer.h"
51 #include "src/tracing/core/packet_stream_validator.h"
52 #include "src/tracing/core/shared_memory_arbiter_impl.h"
53 #include "src/tracing/core/trace_buffer.h"
54
55 #include "perfetto/trace/clock_snapshot.pb.h"
56 #include "perfetto/trace/system_info.pb.h"
57 #include "perfetto/trace/trusted_packet.pb.h"
58
59 // General note: this class must assume that Producers are malicious and will
60 // try to crash / exploit this class. We can trust pointers because they come
61 // from the IPC layer, but we should never assume that the producer calls
62 // come in the right order or their arguments are sane / within bounds.
63
64 namespace perfetto {
65
66 namespace {
67 constexpr size_t kDefaultShmPageSize = base::kPageSize;
68 constexpr int kMaxBuffersPerConsumer = 128;
69 constexpr base::TimeMillis kSnapshotsInterval(10 * 1000);
70 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
71 constexpr int kMaxConcurrentTracingSessions = 5;
72
73 constexpr uint32_t kMillisPerHour = 3600000;
74 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
75
76 // These apply only if enable_extra_guardrails is true.
77 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 32 * 1024;
78 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
79
80 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
81 struct iovec {
82 void* iov_base; // Address
83 size_t iov_len; // Block size
84 };
85
86 // Simple implementation of writev. Note that this does not give the atomicity
87 // guarantees of a real writev, but we don't depend on these (we aren't writing
88 // to the same file from another thread).
89 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
90 ssize_t total_size = 0;
91 for (int i = 0; i < iovcnt; ++i) {
92 ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
93 if (current_size != static_cast<ssize_t>(iov[i].iov_len))
94 return -1;
95 total_size += current_size;
96 }
97 return total_size;
98 }
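// Illustrative usage of the fallback above (a sketch, not part of the original
// file; |fd|, |header| and |payload| are hypothetical):
//
//   struct iovec v[2] = {{header.data(), header.size()},
//                        {payload.data(), payload.size()}};
//   ssize_t written = writev(fd, v, 2);  // -1 if any block is short-written.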
99
100 #define IOV_MAX 1024 // Linux compatible limit.
101
102 // uid checking is a NOP on Windows.
103 uid_t getuid() {
104 return 0;
105 }
106 uid_t geteuid() {
107 return 0;
108 }
109 #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
110
111 } // namespace
112
113 // These constants, instead, are defined in the header because they are used by tests.
114 constexpr size_t TracingServiceImpl::kDefaultShmSize;
115 constexpr size_t TracingServiceImpl::kMaxShmSize;
116 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
117 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
118
119 // static
120 std::unique_ptr<TracingService> TracingService::CreateInstance(
121 std::unique_ptr<SharedMemory::Factory> shm_factory,
122 base::TaskRunner* task_runner) {
123 return std::unique_ptr<TracingService>(
124 new TracingServiceImpl(std::move(shm_factory), task_runner));
125 }
126
127 TracingServiceImpl::TracingServiceImpl(
128 std::unique_ptr<SharedMemory::Factory> shm_factory,
129 base::TaskRunner* task_runner)
130 : task_runner_(task_runner),
131 shm_factory_(std::move(shm_factory)),
132 uid_(getuid()),
133 buffer_ids_(kMaxTraceBufferID),
134 weak_ptr_factory_(this) {
135 PERFETTO_DCHECK(task_runner_);
136 }
137
138 TracingServiceImpl::~TracingServiceImpl() {
139 // TODO(fmayer): handle teardown of all Producer.
140 }
141
142 std::unique_ptr<TracingService::ProducerEndpoint>
143 TracingServiceImpl::ConnectProducer(Producer* producer,
144 uid_t uid,
145 const std::string& producer_name,
146 size_t shared_memory_size_hint_bytes,
147 bool in_process,
148 ProducerSMBScrapingMode smb_scraping_mode) {
149 PERFETTO_DCHECK_THREAD(thread_checker_);
150
151 if (lockdown_mode_ && uid != geteuid()) {
152 PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
153 static_cast<unsigned long>(uid));
154 return nullptr;
155 }
156
157 if (producers_.size() >= kMaxProducerID) {
158 PERFETTO_DFATAL("Too many producers.");
159 return nullptr;
160 }
161 const ProducerID id = GetNextProducerID();
162 PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
163
164 bool smb_scraping_enabled = smb_scraping_enabled_;
165 switch (smb_scraping_mode) {
166 case ProducerSMBScrapingMode::kDefault:
167 break;
168 case ProducerSMBScrapingMode::kEnabled:
169 smb_scraping_enabled = true;
170 break;
171 case ProducerSMBScrapingMode::kDisabled:
172 smb_scraping_enabled = false;
173 break;
174 }
175
176 std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
177 id, uid, this, task_runner_, producer, producer_name, in_process,
178 smb_scraping_enabled));
179 auto it_and_inserted = producers_.emplace(id, endpoint.get());
180 PERFETTO_DCHECK(it_and_inserted.second);
181 endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
182 task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
183
184 return std::move(endpoint);
185 }
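// A minimal sketch of how a caller might invoke ConnectProducer() (an
// illustrative assumption; |svc| and |my_producer| are hypothetical):
//
//   std::unique_ptr<TracingService::ProducerEndpoint> endpoint =
//       svc->ConnectProducer(&my_producer, getuid(), "my_producer",
//                            /*shared_memory_size_hint_bytes=*/0,
//                            /*in_process=*/true,
//                            TracingService::ProducerSMBScrapingMode::kDefault);
//   // Producer::OnConnect() is then posted asynchronously on |task_runner_|.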
186
187 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
188 PERFETTO_DCHECK_THREAD(thread_checker_);
189 PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
190 PERFETTO_DCHECK(producers_.count(id));
191
192 // Scrape remaining chunks for this producer to ensure we don't lose data.
193 if (auto* producer = GetProducer(id)) {
194 for (auto& session_id_and_session : tracing_sessions_)
195 ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
196 }
197
198 for (auto it = data_sources_.begin(); it != data_sources_.end();) {
199 auto next = it;
200 next++;
201 if (it->second.producer_id == id)
202 UnregisterDataSource(id, it->second.descriptor.name());
203 it = next;
204 }
205
206 producers_.erase(id);
207 UpdateMemoryGuardrail();
208 }
209
210 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
211 ProducerID id) const {
212 PERFETTO_DCHECK_THREAD(thread_checker_);
213 auto it = producers_.find(id);
214 if (it == producers_.end())
215 return nullptr;
216 return it->second;
217 }
218
219 std::unique_ptr<TracingService::ConsumerEndpoint>
220 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
221 PERFETTO_DCHECK_THREAD(thread_checker_);
222 PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
223 std::unique_ptr<ConsumerEndpointImpl> endpoint(
224 new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
225 auto it_and_inserted = consumers_.emplace(endpoint.get());
226 PERFETTO_DCHECK(it_and_inserted.second);
227 task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_));
228 return std::move(endpoint);
229 }
230
231 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
232 PERFETTO_DCHECK_THREAD(thread_checker_);
233 PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
234 PERFETTO_DCHECK(consumers_.count(consumer));
235
236 // TODO(primiano) : Check that this is safe (what happens if there are
237 // ReadBuffers() calls posted in the meantime? They need to become noop).
238 if (consumer->tracing_session_id_)
239 FreeBuffers(consumer->tracing_session_id_); // Will also DisableTracing().
240 consumers_.erase(consumer);
241
242 // At this point no more pointers to |consumer| should be around.
243 PERFETTO_DCHECK(!std::any_of(
244 tracing_sessions_.begin(), tracing_sessions_.end(),
245 [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
246 return kv.second.consumer_maybe_null == consumer;
247 }));
248 }
249
250 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
251 const std::string& key) {
252 PERFETTO_DCHECK_THREAD(thread_checker_);
253 PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
254 PERFETTO_DCHECK(consumers_.count(consumer));
255
256 TracingSessionID tsid = consumer->tracing_session_id_;
257 TracingSession* tracing_session;
258 if (!tsid || !(tracing_session = GetTracingSession(tsid)))
259 return false;
260
261 if (GetDetachedSession(consumer->uid_, key)) {
262 PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
263 key.c_str());
264 return false;
265 }
266
267 PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
268 tracing_session->consumer_maybe_null = nullptr;
269 tracing_session->detach_key = key;
270 consumer->tracing_session_id_ = 0;
271 return true;
272 }
273
274 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
275 const std::string& key) {
276 PERFETTO_DCHECK_THREAD(thread_checker_);
277 PERFETTO_DLOG("Consumer %p attaching to session %s",
278 reinterpret_cast<void*>(consumer), key.c_str());
279 PERFETTO_DCHECK(consumers_.count(consumer));
280
281 if (consumer->tracing_session_id_) {
282 PERFETTO_ELOG(
283 "Cannot reattach consumer to session %s"
284 " while it already attached tracing session ID %" PRIu64,
285 key.c_str(), consumer->tracing_session_id_);
286 return false;
287 }
288
289 auto* tracing_session = GetDetachedSession(consumer->uid_, key);
290 if (!tracing_session) {
291 PERFETTO_ELOG(
292 "Failed to attach consumer, session '%s' not found for uid %d",
293 key.c_str(), static_cast<int>(consumer->uid_));
294 return false;
295 }
296
297 consumer->tracing_session_id_ = tracing_session->id;
298 tracing_session->consumer_maybe_null = consumer;
299 tracing_session->detach_key.clear();
300 return true;
301 }
302
303 bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
304 const TraceConfig& cfg,
305 base::ScopedFile fd) {
306 PERFETTO_DCHECK_THREAD(thread_checker_);
307 PERFETTO_DLOG("Enabling tracing for consumer %p",
308 reinterpret_cast<void*>(consumer));
309 if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_SET)
310 lockdown_mode_ = true;
311 if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR)
312 lockdown_mode_ = false;
313 TracingSession* tracing_session =
314 GetTracingSession(consumer->tracing_session_id_);
315 if (tracing_session) {
316 PERFETTO_DLOG(
317 "A Consumer is trying to EnableTracing() but another tracing session "
318 "is already active (forgot a call to FreeBuffers() ?)");
319 return false;
320 }
321
322 const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
323 ? kGuardrailsMaxTracingDurationMillis
324 : kMaxTracingDurationMillis;
325 if (cfg.duration_ms() > max_duration_ms) {
326 PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms > %" PRIu32 " ms)",
327 cfg.duration_ms(), max_duration_ms);
328 return false;
329 }
330
331 const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
332 TraceConfig::TriggerConfig::UNSPECIFIED;
333 if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
334 cfg.trigger_config().trigger_timeout_ms() >
335 kGuardrailsMaxTracingDurationMillis)) {
336 PERFETTO_ELOG(
337 "Traces with START_TRACING triggers must provide a positive "
338 "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
339 cfg.trigger_config().trigger_timeout_ms());
340 return false;
341 }
342
343 if (has_trigger_config && cfg.duration_ms() != 0) {
344 PERFETTO_ELOG(
345 "duration_ms was set, this must not be set for traces with triggers.");
346 return false;
347 }
348
349 std::unordered_set<std::string> triggers;
350 for (const auto& trigger : cfg.trigger_config().triggers()) {
351 if (!triggers.insert(trigger.name()).second) {
352 PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
353 return false;
354 }
355 }
356
357 if (cfg.enable_extra_guardrails()) {
358 if (cfg.deferred_start()) {
359 PERFETTO_ELOG(
360 "deferred_start=true is not supported in unsupervised traces");
361 return false;
362 }
363 uint64_t buf_size_sum = 0;
364 for (const auto& buf : cfg.buffers())
365 buf_size_sum += buf.size_kb();
366 if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
367 PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
368 "kB > %" PRIu32 " kB)",
369 buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
370 return false;
371 }
372 }
373
374 if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
375 PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size());
376 return false;
377 }
378
379 if (!cfg.unique_session_name().empty()) {
380 const std::string& name = cfg.unique_session_name();
381 for (auto& kv : tracing_sessions_) {
382 if (kv.second.config.unique_session_name() == name) {
383 PERFETTO_ELOG(
384 "A trace wtih this unique session name (%s) already exists",
385 name.c_str());
386 return false;
387 }
388 }
389 }
390
391 // TODO(primiano): This is a workaround to prevent a producer from getting
392 // stuck in a state where it stalls by design by having more TraceWriterImpl
393 // instances than free pages in the buffer. This is really a bug in
394 // trace_probes and the way it handles stalls in the shmem buffer.
395 if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
396 PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
397 tracing_sessions_.size());
398 return false;
399 }
400
401 const TracingSessionID tsid = ++last_tracing_session_id_;
402 tracing_session =
403 &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
404 .first->second;
405
406 if (cfg.write_into_file()) {
407 if (!fd) {
408 PERFETTO_ELOG(
409 "The TraceConfig had write_into_file==true but no fd was passed");
410 tracing_sessions_.erase(tsid);
411 return false;
412 }
413 tracing_session->write_into_file = std::move(fd);
414 uint32_t write_period_ms = cfg.file_write_period_ms();
415 if (write_period_ms == 0)
416 write_period_ms = kDefaultWriteIntoFilePeriodMs;
417 if (write_period_ms < min_write_period_ms_)
418 write_period_ms = min_write_period_ms_;
419 tracing_session->write_period_ms = write_period_ms;
420 tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
421 tracing_session->bytes_written_into_file = 0;
422 }
423
424 // Initialize the log buffers.
425 bool did_allocate_all_buffers = true;
426
427 // Allocate the trace buffers. Also create a map to translate a consumer
428 // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
429 // corresponding BufferID, which is a global ID namespace for the service and
430 // all producers.
431 size_t total_buf_size_kb = 0;
432 const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
433 tracing_session->buffers_index.reserve(num_buffers);
434 for (size_t i = 0; i < num_buffers; i++) {
435 const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
436 BufferID global_id = buffer_ids_.Allocate();
437 if (!global_id) {
438 did_allocate_all_buffers = false; // We ran out of IDs.
439 break;
440 }
441 tracing_session->buffers_index.push_back(global_id);
442 const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
443 total_buf_size_kb += buffer_cfg.size_kb();
444 TraceBuffer::OverwritePolicy policy =
445 buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
446 ? TraceBuffer::kDiscard
447 : TraceBuffer::kOverwrite;
448 auto it_and_inserted = buffers_.emplace(
449 global_id, TraceBuffer::Create(buf_size_bytes, policy));
450 PERFETTO_DCHECK(it_and_inserted.second); // buffers_.count(global_id) == 0.
451 std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
452 if (!trace_buffer) {
453 did_allocate_all_buffers = false;
454 break;
455 }
456 }
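// Worked example of the index translation described above (illustrative): with
// two buffers in the config, a data source whose target_buffer == 1 writes into
// the BufferID stored in tracing_session->buffers_index[1], i.e. the second
// global buffer allocated from |buffer_ids_|.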
457
458 UpdateMemoryGuardrail();
459
460 // This can happen if either:
461 // - All the kMaxTraceBufferID slots are taken.
462 // - OOM, or, more realistically, we exhausted virtual memory.
463 // In any case, free all the previously allocated buffers and abort.
464 // TODO(fmayer): add a test to cover this case, this is quite subtle.
465 if (!did_allocate_all_buffers) {
466 for (BufferID global_id : tracing_session->buffers_index) {
467 buffer_ids_.Free(global_id);
468 buffers_.erase(global_id);
469 }
470 tracing_sessions_.erase(tsid);
471 return false;
472 }
473
474 consumer->tracing_session_id_ = tsid;
475
476 // Setup the data sources on the producers without starting them.
477 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
478 // Scan all the registered data sources with a matching name.
479 auto range = data_sources_.equal_range(cfg_data_source.config().name());
480 for (auto it = range.first; it != range.second; it++) {
481 TraceConfig::ProducerConfig producer_config;
482 for (auto& config : cfg.producers()) {
483 if (GetProducer(it->second.producer_id)->name_ ==
484 config.producer_name()) {
485 producer_config = config;
486 break;
487 }
488 }
489 SetupDataSource(cfg_data_source, producer_config, it->second,
490 tracing_session);
491 }
492 }
493
494 bool has_start_trigger = false;
495 auto weak_this = weak_ptr_factory_.GetWeakPtr();
496 switch (cfg.trigger_config().trigger_mode()) {
497 case TraceConfig::TriggerConfig::UNSPECIFIED:
498 // no triggers are specified so this isn't a trace that is using triggers.
499 PERFETTO_DCHECK(!has_trigger_config);
500 break;
501 case TraceConfig::TriggerConfig::START_TRACING:
502 // For traces which use START_TRACING triggers we need to ensure that the
503 // tracing session will be cleaned up when it times out.
504 has_start_trigger = true;
505 task_runner_->PostDelayedTask(
506 [weak_this, tsid]() {
507 if (weak_this)
508 weak_this->OnStartTriggersTimeout(tsid);
509 },
510 cfg.trigger_config().trigger_timeout_ms());
511 break;
512 case TraceConfig::TriggerConfig::STOP_TRACING:
513 // Update the tracing_session's duration_ms to ensure that if no trigger
514 // is received the session will end and be cleaned up when the trigger
515 // timeout expires.
516 //
517 // TODO(nuskos): Refactor this so that rather than modifying the config we
518 // have a field we look at on the tracing_session.
519 tracing_session->config.set_duration_ms(
520 cfg.trigger_config().trigger_timeout_ms());
521 break;
522 }
523
524 tracing_session->state = TracingSession::CONFIGURED;
525 PERFETTO_LOG(
526 "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
527 "buffer size:%zu KB, total sessions:%zu",
528 cfg.data_sources().size(), tracing_session->config.duration_ms(),
529 cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size());
530
531 // Start the data sources, unless this is a case of early setup + fast
532 // triggering, either through TraceConfig.deferred_start or
533 // TraceConfig.trigger_config(). If both are specified, whichever one occurs
534 // first will initiate the trace.
535 if (!cfg.deferred_start() && !has_start_trigger)
536 return StartTracing(tsid);
537
538 return true;
539 }
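// A sketch of a consumer-side TraceConfig that exercises the trigger handling
// above (illustrative assumption; the setter names mirror the getters used in
// this file but are not taken from it):
//
//   TraceConfig cfg;
//   cfg.add_buffers()->set_size_kb(1024);
//   auto* trig_cfg = cfg.mutable_trigger_config();
//   trig_cfg->set_trigger_mode(TraceConfig::TriggerConfig::STOP_TRACING);
//   trig_cfg->set_trigger_timeout_ms(30 * 1000);
//   auto* trigger = trig_cfg->add_triggers();
//   trigger->set_name("my_trigger");
//   trigger->set_stop_delay_ms(500);
//   // duration_ms must stay unset: EnableTracing() rejects trigger configs
//   // that also specify a duration.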
540
541 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
542 const TraceConfig& updated_cfg) {
543 PERFETTO_DCHECK_THREAD(thread_checker_);
544 TracingSession* tracing_session =
545 GetTracingSession(consumer->tracing_session_id_);
546 PERFETTO_DCHECK(tracing_session);
547
548 if ((tracing_session->state != TracingSession::STARTED) &&
549 (tracing_session->state != TracingSession::CONFIGURED)) {
550 PERFETTO_ELOG(
551 "ChangeTraceConfig() was called for a tracing session which isn't "
552 "running.");
553 return;
554 }
555
556 // We only support updating producer_name_filter (and pass-through configs)
557 // for now; null out any changeable fields and make sure the rest are
558 // identical.
559 TraceConfig new_config_copy(updated_cfg);
560 for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
561 ds_cfg.clear_producer_name_filter();
562 }
563
564 TraceConfig current_config_copy(tracing_session->config);
565 for (auto& ds_cfg : *current_config_copy.mutable_data_sources())
566 ds_cfg.clear_producer_name_filter();
567
568 if (new_config_copy != current_config_copy) {
569 PERFETTO_LOG(
570 "ChangeTraceConfig() was called with a config containing unsupported "
571 "changes; only adding to the producer_name_filter is currently "
572 "supported and will have an effect.");
573 }
574
575 for (TraceConfig::DataSource& cfg_data_source :
576 *tracing_session->config.mutable_data_sources()) {
577 // Find the updated producer_filter in the new config.
578 std::vector<std::string> new_producer_name_filter;
579 bool found_data_source = false;
580 for (auto it : updated_cfg.data_sources()) {
581 if (cfg_data_source.config().name() == it.config().name()) {
582 new_producer_name_filter = it.producer_name_filter();
583 found_data_source = true;
584 break;
585 }
586 }
587
588 // Bail out if data source not present in the new config.
589 if (!found_data_source) {
590 PERFETTO_ELOG(
591 "ChangeTraceConfig() called without a current data source also "
592 "present in the new "
593 "config: %s",
594 cfg_data_source.config().name().c_str());
595 continue;
596 }
597
598 // TODO(oysteine): Just replacing the filter means that if
599 // there are any filter entries which were present in the original config,
600 // but removed from the config passed to ChangeTraceConfig, any matching
601 // producers will keep producing but newly added producers after this
602 // point will never start.
603 *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
604
605 // Scan all the registered data sources with a matching name.
606 auto range = data_sources_.equal_range(cfg_data_source.config().name());
607 for (auto it = range.first; it != range.second; it++) {
608 ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
609 PERFETTO_DCHECK(producer);
610
611 // Check if the producer name of this data source is present
612 // in the name filter. We currently only support new filters, not removing
613 // old ones.
614 if (!new_producer_name_filter.empty() &&
615 std::find(new_producer_name_filter.begin(),
616 new_producer_name_filter.end(),
617 producer->name_) == new_producer_name_filter.end()) {
618 continue;
619 }
620
621 bool already_setup = false;
622 auto& ds_instances = tracing_session->data_source_instances;
623 for (auto instance_it = ds_instances.begin();
624 instance_it != ds_instances.end(); ++instance_it) {
625 if (instance_it->first == it->second.producer_id &&
626 instance_it->second.data_source_name ==
627 cfg_data_source.config().name()) {
628 already_setup = true;
629 break;
630 }
631 }
632
633 if (already_setup)
634 continue;
635
636 // If it wasn't previously setup, set it up now.
637 // (The per-producer config is optional).
638 TraceConfig::ProducerConfig producer_config;
639 for (auto& config : tracing_session->config.producers()) {
640 if (producer->name_ == config.producer_name()) {
641 producer_config = config;
642 break;
643 }
644 }
645
646 DataSourceInstance* ds_inst = SetupDataSource(
647 cfg_data_source, producer_config, it->second, tracing_session);
648
649 if (ds_inst && tracing_session->state == TracingSession::STARTED)
650 StartDataSourceInstance(producer, tracing_session, ds_inst);
651 }
652 }
653 }
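// Illustrative sketch of the only supported change (an assumption about
// consumer-side usage, not code from this file): extend a data source's
// producer_name_filter and resubmit the otherwise identical config.
//
//   TraceConfig updated = original_cfg;  // |original_cfg| is hypothetical.
//   updated.mutable_data_sources()->front()
//       .mutable_producer_name_filter()->push_back("late_spawned_producer");
//   consumer_endpoint->ChangeTraceConfig(updated);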
654
655 bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
656 PERFETTO_DCHECK_THREAD(thread_checker_);
657 TracingSession* tracing_session = GetTracingSession(tsid);
658 if (!tracing_session) {
659 PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
660 return false;
661 }
662
663 if (tracing_session->state != TracingSession::CONFIGURED) {
664 PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
665 tracing_session->state);
666 return false;
667 }
668
669 tracing_session->state = TracingSession::STARTED;
670
671 if (!tracing_session->config.builtin_data_sources()
672 .disable_clock_snapshotting()) {
673 SnapshotClocks(&tracing_session->initial_clock_snapshot_,
674 /*set_root_timestamp=*/true);
675 }
676
677 // Trigger delayed task if the trace is time limited.
678 const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
679 if (trace_duration_ms > 0) {
680 auto weak_this = weak_ptr_factory_.GetWeakPtr();
681 task_runner_->PostDelayedTask(
682 [weak_this, tsid] {
683 // Skip the flush entirely if the trace session doesn't exist anymore.
684 // This is to prevent misleading error messages from being logged.
685 if (!weak_this)
686 return;
687 auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
688 if (!tracing_session_ptr)
689 return;
690 // If this trace was using STOP_TRACING triggers and we've seen
691 // one, then the trigger overrides the normal timeout. In this
692 // case we just return and let the other task clean up this trace.
693 if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
694 TraceConfig::TriggerConfig::STOP_TRACING &&
695 !tracing_session_ptr->received_triggers.empty())
696 return;
697 // In all other cases (START_TRACING or no triggers) we flush
698 // after |trace_duration_ms| unconditionally.
699 weak_this->FlushAndDisableTracing(tsid);
700 },
701 trace_duration_ms);
702 }
703
704 // Start the periodic drain tasks if the trace is being saved into a file.
705 if (tracing_session->config.write_into_file()) {
706 auto weak_this = weak_ptr_factory_.GetWeakPtr();
707 task_runner_->PostDelayedTask(
708 [weak_this, tsid] {
709 if (weak_this)
710 weak_this->ReadBuffers(tsid, nullptr);
711 },
712 tracing_session->delay_to_next_write_period_ms());
713 }
714
715 // Start the periodic flush tasks if the config specified a flush period.
716 if (tracing_session->config.flush_period_ms())
717 PeriodicFlushTask(tsid, /*post_next_only=*/true);
718
719 // Start the periodic incremental state clear tasks if the config specified a
720 // period.
721 if (tracing_session->config.incremental_state_config().clear_period_ms()) {
722 PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
723 }
724
725 for (auto& kv : tracing_session->data_source_instances) {
726 ProducerID producer_id = kv.first;
727 DataSourceInstance& data_source = kv.second;
728 ProducerEndpointImpl* producer = GetProducer(producer_id);
729 if (!producer) {
730 PERFETTO_DFATAL("Producer does not exist.");
731 continue;
732 }
733 StartDataSourceInstance(producer, tracing_session, &data_source);
734 }
735 return true;
736 }
737
738 void TracingServiceImpl::StartDataSourceInstance(
739 ProducerEndpointImpl* producer,
740 TracingSession* tracing_session,
741 TracingServiceImpl::DataSourceInstance* instance) {
742 PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
743 if (instance->will_notify_on_start) {
744 instance->state = DataSourceInstance::STARTING;
745 } else {
746 instance->state = DataSourceInstance::STARTED;
747 }
748 if (tracing_session->consumer_maybe_null) {
749 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
750 *producer, *instance);
751 }
752 producer->StartDataSource(instance->instance_id, instance->config);
753 }
754
755 // DisableTracing just stops the data sources but doesn't free up any buffer.
756 // This is to allow the consumer to freeze the buffers (by stopping the trace)
757 // and then drain the buffers. The actual teardown of the TracingSession happens
758 // in FreeBuffers().
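// A typical consumer-side sequence that relies on this behavior (an
// illustrative assumption, not taken from this file):
//
//   consumer_endpoint->DisableTracing();  // Stops the data sources.
//   consumer_endpoint->ReadBuffers();     // Drains the now-frozen buffers.
//   consumer_endpoint->FreeBuffers();     // Tears down the TracingSession.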
759 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
760 bool disable_immediately) {
761 PERFETTO_DCHECK_THREAD(thread_checker_);
762 TracingSession* tracing_session = GetTracingSession(tsid);
763 if (!tracing_session) {
764 // Can happen if the consumer calls this before EnableTracing() or after
765 // FreeBuffers().
766 PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
767 return;
768 }
769
770 switch (tracing_session->state) {
771 // Spurious call to DisableTracing() while already disabled, nothing to do.
772 case TracingSession::DISABLED:
773 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
774 return;
775
776 // This is either:
777 // A) The case of a graceful DisableTracing() call followed by a call to
778 // FreeBuffers(), iff |disable_immediately| == true. In this case we want
779 // to forcefully transition in the disabled state without waiting for the
780 // outstanding acks because the buffers are going to be destroyed soon.
781 // B) A spurious call, iff |disable_immediately| == false, in which case
782 // there is nothing to do.
783 case TracingSession::DISABLING_WAITING_STOP_ACKS:
784 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
785 if (disable_immediately)
786 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
787 return;
788
789 // Continues below.
790 case TracingSession::CONFIGURED:
791 // If the session didn't even start there is no need to orchestrate a
792 // graceful stop of data sources.
793 disable_immediately = true;
794 break;
795
796 // This is the nominal case, continues below.
797 case TracingSession::STARTED:
798 break;
799 }
800
801 for (auto& data_source_inst : tracing_session->data_source_instances) {
802 const ProducerID producer_id = data_source_inst.first;
803 DataSourceInstance& instance = data_source_inst.second;
804 const DataSourceInstanceID ds_inst_id = instance.instance_id;
805 ProducerEndpointImpl* producer = GetProducer(producer_id);
806 PERFETTO_DCHECK(producer);
807 PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
808 instance.state == DataSourceInstance::STARTING ||
809 instance.state == DataSourceInstance::STARTED);
810 if (instance.will_notify_on_stop && !disable_immediately) {
811 instance.state = DataSourceInstance::STOPPING;
812 } else {
813 instance.state = DataSourceInstance::STOPPED;
814 }
815 if (tracing_session->consumer_maybe_null) {
816 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
817 *producer, instance);
818 }
819 producer->StopDataSource(ds_inst_id);
820 }
821
822 // Either this request is flagged with |disable_immediately| or there are no
823 // data sources that are requesting a final handshake. In both cases just mark
824 // the session as disabled immediately, notify the consumer and flush the
825 // trace file (if used).
826 if (tracing_session->AllDataSourceInstancesStopped())
827 return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
828
829 tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
830 auto weak_this = weak_ptr_factory_.GetWeakPtr();
831 auto timeout_ms = override_data_source_test_timeout_ms_for_testing
832 ? override_data_source_test_timeout_ms_for_testing
833 : kDataSourceStopTimeoutMs;
834 task_runner_->PostDelayedTask(
835 [weak_this, tsid] {
836 if (weak_this)
837 weak_this->OnDisableTracingTimeout(tsid);
838 },
839 timeout_ms);
840
841 // Deliberately NOT removing the session from |tracing_sessions_|, it's still
842 // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
843 }
844
845 void TracingServiceImpl::NotifyDataSourceStarted(
846 ProducerID producer_id,
847 DataSourceInstanceID instance_id) {
848 PERFETTO_DCHECK_THREAD(thread_checker_);
849 for (auto& kv : tracing_sessions_) {
850 TracingSession& tracing_session = kv.second;
851 DataSourceInstance* instance =
852 tracing_session.GetDataSourceInstance(producer_id, instance_id);
853
854 if (!instance)
855 continue;
856
857 if (instance->state != DataSourceInstance::STARTING) {
858 PERFETTO_ELOG("Data source instance in incorrect state.");
859 continue;
860 }
861
862 instance->state = DataSourceInstance::STARTED;
863
864 ProducerEndpointImpl* producer = GetProducer(producer_id);
865 PERFETTO_DCHECK(producer);
866 if (tracing_session.consumer_maybe_null) {
867 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
868 *producer, *instance);
869 }
870 } // for (tracing_session)
871 }
872
873 void TracingServiceImpl::NotifyDataSourceStopped(
874 ProducerID producer_id,
875 DataSourceInstanceID instance_id) {
876 PERFETTO_DCHECK_THREAD(thread_checker_);
877 for (auto& kv : tracing_sessions_) {
878 TracingSession& tracing_session = kv.second;
879 DataSourceInstance* instance =
880 tracing_session.GetDataSourceInstance(producer_id, instance_id);
881
882 if (!instance)
883 continue;
884
885 if (instance->state != DataSourceInstance::STOPPING) {
886 PERFETTO_ELOG("Data source instance in incorrect state.");
887 continue;
888 }
889 PERFETTO_DCHECK(tracing_session.state ==
890 TracingSession::DISABLING_WAITING_STOP_ACKS);
891
892 instance->state = DataSourceInstance::STOPPED;
893
894 ProducerEndpointImpl* producer = GetProducer(producer_id);
895 PERFETTO_DCHECK(producer);
896 if (tracing_session.consumer_maybe_null) {
897 tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
898 *producer, *instance);
899 }
900
901 if (!tracing_session.AllDataSourceInstancesStopped())
902 continue;
903
904 // All data sources acked the termination.
905 DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
906 } // for (tracing_session)
907 }
908
909 void TracingServiceImpl::ActivateTriggers(
910 ProducerID producer_id,
911 const std::vector<std::string>& triggers) {
912 PERFETTO_DCHECK_THREAD(thread_checker_);
913 auto* producer = GetProducer(producer_id);
914 PERFETTO_DCHECK(producer);
915 for (const auto& trigger_name : triggers) {
916 for (auto& id_and_tracing_session : tracing_sessions_) {
917 auto& tracing_session = id_and_tracing_session.second;
918 TracingSessionID tsid = id_and_tracing_session.first;
919 auto iter = std::find_if(
920 tracing_session.config.trigger_config().triggers().begin(),
921 tracing_session.config.trigger_config().triggers().end(),
922 [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
923 return trigger.name() == trigger_name;
924 });
925 if (iter == tracing_session.config.trigger_config().triggers().end()) {
926 continue;
927 }
928
929 // If this trigger requires a certain producer to have sent it
930 // (non-empty producer_name_regex()), ensure the producer who sent this
931 // trigger matches.
932 if (!iter->producer_name_regex().empty() &&
933 !std::regex_match(producer->name_,
934 std::regex(iter->producer_name_regex()))) {
935 continue;
936 }
937
938 const bool triggers_already_received =
939 !tracing_session.received_triggers.empty();
940 tracing_session.received_triggers.push_back(
941 {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
942 producer->name_, producer->uid_});
943 auto weak_this = weak_ptr_factory_.GetWeakPtr();
944 switch (tracing_session.config.trigger_config().trigger_mode()) {
945 case TraceConfig::TriggerConfig::START_TRACING:
946 // If the session has already been triggered and moved past
947 // CONFIGURED then we don't need to repeat StartTracing. This would
948 // work fine (StartTracing would return false) but would add error
949 // logs.
950 if (tracing_session.state != TracingSession::CONFIGURED)
951 break;
952
953 PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
954 " with duration of %" PRIu32 "ms.",
955 iter->name().c_str(), tsid, iter->stop_delay_ms());
956 // We override the trace duration to be the trigger's requested
957 // value, this ensures that the trace will end after this amount
958 // of time has passed.
959 tracing_session.config.set_duration_ms(iter->stop_delay_ms());
960 StartTracing(tsid);
961 break;
962 case TraceConfig::TriggerConfig::STOP_TRACING:
963 // Only stop the trace once to avoid confusing log messages. I.e.,
964 // once we've hit the first trigger we've already posted the task to
965 // FlushAndDisableTracing, so all future triggers will just break
966 // out.
967 if (triggers_already_received)
968 break;
969
970 PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
971 " with duration of %" PRIu32 "ms.",
972 iter->name().c_str(), tsid, iter->stop_delay_ms());
973 // Now that we've seen a trigger we need to stop, flush, and disable
974 // this session after the configured |stop_delay_ms|.
975 task_runner_->PostDelayedTask(
976 [weak_this, tsid] {
977 // Skip the flush entirely if the trace session doesn't exist
978 // anymore. This is to prevent misleading error messages from
979 // being logged.
980 if (weak_this && weak_this->GetTracingSession(tsid))
981 weak_this->FlushAndDisableTracing(tsid);
982 },
983 // If this delay is zero the task will execute immediately and
984 // the trace will stop shortly after.
985 iter->stop_delay_ms());
986 break;
987 case TraceConfig::TriggerConfig::UNSPECIFIED:
988 PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
989 break;
990 }
991 }
992 }
993 }
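// Illustrative sketch (an assumption about the producer-side call that reaches
// the logic above): a producer activates a trigger by name, and the service
// matches it against every session's TriggerConfig, honoring the optional
// producer_name_regex.
//
//   producer_endpoint->ActivateTriggers({"my_trigger"});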
994
995 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
996 // conditions all data sources should have acked the stop and this will early
997 // out.
998 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
999 PERFETTO_DCHECK_THREAD(thread_checker_);
1000 TracingSession* tracing_session = GetTracingSession(tsid);
1001 if (!tracing_session ||
1002 tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1003 return; // Tracing session was successfully disabled.
1004 }
1005
1006 PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1007 tsid);
1008 PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1009 DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1010 }
1011
1012 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1013 TracingSession* tracing_session) {
1014 PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1015 for (auto& inst_kv : tracing_session->data_source_instances) {
1016 if (inst_kv.second.state == DataSourceInstance::STOPPED)
1017 continue;
1018 inst_kv.second.state = DataSourceInstance::STOPPED;
1019 ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1020 PERFETTO_DCHECK(producer);
1021 if (tracing_session->consumer_maybe_null) {
1022 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1023 *producer, inst_kv.second);
1024 }
1025 }
1026 tracing_session->state = TracingSession::DISABLED;
1027
1028 // Scrape any remaining chunks that weren't flushed by the producers.
1029 for (auto& producer_id_and_producer : producers_)
1030 ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1031
1032 if (tracing_session->write_into_file) {
1033 tracing_session->write_period_ms = 0;
1034 ReadBuffers(tracing_session->id, nullptr);
1035 }
1036
1037 if (tracing_session->consumer_maybe_null)
1038 tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
1039 }
1040
1041 void TracingServiceImpl::Flush(TracingSessionID tsid,
1042 uint32_t timeout_ms,
1043 ConsumerEndpoint::FlushCallback callback) {
1044 PERFETTO_DCHECK_THREAD(thread_checker_);
1045 TracingSession* tracing_session = GetTracingSession(tsid);
1046 if (!tracing_session) {
1047 PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1048 return;
1049 }
1050
1051 if (!timeout_ms)
1052 timeout_ms = tracing_session->flush_timeout_ms();
1053
1054 if (tracing_session->pending_flushes.size() > 1000) {
1055 PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1056 tracing_session->pending_flushes.size());
1057 callback(false);
1058 return;
1059 }
1060
1061 FlushRequestID flush_request_id = ++last_flush_request_id_;
1062 PendingFlush& pending_flush =
1063 tracing_session->pending_flushes
1064 .emplace_hint(tracing_session->pending_flushes.end(),
1065 flush_request_id, PendingFlush(std::move(callback)))
1066 ->second;
1067
1068 // Send a flush request to each producer involved in the tracing session. In
1069 // order to issue a flush request we have to build a map of all data source
1070 // instance ids enabled for each producer.
1071 std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1072 for (const auto& data_source_inst : tracing_session->data_source_instances) {
1073 const ProducerID producer_id = data_source_inst.first;
1074 const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1075 flush_map[producer_id].push_back(ds_inst_id);
1076 }
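// E.g. (illustrative): if producer 1 hosts data source instances {7, 9} and
// producer 4 hosts {12}, |flush_map| becomes {1: [7, 9], 4: [12]} and each
// producer below receives a single Flush() carrying only its own instances.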
1077
1078 for (const auto& kv : flush_map) {
1079 ProducerID producer_id = kv.first;
1080 ProducerEndpointImpl* producer = GetProducer(producer_id);
1081 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1082 producer->Flush(flush_request_id, data_sources);
1083 pending_flush.producers.insert(producer_id);
1084 }
1085
1086 // If there are no producers to flush (realistically this happens only in
1087 // some tests) fire OnFlushTimeout() straight away, without waiting.
1088 if (flush_map.empty())
1089 timeout_ms = 0;
1090
1091 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1092 task_runner_->PostDelayedTask(
1093 [weak_this, tsid, flush_request_id] {
1094 if (weak_this)
1095 weak_this->OnFlushTimeout(tsid, flush_request_id);
1096 },
1097 timeout_ms);
1098 }
1099
1100 void TracingServiceImpl::NotifyFlushDoneForProducer(
1101 ProducerID producer_id,
1102 FlushRequestID flush_request_id) {
1103 for (auto& kv : tracing_sessions_) {
1104 // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1105 auto& pending_flushes = kv.second.pending_flushes;
1106 auto end_it = pending_flushes.upper_bound(flush_request_id);
1107 for (auto it = pending_flushes.begin(); it != end_it;) {
1108 PendingFlush& pending_flush = it->second;
1109 pending_flush.producers.erase(producer_id);
1110 if (pending_flush.producers.empty()) {
1111 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1112 TracingSessionID tsid = kv.first;
1113 auto callback = std::move(pending_flush.callback);
1114 task_runner_->PostTask([weak_this, tsid, callback]() {
1115 if (weak_this) {
1116 weak_this->CompleteFlush(tsid, std::move(callback),
1117 /*success=*/true);
1118 }
1119 });
1120 it = pending_flushes.erase(it);
1121 } else {
1122 it++;
1123 }
1124 } // for (pending_flushes)
1125 } // for (tracing_session)
1126 }
1127
1128 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1129 FlushRequestID flush_request_id) {
1130 TracingSession* tracing_session = GetTracingSession(tsid);
1131 if (!tracing_session)
1132 return;
1133 auto it = tracing_session->pending_flushes.find(flush_request_id);
1134 if (it == tracing_session->pending_flushes.end())
1135 return; // Nominal case: flush was completed and acked on time.
1136
1137 // If there were no producers to flush, consider it a success.
1138 bool success = it->second.producers.empty();
1139
1140 auto callback = std::move(it->second.callback);
1141 tracing_session->pending_flushes.erase(it);
1142 CompleteFlush(tsid, std::move(callback), success);
1143 }
1144
1145 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1146 ConsumerEndpoint::FlushCallback callback,
1147 bool success) {
1148 TracingSession* tracing_session = GetTracingSession(tsid);
1149 if (tracing_session) {
1150 // Producers may not have been able to flush all their data, even if they
1151 // indicated flush completion. If possible, also collect uncommitted chunks
1152 // to make sure we have everything they wrote so far.
1153 for (auto& producer_id_and_producer : producers_) {
1154 ScrapeSharedMemoryBuffers(tracing_session,
1155 producer_id_and_producer.second);
1156 }
1157 }
1158 callback(success);
1159 }
1160
1161 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1162 TracingSession* tracing_session,
1163 ProducerEndpointImpl* producer) {
1164 if (!producer->smb_scraping_enabled_)
1165 return;
1166
1167 // Can't copy chunks if we don't know about any trace writers.
1168 if (producer->writers_.empty())
1169 return;
1170
1171 // Performance optimization: On flush or session disconnect, this method is
1172 // called for each producer. If the producer doesn't participate in the
1173 // session, there's no need to scrape its chunks right now. We can tell if a
1174 // producer participates in the session by checking if the producer is allowed
1175 // to write into the session's log buffers.
1176 const auto& session_buffers = tracing_session->buffers_index;
1177 bool producer_in_session =
1178 std::any_of(session_buffers.begin(), session_buffers.end(),
1179 [producer](BufferID buffer_id) {
1180 return producer->allowed_target_buffers_.count(buffer_id);
1181 });
1182 if (!producer_in_session)
1183 return;
1184
1185 PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1186
1187 // Find and copy any uncommitted chunks from the SMB.
1188 //
1189 // In nominal conditions, the page layout of the used SMB pages should never
1190 // change because the service is the only one who is supposed to modify used
1191 // pages (to make them free again).
1192 //
1193 // However, the code here needs to deal with the case of a malicious producer
1194 // altering the SMB in unpredictable ways. Thankfully the SMB size is
1195 // immutable, so a chunk will always point to some valid memory, even if the
1196 // producer alters the intended layout and chunk header concurrently.
1197 // Ultimately a malicious producer altering the SMB's chunk layout while we
1198 // are iterating in this function is not any different from the case of a
1199 // malicious producer asking to commit a chunk made of random data, which is
1200 // something this class has to deal with regardless.
1201 //
1202 // The only legitimate mutations that can happen from sane producers,
1203 // concurrently to this function, are:
1204 // A. free pages being partitioned,
1205 // B. free chunks being migrated to kChunkBeingWritten,
1206 // C. kChunkBeingWritten chunks being migrated to kChunkComplete.
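//
// In other words, the expected life cycle of a chunk from a sane producer is
// kChunkFree -> kChunkBeingWritten -> kChunkComplete, with the service being
// the only party that moves chunks back to kChunkFree after copying them out.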
1207
1208 SharedMemoryABI* abi = &producer->shmem_abi_;
1209 // num_pages() is immutable after the SMB is initialized and cannot be changed
1210 // by a producer, not even a malicious one.
1211 for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1212 uint32_t layout = abi->GetPageLayout(page_idx);
1213
1214 uint32_t used_chunks = abi->GetUsedChunks(layout); // Returns a bitmap.
1215 // Skip empty pages.
1216 if (used_chunks == 0)
1217 continue;
1218
1219 // Scrape the chunks that are currently used. These should be either in
1220 // state kChunkBeingWritten or kChunkComplete.
1221 for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1222 if (!(used_chunks & 1))
1223 continue;
1224
1225 SharedMemoryABI::ChunkState state =
1226 SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1227 PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1228 state == SharedMemoryABI::kChunkComplete);
1229 bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1230
1231 SharedMemoryABI::Chunk chunk =
1232 abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1233
1234 uint16_t packet_count;
1235 uint8_t flags;
1236 // GetPacketCountAndFlags has acquire_load semantics.
1237 std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1238
1239 // It only makes sense to copy an incomplete chunk if there's at least
1240 // one full packet available. (The producer may not have completed the
1241 // last packet in it yet, so we need at least 2.)
1242 if (!chunk_complete && packet_count < 2)
1243 continue;
1244
1245 // At this point, it is safe to access the remaining header fields of
1246 // the chunk. Even if the chunk was only just transferred from
1247 // kChunkFree into kChunkBeingWritten state, the header should be
1248 // written completely once the packet count increased above 1 (it was
1249 // reset to 0 by the service when the chunk was freed).
1250
1251 WriterID writer_id = chunk.writer_id();
1252 base::Optional<BufferID> target_buffer_id =
1253 producer->buffer_id_for_writer(writer_id);
1254
1255 // We can only scrape this chunk if we know which log buffer to copy it
1256 // into.
1257 if (!target_buffer_id)
1258 continue;
1259
1260 // Skip chunks that don't belong to the requested tracing session.
1261 bool target_buffer_belongs_to_session =
1262 std::find(session_buffers.begin(), session_buffers.end(),
1263 *target_buffer_id) != session_buffers.end();
1264 if (!target_buffer_belongs_to_session)
1265 continue;
1266
1267 uint32_t chunk_id =
1268 chunk.header()->chunk_id.load(std::memory_order_relaxed);
1269
1270 CopyProducerPageIntoLogBuffer(
1271 producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
1272 packet_count, flags, chunk_complete, chunk.payload_begin(),
1273 chunk.payload_size());
1274 }
1275 }
1276 }
1277
1278 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1279 PERFETTO_DCHECK_THREAD(thread_checker_);
1280 PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1281 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1282 Flush(tsid, 0, [weak_this, tsid](bool success) {
1283 PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
1284 success, tsid);
1285 if (!weak_this)
1286 return;
1287 TracingSession* session = weak_this->GetTracingSession(tsid);
// The session may have been freed (e.g. by FreeBuffers() from a disconnecting
// consumer) while the flush was in flight; bail out rather than dereferencing
// a null pointer below.
if (!session)
return;
1288 if (session->consumer_maybe_null) {
1289 // If the consumer is still attached, just disable the session but give it
1290 // a chance to read the contents.
1291 weak_this->DisableTracing(tsid);
1292 } else {
1293 // If the consumer detached, destroy the session. If the consumer did
1294 // start the session in long-tracing mode, the service will have saved
1295 // the contents to the passed file. If not, the contents will be
1296 // destroyed.
1297 weak_this->FreeBuffers(tsid);
1298 }
1299 });
1300 }
1301
1302 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1303 bool post_next_only) {
1304 PERFETTO_DCHECK_THREAD(thread_checker_);
1305 TracingSession* tracing_session = GetTracingSession(tsid);
1306 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1307 return;
1308
1309 uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1310 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1311 task_runner_->PostDelayedTask(
1312 [weak_this, tsid] {
1313 if (weak_this)
1314 weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1315 },
1316 flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
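// Worked example of the delay above (illustrative): with flush_period_ms ==
// 10000 and a wall clock of ...07.500 s, the computed delay is
// 10000 - 7500 = 2500 ms, so flushes stay aligned to multiples of the period
// instead of drifting by task latency.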
1317
1318 if (post_next_only)
1319 return;
1320
1321 PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1322 Flush(tsid, 0, [](bool success) {
1323 if (!success)
1324 PERFETTO_ELOG("Periodic flush timed out");
1325 });
1326 }
1327
1328 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1329 TracingSessionID tsid,
1330 bool post_next_only) {
1331 PERFETTO_DCHECK_THREAD(thread_checker_);
1332 TracingSession* tracing_session = GetTracingSession(tsid);
1333 if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1334 return;
1335
1336 uint32_t clear_period_ms =
1337 tracing_session->config.incremental_state_config().clear_period_ms();
1338 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1339 task_runner_->PostDelayedTask(
1340 [weak_this, tsid] {
1341 if (weak_this)
1342 weak_this->PeriodicClearIncrementalStateTask(
1343 tsid, /*post_next_only=*/false);
1344 },
1345 clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));
1346
1347 if (post_next_only)
1348 return;
1349
1350 PERFETTO_DLOG(
1351 "Performing periodic incremental state clear for trace session %" PRIu64,
1352 tsid);
1353
1354 // Queue the IPCs to producers with active data sources that opted in.
1355 std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1356 for (const auto& kv : tracing_session->data_source_instances) {
1357 ProducerID producer_id = kv.first;
1358 const DataSourceInstance& data_source = kv.second;
1359 if (data_source.handles_incremental_state_clear)
1360 clear_map[producer_id].push_back(data_source.instance_id);
1361 }
1362
1363 for (const auto& kv : clear_map) {
1364 ProducerID producer_id = kv.first;
1365 const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1366 ProducerEndpointImpl* producer = GetProducer(producer_id);
1367 if (!producer) {
1368 PERFETTO_DFATAL("Producer does not exist.");
1369 continue;
1370 }
1371 producer->ClearIncrementalState(data_sources);
1372 }
1373 }
1374
1375 // Note: when this is called to write into a file passed when starting tracing
1376 // |consumer| will be == nullptr (as opposed to the case of a consumer asking
1377 // to send the trace data back over IPC).
1378 void TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
1379 ConsumerEndpointImpl* consumer) {
1380 PERFETTO_DCHECK_THREAD(thread_checker_);
1381 TracingSession* tracing_session = GetTracingSession(tsid);
1382 if (!tracing_session) {
1383 // This will be hit systematically from the PostDelayedTask when directly
1384 // writing into the file (in which case consumer == nullptr). Suppress the
1385 // log in this case as it's just spam.
1386 if (consumer)
1387 PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
1388 return; // TODO(primiano): signal failure?
1389 }
1390
1391 // When a tracing session is waiting for a trigger it is considered empty. If
1392 // a tracing session finishes and moves into DISABLED without ever receiving a
1393 // trigger the trace should never return any data. This includes the synthetic
1394 // packets like TraceConfig and Clock snapshots. So we bail out early and let
1395 // the consumer know there is no data.
1396 if (!tracing_session->config.trigger_config().triggers().empty() &&
1397 tracing_session->received_triggers.empty()) {
1398 if (consumer)
1399 consumer->consumer_->OnTraceData({}, /* has_more = */ false);
1400 PERFETTO_DLOG(
1401 "ReadBuffers(): tracing session has not received a trigger yet.");
1402 return;
1403 }
1404
1405 // This can happen if the file is closed by a previous task because it reaches
1406 // |max_file_size_bytes|.
1407 if (!tracing_session->write_into_file && !consumer)
1408 return;
1409
1410 if (tracing_session->write_into_file && consumer) {
1411 // If the consumer enabled tracing and asked to save the contents into the
1412     // passed file, it makes little sense to also read the buffers over IPC,
1413 // as that would just steal data from the periodic draining task.
1414 PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
1415 return;
1416 }
1417
1418 std::vector<TracePacket> packets;
1419 packets.reserve(1024); // Just an educated guess to avoid trivial expansions.
1420
1421 std::move(tracing_session->initial_clock_snapshot_.begin(),
1422 tracing_session->initial_clock_snapshot_.end(),
1423 std::back_inserter(packets));
1424 tracing_session->initial_clock_snapshot_.clear();
1425
1426 base::TimeMillis now = base::GetWallTimeMs();
1427 if (now >= tracing_session->last_snapshot_time + kSnapshotsInterval) {
1428 tracing_session->last_snapshot_time = now;
1429 SnapshotSyncMarker(&packets);
1430 SnapshotStats(tracing_session, &packets);
1431
1432 if (!tracing_session->config.builtin_data_sources()
1433 .disable_clock_snapshotting()) {
1434 // We don't want to put a root timestamp in this snapshot as the packet
1435 // may be very out of order with respect to the actual trace packets
1436 // since consuming the trace may happen at any point after it starts.
1437 SnapshotClocks(&packets, /*set_root_timestamp=*/false);
1438 }
1439 }
1440 if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
1441 MaybeEmitTraceConfig(tracing_session, &packets);
1442 MaybeEmitReceivedTriggers(tracing_session, &packets);
1443 }
1444 if (!tracing_session->config.builtin_data_sources().disable_system_info())
1445 MaybeEmitSystemInfo(tracing_session, &packets);
1446
1447 size_t packets_bytes = 0; // SUM(slice.size() for each slice in |packets|).
1448 size_t total_slices = 0; // SUM(#slices in |packets|).
1449
1450 // Add up size for packets added by the Maybe* calls above.
1451 for (const TracePacket& packet : packets) {
1452 packets_bytes += packet.size();
1453 total_slices += packet.slices().size();
1454 }
1455
1456 // This is a rough threshold to determine how much to read from the buffer in
1457 // each task. This is to avoid executing a single huge sending task for too
1458   // long, risking a watchdog hit. This is *not* an upper bound: we just
1459 // stop accumulating new packets and PostTask *after* we cross this threshold.
1460 // This constant essentially balances the PostTask and IPC overhead vs the
1461 // responsiveness of the service. An extremely small value will cause one IPC
1462 // and one PostTask for each slice but will keep the service extremely
1463 // responsive. An extremely large value will batch the send for the full
1464   // buffer in one large task, hitting the blocking send() once the socket
1465   // buffers are full and hanging the service for a bit (until the consumer
1466 // catches up).
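  // As a rough worked example, with ~4 KB trace packets each ReadBuffers()
  // task batches on the order of eight packets (32768 / 4096) before posting
  // the next task.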
1467 static constexpr size_t kApproxBytesPerTask = 32768;
1468 bool did_hit_threshold = false;
1469
1470 // TODO(primiano): Extend the ReadBuffers API to allow reading only some
1471 // buffers, not all of them in one go.
1472 for (size_t buf_idx = 0;
1473 buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
1474 buf_idx++) {
1475 auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
1476 if (tbuf_iter == buffers_.end()) {
1477 PERFETTO_DFATAL("Buffer not found.");
1478 continue;
1479 }
1480 TraceBuffer& tbuf = *tbuf_iter->second;
1481 tbuf.BeginRead();
1482 while (!did_hit_threshold) {
1483 TracePacket packet;
1484 TraceBuffer::PacketSequenceProperties sequence_properties{};
1485 bool previous_packet_dropped;
1486 if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
1487 &previous_packet_dropped)) {
1488 break;
1489 }
1490 PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
1491 PERFETTO_DCHECK(sequence_properties.writer_id != 0);
1492 PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
1493 PERFETTO_DCHECK(packet.size() > 0);
1494 if (!PacketStreamValidator::Validate(packet.slices())) {
1495 PERFETTO_DLOG("Dropping invalid packet");
1496 continue;
1497 }
1498
1499 // Append a slice with the trusted field data. This can't be spoofed
1500 // because above we validated that the existing slices don't contain any
1501 // trusted fields. For added safety we append instead of prepending
1502 // because according to protobuf semantics, if the same field is
1503 // encountered multiple times the last instance takes priority. Note that
1504 // truncated packets are also rejected, so the producer can't give us a
1505 // partial packet (e.g., a truncated string) which only becomes valid when
1506 // the trusted data is appended here.
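      // Concretely: a packet that arrived with a forged trusted_uid is dropped
      // by the validation above; even if one slipped through, the copy
      // serialized here comes later in the byte stream and, per protobuf's
      // last-one-wins rule for scalar fields, is the one a parser keeps.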
1507 protos::TrustedPacket trusted_packet;
1508 trusted_packet.set_trusted_uid(
1509 static_cast<int32_t>(sequence_properties.producer_uid_trusted));
1510 trusted_packet.set_trusted_packet_sequence_id(
1511 tracing_session->GetPacketSequenceID(
1512 sequence_properties.producer_id_trusted,
1513 sequence_properties.writer_id));
1514 if (previous_packet_dropped)
1515 trusted_packet.set_previous_packet_dropped(previous_packet_dropped);
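      // The fields set above serialize to a few bytes each, comfortably within
      // kTrustedBufSize; the CHECK below enforces that they actually fit.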
1516 static constexpr size_t kTrustedBufSize = 16;
1517 Slice slice = Slice::Allocate(kTrustedBufSize);
1518 PERFETTO_CHECK(
1519 trusted_packet.SerializeToArray(slice.own_data(), kTrustedBufSize));
1520 slice.size = static_cast<size_t>(trusted_packet.GetCachedSize());
1521 PERFETTO_DCHECK(slice.size > 0 && slice.size <= kTrustedBufSize);
1522 packet.AddSlice(std::move(slice));
1523
1524 // Append the packet (inclusive of the trusted uid) to |packets|.
1525 packets_bytes += packet.size();
1526 total_slices += packet.slices().size();
1527 did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
1528 !tracing_session->write_into_file;
1529 packets.emplace_back(std::move(packet));
1530     }  // while(packets...)
1531 } // for(buffers...)
1532
1533 // If the caller asked us to write into a file by setting
1534 // |write_into_file| == true in the trace config, drain the packets read
1535 // (if any) into the given file descriptor.
1536 if (tracing_session->write_into_file) {
1537 const uint64_t max_size = tracing_session->max_file_size_bytes
1538 ? tracing_session->max_file_size_bytes
1539 : std::numeric_limits<size_t>::max();
1540
1541 // When writing into a file, the file should look like a root trace.proto
1542 // message. Each packet should be prepended with a proto preamble stating
1543 // its field id (within trace.proto) and size. Hence the addition below.
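    // E.g. a packet made of three slices contributes four iovec entries: one
    // for its preamble plus one per slice.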
1544 const size_t max_iovecs = total_slices + packets.size();
1545
1546 size_t num_iovecs = 0;
1547 bool stop_writing_into_file = tracing_session->write_period_ms == 0;
1548 std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
1549 size_t num_iovecs_at_last_packet = 0;
1550 uint64_t bytes_about_to_be_written = 0;
1551 for (TracePacket& packet : packets) {
1552 std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
1553 packet.GetProtoPreamble();
1554 bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
1555 num_iovecs++;
1556 for (const Slice& slice : packet.slices()) {
1557 // writev() doesn't change the passed pointer. However, struct iovec
1558         // takes a non-const ptr because it's the same struct used by readv().
1559 // Hence the const_cast here.
1560 char* start = static_cast<char*>(const_cast<void*>(slice.start));
1561 bytes_about_to_be_written += slice.size;
1562 iovecs[num_iovecs++] = {start, slice.size};
1563 }
1564
1565 if (tracing_session->bytes_written_into_file +
1566 bytes_about_to_be_written >=
1567 max_size) {
1568 stop_writing_into_file = true;
1569 num_iovecs = num_iovecs_at_last_packet;
1570 break;
1571 }
1572
1573 num_iovecs_at_last_packet = num_iovecs;
1574 }
1575 PERFETTO_DCHECK(num_iovecs <= max_iovecs);
1576 int fd = *tracing_session->write_into_file;
1577
1578 uint64_t total_wr_size = 0;
1579
1580 // writev() can take at most IOV_MAX entries per call. Batch them.
1581 constexpr size_t kIOVMax = IOV_MAX;
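    // E.g. with 2500 iovecs and an IOV_MAX of 1024 (a common value on Linux),
    // this issues three writev() calls of 1024, 1024 and 452 entries.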
1582 for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
1583 int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
1584 ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
1585 if (wr_size <= 0) {
1586 PERFETTO_PLOG("writev() failed");
1587 stop_writing_into_file = true;
1588 break;
1589 }
1590 total_wr_size += static_cast<size_t>(wr_size);
1591 }
1592
1593 tracing_session->bytes_written_into_file += total_wr_size;
1594
1595 PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
1596 (total_wr_size + 1023) / 1024, stop_writing_into_file);
1597 if (stop_writing_into_file) {
1598 // Ensure all data was written to the file before we close it.
1599 base::FlushFile(fd);
1600 tracing_session->write_into_file.reset();
1601 tracing_session->write_period_ms = 0;
1602 if (tracing_session->state == TracingSession::STARTED)
1603 DisableTracing(tsid);
1604 return;
1605 }
1606
1607 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1608 task_runner_->PostDelayedTask(
1609 [weak_this, tsid] {
1610 if (weak_this)
1611 weak_this->ReadBuffers(tsid, nullptr);
1612 },
1613 tracing_session->delay_to_next_write_period_ms());
1614 return;
1615 } // if (tracing_session->write_into_file)
1616
1617 const bool has_more = did_hit_threshold;
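  // |has_more| == true tells the consumer that the task posted below will
  // deliver at least one more OnTraceData() batch; the stream ends only with a
  // batch flagged has_more == false.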
1618 if (has_more) {
1619 auto weak_consumer = consumer->GetWeakPtr();
1620 auto weak_this = weak_ptr_factory_.GetWeakPtr();
1621 task_runner_->PostTask([weak_this, weak_consumer, tsid] {
1622 if (!weak_this || !weak_consumer)
1623 return;
1624 weak_this->ReadBuffers(tsid, weak_consumer.get());
1625 });
1626 }
1627
1628 // Keep this as tail call, just in case the consumer re-enters.
1629 consumer->consumer_->OnTraceData(std::move(packets), has_more);
1630 }
1631
FreeBuffers(TracingSessionID tsid)1632 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
1633 PERFETTO_DCHECK_THREAD(thread_checker_);
1634 PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
1635 TracingSession* tracing_session = GetTracingSession(tsid);
1636 if (!tracing_session) {
1637 PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
1638 return; // TODO(primiano): signal failure?
1639 }
1640 DisableTracing(tsid, /*disable_immediately=*/true);
1641
1642 PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1643 tracing_session->data_source_instances.clear();
1644
1645 for (auto& producer_entry : producers_) {
1646 ProducerEndpointImpl* producer = producer_entry.second;
1647 producer->OnFreeBuffers(tracing_session->buffers_index);
1648 }
1649
1650 for (BufferID buffer_id : tracing_session->buffers_index) {
1651 buffer_ids_.Free(buffer_id);
1652 PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
1653 buffers_.erase(buffer_id);
1654 }
1655 bool notify_traceur = tracing_session->config.notify_traceur();
1656 tracing_sessions_.erase(tsid);
1657 UpdateMemoryGuardrail();
1658
1659 PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
1660 tracing_sessions_.size());
1661
1662 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
1663 static const char kTraceurProp[] = "sys.trace.trace_end_signal";
1664 if (notify_traceur && __system_property_set(kTraceurProp, "1"))
1665 PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
1666 #else
1667 base::ignore_result(notify_traceur);
1668 #endif
1669 }
1670
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)1671 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
1672 const DataSourceDescriptor& desc) {
1673 PERFETTO_DCHECK_THREAD(thread_checker_);
1674 PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
1675 producer_id, desc.name().c_str());
1676
1677 PERFETTO_DCHECK(!desc.name().empty());
1678 auto reg_ds = data_sources_.emplace(desc.name(),
1679 RegisteredDataSource{producer_id, desc});
1680
1681 // If there are existing tracing sessions, we need to check if the new
1682 // data source is enabled by any of them.
1683 if (tracing_sessions_.empty())
1684 return;
1685
1686 ProducerEndpointImpl* producer = GetProducer(producer_id);
1687 if (!producer) {
1688 PERFETTO_DFATAL("Producer not found.");
1689 return;
1690 }
1691
1692 for (auto& iter : tracing_sessions_) {
1693 TracingSession& tracing_session = iter.second;
1694 if (tracing_session.state != TracingSession::STARTED &&
1695 tracing_session.state != TracingSession::CONFIGURED) {
1696 continue;
1697 }
1698
1699 TraceConfig::ProducerConfig producer_config;
1700 for (auto& config : tracing_session.config.producers()) {
1701 if (producer->name_ == config.producer_name()) {
1702 producer_config = config;
1703 break;
1704 }
1705 }
1706 for (const TraceConfig::DataSource& cfg_data_source :
1707 tracing_session.config.data_sources()) {
1708 if (cfg_data_source.config().name() != desc.name())
1709 continue;
1710 DataSourceInstance* ds_inst = SetupDataSource(
1711 cfg_data_source, producer_config, reg_ds->second, &tracing_session);
1712 if (ds_inst && tracing_session.state == TracingSession::STARTED)
1713 StartDataSourceInstance(producer, &tracing_session, ds_inst);
1714 }
1715 }
1716 }
1717
UnregisterDataSource(ProducerID producer_id,const std::string & name)1718 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
1719 const std::string& name) {
1720 PERFETTO_DCHECK_THREAD(thread_checker_);
1721 PERFETTO_CHECK(producer_id);
1722 ProducerEndpointImpl* producer = GetProducer(producer_id);
1723 PERFETTO_DCHECK(producer);
1724 for (auto& kv : tracing_sessions_) {
1725 auto& ds_instances = kv.second.data_source_instances;
1726 for (auto it = ds_instances.begin(); it != ds_instances.end();) {
1727 if (it->first == producer_id && it->second.data_source_name == name) {
1728 DataSourceInstanceID ds_inst_id = it->second.instance_id;
1729 if (it->second.state != DataSourceInstance::STOPPED) {
1730 if (it->second.state != DataSourceInstance::STOPPING)
1731 producer->StopDataSource(ds_inst_id);
1732 // Mark the instance as stopped immediately, since we are
1733 // unregistering it below.
1734 if (it->second.state == DataSourceInstance::STOPPING)
1735 NotifyDataSourceStopped(producer_id, ds_inst_id);
1736 }
1737 it = ds_instances.erase(it);
1738 } else {
1739 ++it;
1740 }
1741 } // for (data_source_instances)
1742 } // for (tracing_session)
1743
1744 for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
1745 if (it->second.producer_id == producer_id &&
1746 it->second.descriptor.name() == name) {
1747 data_sources_.erase(it);
1748 return;
1749 }
1750 }
1751
1752 PERFETTO_DFATAL(
1753 "Tried to unregister a non-existent data source \"%s\" for "
1754 "producer %" PRIu16,
1755 name.c_str(), producer_id);
1756 }
1757
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)1758 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
1759 const TraceConfig::DataSource& cfg_data_source,
1760 const TraceConfig::ProducerConfig& producer_config,
1761 const RegisteredDataSource& data_source,
1762 TracingSession* tracing_session) {
1763 PERFETTO_DCHECK_THREAD(thread_checker_);
1764 ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
1765 PERFETTO_DCHECK(producer);
1766 // An existing producer that is not ftrace could have registered itself as
1767   // ftrace; we must not enable it in that case.
1768 if (lockdown_mode_ && producer->uid_ != uid_) {
1769 PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
1770 return nullptr;
1771 }
1772 // TODO(primiano): Add tests for registration ordering
1773 // (data sources vs consumers).
1774 // TODO: This logic is duplicated in ChangeTraceConfig, consider refactoring
1775 // it. Meanwhile update both.
1776 if (!cfg_data_source.producer_name_filter().empty()) {
1777 if (std::find(cfg_data_source.producer_name_filter().begin(),
1778 cfg_data_source.producer_name_filter().end(),
1779 producer->name_) ==
1780 cfg_data_source.producer_name_filter().end()) {
1781 PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
1782 cfg_data_source.config().name().c_str(),
1783 producer->name_.c_str());
1784 return nullptr;
1785 }
1786 }
1787
1788 auto relative_buffer_id = cfg_data_source.config().target_buffer();
1789 if (relative_buffer_id >= tracing_session->num_buffers()) {
1790 PERFETTO_LOG(
1791 "The TraceConfig for DataSource %s specified a target_buffer out of "
1792 "bound (%d). Skipping it.",
1793 cfg_data_source.config().name().c_str(), relative_buffer_id);
1794 return nullptr;
1795 }
1796
1797 // Create a copy of the DataSourceConfig specified in the trace config. This
1798 // will be passed to the producer after translating the |target_buffer| id.
1799 // The |target_buffer| parameter passed by the consumer in the trace config is
1800 // relative to the buffers declared in the same trace config. This has to be
1801 // translated to the global BufferID before passing it to the producers, which
1802 // don't know anything about tracing sessions and consumers.
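  // Example: a config declaring two buffers and a data source with
  // target_buffer: 1 resolves to tracing_session->buffers_index[1]; only the
  // resulting global BufferID is ever exposed to the producer.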
1803
1804 DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
1805 auto insert_iter = tracing_session->data_source_instances.emplace(
1806 std::piecewise_construct, //
1807 std::forward_as_tuple(producer->id_),
1808 std::forward_as_tuple(
1809 inst_id,
1810 cfg_data_source.config(), // Deliberate copy.
1811 data_source.descriptor.name(),
1812 data_source.descriptor.will_notify_on_start(),
1813 data_source.descriptor.will_notify_on_stop(),
1814 data_source.descriptor.handles_incremental_state_clear()));
1815 DataSourceInstance* ds_instance = &insert_iter->second;
1816
1817 // New data source instance starts out in CONFIGURED state.
1818 if (tracing_session->consumer_maybe_null) {
1819 tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1820 *producer, *ds_instance);
1821 }
1822
1823 DataSourceConfig& ds_config = ds_instance->config;
1824 ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
1825 ds_config.set_enable_extra_guardrails(
1826 tracing_session->config.enable_extra_guardrails());
1827 ds_config.set_tracing_session_id(tracing_session->id);
1828 BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
1829 PERFETTO_DCHECK(global_id);
1830 ds_config.set_target_buffer(global_id);
1831
1832 PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
1833 ds_config.name().c_str(), global_id);
1834 if (!producer->shared_memory()) {
1835 // Determine the SMB page size. Must be an integer multiple of 4k.
1836 size_t page_size = std::min<size_t>(producer_config.page_size_kb() * 1024,
1837 SharedMemoryABI::kMaxPageSize);
1838 if (page_size < base::kPageSize || page_size % base::kPageSize != 0)
1839 page_size = kDefaultShmPageSize;
1840 producer->shared_buffer_page_size_kb_ = page_size / 1024;
1841
1842 // Determine the SMB size. Must be an integer multiple of the SMB page size.
1843     // The decision tree is as follows:
1844     // 1. Give priority to what is defined in the trace config.
1845 // 2. If unset give priority to the hint passed by the producer.
1846 // 3. Keep within bounds and ensure it's a multiple of the page size.
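    // E.g. shm_size_kb: 128 in the producer config yields a 128 KB SMB; if the
    // config leaves it unset, the producer's own hint is used instead; a value
    // below one page or not page-aligned falls back to kDefaultShmSize.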
1847 size_t shm_size = producer_config.shm_size_kb() * 1024;
1848 if (shm_size == 0)
1849 shm_size = producer->shmem_size_hint_bytes_;
1850 shm_size = std::min<size_t>(shm_size, kMaxShmSize);
1851 if (shm_size < page_size || shm_size % page_size)
1852 shm_size = kDefaultShmSize;
1853
1854 // TODO(primiano): right now Create() will suicide in case of OOM if the
1855 // mmap fails. We should instead gracefully fail the request and tell the
1856 // client to go away.
1857 auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
1858 producer->SetSharedMemory(std::move(shared_memory));
1859 producer->OnTracingSetup();
1860 UpdateMemoryGuardrail();
1861 }
1862 producer->SetupDataSource(inst_id, ds_config);
1863 return ds_instance;
1864 }
1865
1866 // Note: all fields except the *_trusted ones are untrusted, as in, the Producer
1867 // might be lying / returning garbage contents. |src| and |size| can be trusted
1868 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,uid_t producer_uid_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)1869 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
1870 ProducerID producer_id_trusted,
1871 uid_t producer_uid_trusted,
1872 WriterID writer_id,
1873 ChunkID chunk_id,
1874 BufferID buffer_id,
1875 uint16_t num_fragments,
1876 uint8_t chunk_flags,
1877 bool chunk_complete,
1878 const uint8_t* src,
1879 size_t size) {
1880 PERFETTO_DCHECK_THREAD(thread_checker_);
1881
1882 ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
1883 if (!producer) {
1884 PERFETTO_DFATAL("Producer not found.");
1885 chunks_discarded_++;
1886 return;
1887 }
1888
1889 TraceBuffer* buf = GetBufferByID(buffer_id);
1890 if (!buf) {
1891 PERFETTO_DLOG("Could not find target buffer %" PRIu16
1892 " for producer %" PRIu16,
1893 buffer_id, producer_id_trusted);
1894 chunks_discarded_++;
1895 return;
1896 }
1897
1898 // Verify that the producer is actually allowed to write into the target
1899 // buffer specified in the request. This prevents a malicious producer from
1900 // injecting data into a log buffer that belongs to a tracing session the
1901 // producer is not part of.
1902 if (!producer->is_allowed_target_buffer(buffer_id)) {
1903 PERFETTO_ELOG("Producer %" PRIu16
1904 " tried to write into forbidden target buffer %" PRIu16,
1905 producer_id_trusted, buffer_id);
1906 PERFETTO_DFATAL("Forbidden target buffer");
1907 chunks_discarded_++;
1908 return;
1909 }
1910
1911 // If the writer was registered by the producer, it should only write into the
1912 // buffer it was registered with.
1913 base::Optional<BufferID> associated_buffer =
1914 producer->buffer_id_for_writer(writer_id);
1915 if (associated_buffer && *associated_buffer != buffer_id) {
1916 PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
1917 " was registered to write into target buffer %" PRIu16
1918 ", but tried to write into buffer %" PRIu16,
1919 writer_id, producer_id_trusted, *associated_buffer,
1920 buffer_id);
1921 PERFETTO_DFATAL("Wrong target buffer");
1922 chunks_discarded_++;
1923 return;
1924 }
1925
1926 buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
1927 chunk_id, num_fragments, chunk_flags, chunk_complete,
1928 src, size);
1929 }
1930
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)1931 void TracingServiceImpl::ApplyChunkPatches(
1932 ProducerID producer_id_trusted,
1933 const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
1934 PERFETTO_DCHECK_THREAD(thread_checker_);
1935
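  // Each patch overwrites a small fixed-size region of a chunk previously
  // copied into the target TraceBuffer, letting the producer backfill values
  // (e.g. the size prefix of a packet that straddles chunks) that it could not
  // know when the chunk was first committed.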
1936 for (const auto& chunk : chunks_to_patch) {
1937 const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
1938 const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
1939 TraceBuffer* buf =
1940 GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
1941 static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
1942 "Add a '|| chunk_id > kMaxChunkID' below if this fails");
1943 if (!writer_id || writer_id > kMaxWriterID || !buf) {
1944 PERFETTO_ELOG(
1945 "Received invalid chunks_to_patch request from Producer: %" PRIu16
1946           ", BufferID: %" PRIu32 " ChunkID: %" PRIu32 " WriterID: %" PRIu16,
1947 producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
1948 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
1949 continue;
1950 }
1951
1952 // Note, there's no need to validate that the producer is allowed to write
1953 // to the specified buffer ID (or that it's the correct buffer ID for a
1954 // registered TraceWriter). That's because TraceBuffer uses the producer ID
1955 // and writer ID to look up the chunk to patch. If the producer specifies an
1956 // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
1957 // patches. Because the producer ID is trusted, there's also no way for a
1958 // malicious producer to patch another producer's data.
1959
1960     // Speculate on the fact that there are going to be a limited number of
1961 // patches per request, so we can allocate the |patches| array on the stack.
1962 std::array<TraceBuffer::Patch, 1024> patches; // Uninitialized.
1963 if (chunk.patches().size() > patches.size()) {
1964 PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
1965                     static_cast<size_t>(chunk.patches().size()));
1966 PERFETTO_DFATAL("Too many patches");
1967 patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
1968 continue;
1969 }
1970
1971 size_t i = 0;
1972 for (const auto& patch : chunk.patches()) {
1973 const std::string& patch_data = patch.data();
1974 if (patch_data.size() != patches[i].data.size()) {
1975 PERFETTO_ELOG("Received patch from producer: %" PRIu16
1976 " of unexpected size %zu",
1977 producer_id_trusted, patch_data.size());
1978 patches_discarded_++;
1979 continue;
1980 }
1981 patches[i].offset_untrusted = patch.offset();
1982 memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
1983 i++;
1984 }
1985 buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
1986 &patches[0], i, chunk.has_more_patches());
1987 }
1988 }
1989
GetDetachedSession(uid_t uid,const std::string & key)1990 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
1991 uid_t uid,
1992 const std::string& key) {
1993 PERFETTO_DCHECK_THREAD(thread_checker_);
1994 for (auto& kv : tracing_sessions_) {
1995 TracingSession* session = &kv.second;
1996 if (session->consumer_uid == uid && session->detach_key == key) {
1997 PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
1998 return session;
1999 }
2000 }
2001 return nullptr;
2002 }
2003
GetTracingSession(TracingSessionID tsid)2004 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2005 TracingSessionID tsid) {
2006 PERFETTO_DCHECK_THREAD(thread_checker_);
2007 auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2008 if (it == tracing_sessions_.end())
2009 return nullptr;
2010 return &it->second;
2011 }
2012
GetNextProducerID()2013 ProducerID TracingServiceImpl::GetNextProducerID() {
2014 PERFETTO_DCHECK_THREAD(thread_checker_);
2015 PERFETTO_CHECK(producers_.size() < kMaxProducerID);
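  // Linearly probe for the next free ID, skipping 0 (invalid) and IDs still in
  // use; the CHECK above guarantees a free ID exists, so the loop terminates.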
2016 do {
2017 ++last_producer_id_;
2018 } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2019 PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2020 return last_producer_id_;
2021 }
2022
GetBufferByID(BufferID buffer_id)2023 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2024 auto buf_iter = buffers_.find(buffer_id);
2025 if (buf_iter == buffers_.end())
2026 return nullptr;
2027 return &*buf_iter->second;
2028 }
2029
OnStartTriggersTimeout(TracingSessionID tsid)2030 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2031   // Skip the flush entirely if the trace session doesn't exist anymore.
2032   // This is to prevent misleading error messages from being logged.
2033   //
2034   // If the trace has started from the trigger we rely on
2035   // the |stop_delay_ms| from the trigger, so don't flush and
2036   // disable if we've moved beyond a CONFIGURED state.
2037 auto* tracing_session_ptr = GetTracingSession(tsid);
2038 if (tracing_session_ptr &&
2039 tracing_session_ptr->state == TracingSession::CONFIGURED) {
2040 PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2041 " since no triggers activated.",
2042 tsid);
2043     // No data should be returned from ReadBuffers() regardless of whether
2044     // we call FreeBuffers() or DisableTracing(). This is because in
2045     // STOP_TRACING we need this promise in either case, and using
2046     // DisableTracing() allows a graceful shutdown. Consumers can follow
2047     // their normal path and check the buffers through ReadBuffers() and
2048     // the code won't hang because the tracing session will still be
2049     // alive, just disabled.
2050 DisableTracing(tsid);
2051 }
2052 }
2053
UpdateMemoryGuardrail()2054 void TracingServiceImpl::UpdateMemoryGuardrail() {
2055 #if !PERFETTO_BUILDFLAG(PERFETTO_EMBEDDER_BUILD) && \
2056 !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
2057 uint64_t total_buffer_bytes = 0;
2058
2059 // Sum up all the shared memory buffers.
2060 for (const auto& id_to_producer : producers_) {
2061 if (id_to_producer.second->shared_memory())
2062 total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2063 }
2064
2065 // Sum up all the trace buffers.
2066 for (const auto& id_to_buffer : buffers_) {
2067 total_buffer_bytes += id_to_buffer.second->size();
2068 }
2069
2070 // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2071 // interval.
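  // Example: one 4 MB producer SMB plus two 8 MB trace buffers gives a limit
  // of 32 + 4 + 8 + 8 = 52 MB, evaluated over a 30 second window.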
2072 uint64_t guardrail = 32 * 1024 * 1024 + total_buffer_bytes;
2073 base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2074 #endif
2075 }
2076
SnapshotSyncMarker(std::vector<TracePacket> * packets)2077 void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
2078   // The sync marker is used to tokenize large traces efficiently.
2079 // See description in trace_packet.proto.
2080 if (sync_marker_packet_size_ == 0) {
2081 // Serialize the marker and the uid separately to guarantee that the marker
2082     // is serialized at the end and is adjacent to the start of the next packet.
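    // Resulting layout of |sync_marker_packet_|: [uid + sequence-id fields]
    // followed by [synchronization_marker bytes], so a reader scanning the
    // trace for the raw kSyncMarker lands right before the next packet starts.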
2083 int size_left = static_cast<int>(sizeof(sync_marker_packet_));
2084 uint8_t* dst = &sync_marker_packet_[0];
2085 protos::TrustedPacket packet;
2086 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2087 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2088 PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
2089 size_left -= packet.ByteSize();
2090 sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
2091 dst += sync_marker_packet_size_;
2092
2093 packet.Clear();
2094 packet.set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
2095 PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
2096 sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
2097 PERFETTO_CHECK(sync_marker_packet_size_ <= sizeof(sync_marker_packet_));
2098   }
2099 packets->emplace_back();
2100 packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
2101 }
2102
SnapshotClocks(std::vector<TracePacket> * packets,bool set_root_timestamp)2103 void TracingServiceImpl::SnapshotClocks(std::vector<TracePacket>* packets,
2104 bool set_root_timestamp) {
2105 protos::TrustedPacket packet;
2106 protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot();
2107
2108 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
2109 !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2110 struct {
2111 clockid_t id;
2112 protos::ClockSnapshot::Clock::Type type;
2113 struct timespec ts;
2114 } clocks[] = {
2115 {CLOCK_BOOTTIME, protos::ClockSnapshot::Clock::BOOTTIME, {0, 0}},
2116 {CLOCK_REALTIME_COARSE,
2117 protos::ClockSnapshot::Clock::REALTIME_COARSE,
2118 {0, 0}},
2119 {CLOCK_MONOTONIC_COARSE,
2120 protos::ClockSnapshot::Clock::MONOTONIC_COARSE,
2121 {0, 0}},
2122 {CLOCK_REALTIME, protos::ClockSnapshot::Clock::REALTIME, {0, 0}},
2123 {CLOCK_MONOTONIC, protos::ClockSnapshot::Clock::MONOTONIC, {0, 0}},
2124 {CLOCK_MONOTONIC_RAW,
2125 protos::ClockSnapshot::Clock::MONOTONIC_RAW,
2126 {0, 0}},
2127 {CLOCK_PROCESS_CPUTIME_ID,
2128 protos::ClockSnapshot::Clock::PROCESS_CPUTIME,
2129 {0, 0}},
2130 {CLOCK_THREAD_CPUTIME_ID,
2131 protos::ClockSnapshot::Clock::THREAD_CPUTIME,
2132 {0, 0}},
2133 };
2134 // First snapshot all the clocks as atomically as we can.
2135 for (auto& clock : clocks) {
2136 if (clock_gettime(clock.id, &clock.ts) == -1)
2137 PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
2138 }
2139 for (auto& clock : clocks) {
2140 if (set_root_timestamp &&
2141 clock.type == protos::ClockSnapshot::Clock::BOOTTIME) {
2142 packet.set_timestamp(
2143 static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
2144     }
2145 protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
2146 c->set_type(clock.type);
2147 c->set_timestamp(
2148 static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
2149 }
2150 #else   // PERFETTO_OS_MACOSX || PERFETTO_OS_WIN
2151 auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
2152 if (set_root_timestamp)
2153 packet.set_timestamp(wall_time_ns);
2154 protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
2155 c->set_type(protos::ClockSnapshot::Clock::MONOTONIC);
2156 c->set_timestamp(wall_time_ns);
2157 #endif  // !PERFETTO_OS_MACOSX && !PERFETTO_OS_WIN
2158
2159 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2160 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2161 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2162 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2163 packets->emplace_back();
2164 packets->back().AddSlice(std::move(slice));
2165 }
2166
SnapshotStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)2167 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
2168 std::vector<TracePacket>* packets) {
2169 protos::TrustedPacket packet;
2170 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2171 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2172
2173 protos::TraceStats* trace_stats = packet.mutable_trace_stats();
2174 GetTraceStats(tracing_session).ToProto(trace_stats);
2175 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2176 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2177 packets->emplace_back();
2178 packets->back().AddSlice(std::move(slice));
2179 }
2180
GetTraceStats(TracingSession * tracing_session)2181 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
2182 TraceStats trace_stats;
2183 trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
2184 trace_stats.set_producers_seen(last_producer_id_);
2185 trace_stats.set_data_sources_registered(
2186 static_cast<uint32_t>(data_sources_.size()));
2187 trace_stats.set_data_sources_seen(last_data_source_instance_id_);
2188 trace_stats.set_tracing_sessions(
2189 static_cast<uint32_t>(tracing_sessions_.size()));
2190 trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
2191 trace_stats.set_chunks_discarded(chunks_discarded_);
2192 trace_stats.set_patches_discarded(patches_discarded_);
2193
2194 for (BufferID buf_id : tracing_session->buffers_index) {
2195 TraceBuffer* buf = GetBufferByID(buf_id);
2196 if (!buf) {
2197 PERFETTO_DFATAL("Buffer not found.");
2198 continue;
2199 }
2200 *trace_stats.add_buffer_stats() = buf->stats();
2201 } // for (buf in session).
2202 return trace_stats;
2203 }
2204
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)2205 void TracingServiceImpl::MaybeEmitTraceConfig(
2206 TracingSession* tracing_session,
2207 std::vector<TracePacket>* packets) {
2208 if (tracing_session->did_emit_config)
2209 return;
2210 tracing_session->did_emit_config = true;
2211 protos::TrustedPacket packet;
2212 tracing_session->config.ToProto(packet.mutable_trace_config());
2213 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2214 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2215 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2216 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2217 packets->emplace_back();
2218 packets->back().AddSlice(std::move(slice));
2219 }
2220
MaybeEmitSystemInfo(TracingSession * tracing_session,std::vector<TracePacket> * packets)2221 void TracingServiceImpl::MaybeEmitSystemInfo(
2222 TracingSession* tracing_session,
2223 std::vector<TracePacket>* packets) {
2224 if (tracing_session->did_emit_system_info)
2225 return;
2226 tracing_session->did_emit_system_info = true;
2227 protos::TrustedPacket packet;
2228 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2229 protos::SystemInfo* info = packet.mutable_system_info();
2230 struct utsname uname_info;
2231 if (uname(&uname_info) == 0) {
2232 protos::Utsname* utsname_info = info->mutable_utsname();
2233 utsname_info->set_sysname(uname_info.sysname);
2234 utsname_info->set_version(uname_info.version);
2235 utsname_info->set_machine(uname_info.machine);
2236 utsname_info->set_release(uname_info.release);
2237 }
2238 #endif
2239 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2240 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2241 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2242 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2243 packets->emplace_back();
2244 packets->back().AddSlice(std::move(slice));
2245 }
2246
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)2247 void TracingServiceImpl::MaybeEmitReceivedTriggers(
2248 TracingSession* tracing_session,
2249 std::vector<TracePacket>* packets) {
2250 PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
2251 tracing_session->received_triggers.size());
2252 for (size_t i = tracing_session->num_triggers_emitted_into_trace;
2253 i < tracing_session->received_triggers.size(); ++i) {
2254 const auto& info = tracing_session->received_triggers[i];
2255 protos::TrustedPacket packet;
2256
2257 protos::Trigger* trigger = packet.mutable_trigger();
2258 trigger->set_trigger_name(info.trigger_name);
2259 trigger->set_producer_name(info.producer_name);
2260 trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
2261
2262 packet.set_timestamp(info.boot_time_ns);
2263 packet.set_trusted_uid(static_cast<int32_t>(uid_));
2264 packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2265 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2266 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2267 packets->emplace_back();
2268 packets->back().AddSlice(std::move(slice));
2269 ++tracing_session->num_triggers_emitted_into_trace;
2270 }
2271 }
2272
2273 ////////////////////////////////////////////////////////////////////////////////
2274 // TracingServiceImpl::ConsumerEndpointImpl implementation
2275 ////////////////////////////////////////////////////////////////////////////////
2276
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)2277 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
2278 TracingServiceImpl* service,
2279 base::TaskRunner* task_runner,
2280 Consumer* consumer,
2281 uid_t uid)
2282 : task_runner_(task_runner),
2283 service_(service),
2284 consumer_(consumer),
2285 uid_(uid),
2286 weak_ptr_factory_(this) {}
2287
~ConsumerEndpointImpl()2288 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
2289 service_->DisconnectConsumer(this);
2290 consumer_->OnDisconnect();
2291 }
2292
NotifyOnTracingDisabled()2293 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
2294 PERFETTO_DCHECK_THREAD(thread_checker_);
2295 auto weak_this = GetWeakPtr();
2296 task_runner_->PostTask([weak_this] {
2297 if (weak_this)
2298 weak_this->consumer_->OnTracingDisabled();
2299 });
2300 }
2301
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)2302 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
2303 const TraceConfig& cfg,
2304 base::ScopedFile fd) {
2305 PERFETTO_DCHECK_THREAD(thread_checker_);
2306 if (!service_->EnableTracing(this, cfg, std::move(fd)))
2307 NotifyOnTracingDisabled();
2308 }
2309
ChangeTraceConfig(const TraceConfig & cfg)2310 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
2311 const TraceConfig& cfg) {
2312 if (!tracing_session_id_) {
2313 PERFETTO_LOG(
2314 "Consumer called ChangeTraceConfig() but tracing was "
2315 "not active");
2316 return;
2317 }
2318 service_->ChangeTraceConfig(this, cfg);
2319 }
2320
StartTracing()2321 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
2322 PERFETTO_DCHECK_THREAD(thread_checker_);
2323 if (!tracing_session_id_) {
2324 PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
2325 return;
2326 }
2327 service_->StartTracing(tracing_session_id_);
2328 }
2329
DisableTracing()2330 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
2331 PERFETTO_DCHECK_THREAD(thread_checker_);
2332 if (!tracing_session_id_) {
2333 PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
2334 return;
2335 }
2336 service_->DisableTracing(tracing_session_id_);
2337 }
2338
ReadBuffers()2339 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
2340 PERFETTO_DCHECK_THREAD(thread_checker_);
2341 if (!tracing_session_id_) {
2342 PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
2343 return;
2344 }
2345 service_->ReadBuffers(tracing_session_id_, this);
2346 }
2347
FreeBuffers()2348 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
2349 PERFETTO_DCHECK_THREAD(thread_checker_);
2350 if (!tracing_session_id_) {
2351 PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
2352 return;
2353 }
2354 service_->FreeBuffers(tracing_session_id_);
2355 tracing_session_id_ = 0;
2356 }
2357
Flush(uint32_t timeout_ms,FlushCallback callback)2358 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
2359 FlushCallback callback) {
2360 PERFETTO_DCHECK_THREAD(thread_checker_);
2361 if (!tracing_session_id_) {
2362 PERFETTO_LOG("Consumer called Flush() but tracing was not active");
2363 return;
2364 }
2365 service_->Flush(tracing_session_id_, timeout_ms, callback);
2366 }
2367
Detach(const std::string & key)2368 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
2369 PERFETTO_DCHECK_THREAD(thread_checker_);
2370 bool success = service_->DetachConsumer(this, key);
2371 auto weak_this = GetWeakPtr();
2372 task_runner_->PostTask([weak_this, success] {
2373 if (weak_this)
2374 weak_this->consumer_->OnDetach(success);
2375 });
2376 }
2377
Attach(const std::string & key)2378 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
2379 PERFETTO_DCHECK_THREAD(thread_checker_);
2380 bool success = service_->AttachConsumer(this, key);
2381 auto weak_this = GetWeakPtr();
2382 task_runner_->PostTask([weak_this, success] {
2383 if (!weak_this)
2384 return;
2385 Consumer* consumer = weak_this->consumer_;
2386 TracingSession* session =
2387 weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
2388 if (!session) {
2389 consumer->OnAttach(false, TraceConfig());
2390 return;
2391 }
2392 consumer->OnAttach(success, session->config);
2393 });
2394 }
2395
GetTraceStats()2396 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
2397 PERFETTO_DCHECK_THREAD(thread_checker_);
2398 bool success = false;
2399 TraceStats stats;
2400 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2401 if (session) {
2402 success = true;
2403 stats = service_->GetTraceStats(session);
2404 }
2405 auto weak_this = GetWeakPtr();
2406 task_runner_->PostTask([weak_this, success, stats] {
2407 if (weak_this)
2408 weak_this->consumer_->OnTraceStats(success, stats);
2409 });
2410 }
2411
ObserveEvents(uint32_t enabled_event_types)2412 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
2413 uint32_t enabled_event_types) {
2414 PERFETTO_DCHECK_THREAD(thread_checker_);
2415 enabled_observable_event_types_ = enabled_event_types;
2416
2417 if (enabled_observable_event_types_ == ObservableEventType::kNone)
2418 return;
2419
2420 PERFETTO_DCHECK(enabled_observable_event_types_ ==
2421 ObservableEventType::kDataSourceInstances);
2422
2423 TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2424 if (!session)
2425 return;
2426
2427 // Issue initial states
2428 for (const auto& kv : session->data_source_instances) {
2429 ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
2430 PERFETTO_DCHECK(producer);
2431 OnDataSourceInstanceStateChange(*producer, kv.second);
2432 }
2433 }
2434
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)2435 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
2436 const ProducerEndpointImpl& producer,
2437 const DataSourceInstance& instance) {
2438 if (!(enabled_observable_event_types_ &
2439 ObservableEventType::kDataSourceInstances)) {
2440 return;
2441 }
2442
2443 if (instance.state != DataSourceInstance::CONFIGURED &&
2444 instance.state != DataSourceInstance::STARTED &&
2445 instance.state != DataSourceInstance::STOPPED) {
2446 return;
2447 }
2448
2449 auto* observable_events = AddObservableEvents();
2450 auto* change = observable_events->add_instance_state_changes();
2451 change->set_producer_name(producer.name_);
2452 change->set_data_source_name(instance.data_source_name);
2453 if (instance.state == DataSourceInstance::STARTED) {
2454 change->set_state(ObservableEvents::DataSourceInstanceStateChange::
2455 DATA_SOURCE_INSTANCE_STATE_STARTED);
2456 } else {
2457 change->set_state(ObservableEvents::DataSourceInstanceStateChange::
2458 DATA_SOURCE_INSTANCE_STATE_STOPPED);
2459 }
2460 }
2461
2462 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
GetWeakPtr()2463 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
2464 PERFETTO_DCHECK_THREAD(thread_checker_);
2465 return weak_ptr_factory_.GetWeakPtr();
2466 }
2467
2468 ObservableEvents*
AddObservableEvents()2469 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
2470 PERFETTO_DCHECK_THREAD(thread_checker_);
2471 if (!observable_events_) {
2472 observable_events_.reset(new ObservableEvents());
2473 auto weak_this = GetWeakPtr();
2474 task_runner_->PostTask([weak_this] {
2475 if (!weak_this)
2476 return;
2477
2478 // Move into a temporary to allow reentrancy in OnObservableEvents.
2479 auto observable_events = std::move(weak_this->observable_events_);
2480 weak_this->consumer_->OnObservableEvents(*observable_events);
2481 });
2482 }
2483 return observable_events_.get();
2484 }
2485
2486 ////////////////////////////////////////////////////////////////////////////////
2487 // TracingServiceImpl::ProducerEndpointImpl implementation
2488 ////////////////////////////////////////////////////////////////////////////////
2489
ProducerEndpointImpl(ProducerID id,uid_t uid,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,bool in_process,bool smb_scraping_enabled)2490 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
2491 ProducerID id,
2492 uid_t uid,
2493 TracingServiceImpl* service,
2494 base::TaskRunner* task_runner,
2495 Producer* producer,
2496 const std::string& producer_name,
2497 bool in_process,
2498 bool smb_scraping_enabled)
2499 : id_(id),
2500 uid_(uid),
2501 service_(service),
2502 task_runner_(task_runner),
2503 producer_(producer),
2504 name_(producer_name),
2505 in_process_(in_process),
2506 smb_scraping_enabled_(smb_scraping_enabled),
2507 weak_ptr_factory_(this) {}
2508
~ProducerEndpointImpl()2509 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
2510 service_->DisconnectProducer(id_);
2511 producer_->OnDisconnect();
2512 }
2513
RegisterDataSource(const DataSourceDescriptor & desc)2514 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
2515 const DataSourceDescriptor& desc) {
2516 PERFETTO_DCHECK_THREAD(thread_checker_);
2517 if (desc.name().empty()) {
2518 PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2519 return;
2520 }
2521
2522 service_->RegisterDataSource(id_, desc);
2523 }
2524
UnregisterDataSource(const std::string & name)2525 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
2526 const std::string& name) {
2527 PERFETTO_DCHECK_THREAD(thread_checker_);
2528 service_->UnregisterDataSource(id_, name);
2529 }
2530
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)2531 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
2532 uint32_t writer_id,
2533 uint32_t target_buffer) {
2534 PERFETTO_DCHECK_THREAD(thread_checker_);
2535 PERFETTO_DCHECK(!buffer_id_for_writer(static_cast<WriterID>(writer_id)));
2536 writers_[static_cast<WriterID>(writer_id)] =
2537 static_cast<BufferID>(target_buffer);
2538 }
2539
UnregisterTraceWriter(uint32_t writer_id)2540 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
2541 uint32_t writer_id) {
2542 PERFETTO_DCHECK_THREAD(thread_checker_);
2543 PERFETTO_DCHECK(buffer_id_for_writer(static_cast<WriterID>(writer_id)));
2544 writers_.erase(static_cast<WriterID>(writer_id));
2545 }
2546
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)2547 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
2548 const CommitDataRequest& req_untrusted,
2549 CommitDataCallback callback) {
2550 PERFETTO_DCHECK_THREAD(thread_checker_);
2551
2552 if (!shared_memory_) {
2553 PERFETTO_DLOG(
2554 "Attempted to commit data before the shared memory was allocated.");
2555 return;
2556 }
2557 PERFETTO_DCHECK(shmem_abi_.is_valid());
2558 for (const auto& entry : req_untrusted.chunks_to_move()) {
2559 const uint32_t page_idx = entry.page();
2560 if (page_idx >= shmem_abi_.num_pages())
2561 continue; // A buggy or malicious producer.
2562
2563 SharedMemoryABI::Chunk chunk =
2564 shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
2565 if (!chunk.is_valid()) {
2566 PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
2567 entry.page(), entry.chunk());
2568 continue;
2569 }
2570
2571 // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
2572 // the ABI contract expects the producer to not touch the chunk anymore
2573 // (until the service marks that as free). This is why all the reads below
2574 // are just memory_order_relaxed. Also, the code here assumes that all this
2575 // data can be malicious and just gives up if anything is malformed.
2576 BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
2577 const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
2578 WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
2579 ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
2580 auto packets = chunk_header.packets.load(std::memory_order_relaxed);
2581 uint16_t num_fragments = packets.count;
2582 uint8_t chunk_flags = packets.flags;
2583
2584 service_->CopyProducerPageIntoLogBuffer(
2585 id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
2586 /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
2587
2588 // This one has release-store semantics.
2589 shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
2590 } // for(chunks_to_move)
2591
2592 service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
2593
2594 if (req_untrusted.flush_request_id()) {
2595 service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
2596 }
2597
2598 // Keep this invocation last. ProducerIPCService::CommitData() relies on this
2599 // callback being invoked within the same callstack and not posted. If this
2600 // changes, the code there needs to be changed accordingly.
2601 if (callback)
2602 callback();
2603 }
2604
SetSharedMemory(std::unique_ptr<SharedMemory> shared_memory)2605 void TracingServiceImpl::ProducerEndpointImpl::SetSharedMemory(
2606 std::unique_ptr<SharedMemory> shared_memory) {
2607 PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
2608 shared_memory_ = std::move(shared_memory);
2609 shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
2610 shared_memory_->size(),
2611 shared_buffer_page_size_kb() * 1024);
2612 if (in_process_) {
2613 inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
2614 shared_memory_->start(), shared_memory_->size(),
2615 shared_buffer_page_size_kb_ * 1024, this, task_runner_));
2616 }
2617 }
2618
shared_memory() const2619 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
2620 PERFETTO_DCHECK_THREAD(thread_checker_);
2621 return shared_memory_.get();
2622 }
2623
shared_buffer_page_size_kb() const2624 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
2625 const {
2626 return shared_buffer_page_size_kb_;
2627 }
2628
ActivateTriggers(const std::vector<std::string> & triggers)2629 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
2630 const std::vector<std::string>& triggers) {
2631 service_->ActivateTriggers(id_, triggers);
2632 }
2633
StopDataSource(DataSourceInstanceID ds_inst_id)2634 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
2635 DataSourceInstanceID ds_inst_id) {
2636 // TODO(primiano): When we'll support tearing down the SMB, at this point we
2637 // should send the Producer a TearDownTracing if all its data sources have
2638 // been disabled (see b/77532839 and aosp/655179 PS1).
2639 PERFETTO_DCHECK_THREAD(thread_checker_);
2640 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2641 task_runner_->PostTask([weak_this, ds_inst_id] {
2642 if (weak_this)
2643 weak_this->producer_->StopDataSource(ds_inst_id);
2644 });
2645 }
2646
2647 SharedMemoryArbiter*
GetInProcessShmemArbiter()2648 TracingServiceImpl::ProducerEndpointImpl::GetInProcessShmemArbiter() {
2649 if (!inproc_shmem_arbiter_) {
2650 PERFETTO_FATAL(
2651 "The in-process SharedMemoryArbiter can only be used when "
2652 "CreateProducer has been called with in_process=true and after tracing "
2653 "has started.");
2654 }
2655
2656 PERFETTO_DCHECK(in_process_);
2657 return inproc_shmem_arbiter_.get();
2658 }
2659
2660 // Can be called on any thread.
2661 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id)2662 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
2663 return GetInProcessShmemArbiter()->CreateTraceWriter(buf_id);
2664 }
2665
NotifyFlushComplete(FlushRequestID id)2666 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
2667 FlushRequestID id) {
2668 PERFETTO_DCHECK_THREAD(thread_checker_);
2669 return GetInProcessShmemArbiter()->NotifyFlushComplete(id);
2670 }
2671
OnTracingSetup()2672 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
2673 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2674 task_runner_->PostTask([weak_this] {
2675 if (weak_this)
2676 weak_this->producer_->OnTracingSetup();
2677 });
2678 }
2679
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources)2680 void TracingServiceImpl::ProducerEndpointImpl::Flush(
2681 FlushRequestID flush_request_id,
2682 const std::vector<DataSourceInstanceID>& data_sources) {
2683 PERFETTO_DCHECK_THREAD(thread_checker_);
2684 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2685 task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
2686 if (weak_this) {
2687 weak_this->producer_->Flush(flush_request_id, data_sources.data(),
2688 data_sources.size());
2689 }
2690 });
2691 }
2692
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)2693 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
2694 DataSourceInstanceID ds_id,
2695 const DataSourceConfig& config) {
2696 PERFETTO_DCHECK_THREAD(thread_checker_);
2697 allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
2698 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2699 task_runner_->PostTask([weak_this, ds_id, config] {
2700 if (weak_this)
2701 weak_this->producer_->SetupDataSource(ds_id, std::move(config));
2702 });
2703 }
2704
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)2705 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
2706 DataSourceInstanceID ds_id,
2707 const DataSourceConfig& config) {
2708 PERFETTO_DCHECK_THREAD(thread_checker_);
2709 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2710 task_runner_->PostTask([weak_this, ds_id, config] {
2711 if (weak_this)
2712 weak_this->producer_->StartDataSource(ds_id, std::move(config));
2713 });
2714 }
2715
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)2716 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
2717 DataSourceInstanceID data_source_id) {
2718 PERFETTO_DCHECK_THREAD(thread_checker_);
2719 service_->NotifyDataSourceStarted(id_, data_source_id);
2720 }
2721
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)2722 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
2723 DataSourceInstanceID data_source_id) {
2724 PERFETTO_DCHECK_THREAD(thread_checker_);
2725 service_->NotifyDataSourceStopped(id_, data_source_id);
2726 }
2727
OnFreeBuffers(const std::vector<BufferID> & target_buffers)2728 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
2729 const std::vector<BufferID>& target_buffers) {
2730 if (allowed_target_buffers_.empty())
2731 return;
2732 for (BufferID buffer : target_buffers)
2733 allowed_target_buffers_.erase(buffer);
2734 }
2735
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)2736 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
2737 const std::vector<DataSourceInstanceID>& data_sources) {
2738 PERFETTO_DCHECK_THREAD(thread_checker_);
2739 auto weak_this = weak_ptr_factory_.GetWeakPtr();
2740 task_runner_->PostTask([weak_this, data_sources] {
2741 if (weak_this) {
2742 weak_this->producer_->ClearIncrementalState(data_sources.data(),
2743 data_sources.size());
2744 }
2745 });
2746 }
2747
2748 ////////////////////////////////////////////////////////////////////////////////
2749 // TracingServiceImpl::TracingSession implementation
2750 ////////////////////////////////////////////////////////////////////////////////
2751
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config)2752 TracingServiceImpl::TracingSession::TracingSession(
2753 TracingSessionID session_id,
2754 ConsumerEndpointImpl* consumer,
2755 const TraceConfig& new_config)
2756 : id(session_id),
2757 consumer_maybe_null(consumer),
2758 consumer_uid(consumer->uid_),
2759 config(new_config) {}
2760
2761 } // namespace perfetto
2762