• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/tracing_service_impl.h"
18 
19 #include "perfetto/base/build_config.h"
20 
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <string.h>
25 #include <regex>
26 #include <unordered_set>
27 
28 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
29 #include <sys/uio.h>
30 #include <sys/utsname.h>
31 #include <unistd.h>
32 #endif
33 
34 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
35 #include <sys/system_properties.h>
36 #endif
37 
38 #include <algorithm>
39 
40 #include "perfetto/base/build_config.h"
41 #include "perfetto/base/file_utils.h"
42 #include "perfetto/base/task_runner.h"
43 #include "perfetto/base/utils.h"
44 #include "perfetto/tracing/core/consumer.h"
45 #include "perfetto/tracing/core/data_source_config.h"
46 #include "perfetto/tracing/core/producer.h"
47 #include "perfetto/tracing/core/shared_memory.h"
48 #include "perfetto/tracing/core/shared_memory_abi.h"
49 #include "perfetto/tracing/core/trace_packet.h"
50 #include "perfetto/tracing/core/trace_writer.h"
51 #include "src/tracing/core/packet_stream_validator.h"
52 #include "src/tracing/core/shared_memory_arbiter_impl.h"
53 #include "src/tracing/core/trace_buffer.h"
54 
55 #include "perfetto/trace/clock_snapshot.pb.h"
56 #include "perfetto/trace/system_info.pb.h"
57 #include "perfetto/trace/trusted_packet.pb.h"
58 
59 // General note: this class must assume that Producers are malicious and will
60 // try to crash / exploit this class. We can trust pointers because they come
61 // from the IPC layer, but we should never assume that the producer calls
62 // come in the right order or their arguments are sane / within bounds.
63 
64 namespace perfetto {
65 
66 namespace {
67 constexpr size_t kDefaultShmPageSize = base::kPageSize;
68 constexpr int kMaxBuffersPerConsumer = 128;
69 constexpr base::TimeMillis kSnapshotsInterval(10 * 1000);
70 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
71 constexpr int kMaxConcurrentTracingSessions = 5;
72 
73 constexpr uint32_t kMillisPerHour = 3600000;
74 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
75 
76 // These apply only if enable_extra_guardrails is true.
77 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 32 * 1024;
78 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
79 
80 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// Windows-only stand-in for the iovec struct consumed by writev(): describes
// one contiguous block of memory to be written.
struct iovec {
  void* iov_base;  // Start address of the block.
  size_t iov_len;  // Size of the block, in bytes.
};
85 
86 // Simple implementation of writev. Note that this does not give the atomicity
87 // guarantees of a real writev, but we don't depend on these (we aren't writing
88 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)89 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
90   ssize_t total_size = 0;
91   for (int i = 0; i < iovcnt; ++i) {
92     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
93     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
94       return -1;
95     total_size += current_size;
96   }
97   return total_size;
98 }
99 
100 #define IOV_MAX 1024  // Linux compatible limit.
101 
// uid checking is a NOP on Windows: this shim always reports uid 0.
uid_t getuid() {
  return static_cast<uid_t>(0);
}
// Windows shim for the effective uid: always reports 0.
uid_t geteuid() {
  return static_cast<uid_t>(0);
}
109 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
110 
111 }  // namespace
112 
113 // These constants instead are defined in the header because are used by tests.
114 constexpr size_t TracingServiceImpl::kDefaultShmSize;
115 constexpr size_t TracingServiceImpl::kMaxShmSize;
116 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
117 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
118 
119 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)120 std::unique_ptr<TracingService> TracingService::CreateInstance(
121     std::unique_ptr<SharedMemory::Factory> shm_factory,
122     base::TaskRunner* task_runner) {
123   return std::unique_ptr<TracingService>(
124       new TracingServiceImpl(std::move(shm_factory), task_runner));
125 }
126 
TracingServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)127 TracingServiceImpl::TracingServiceImpl(
128     std::unique_ptr<SharedMemory::Factory> shm_factory,
129     base::TaskRunner* task_runner)
130     : task_runner_(task_runner),
131       shm_factory_(std::move(shm_factory)),
132       uid_(getuid()),
133       buffer_ids_(kMaxTraceBufferID),
134       weak_ptr_factory_(this) {
135   PERFETTO_DCHECK(task_runner_);
136 }
137 
~TracingServiceImpl()138 TracingServiceImpl::~TracingServiceImpl() {
139   // TODO(fmayer): handle teardown of all Producer.
140 }
141 
142 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,uid_t uid,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode)143 TracingServiceImpl::ConnectProducer(Producer* producer,
144                                     uid_t uid,
145                                     const std::string& producer_name,
146                                     size_t shared_memory_size_hint_bytes,
147                                     bool in_process,
148                                     ProducerSMBScrapingMode smb_scraping_mode) {
149   PERFETTO_DCHECK_THREAD(thread_checker_);
150 
151   if (lockdown_mode_ && uid != geteuid()) {
152     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
153                   static_cast<unsigned long>(uid));
154     return nullptr;
155   }
156 
157   if (producers_.size() >= kMaxProducerID) {
158     PERFETTO_DFATAL("Too many producers.");
159     return nullptr;
160   }
161   const ProducerID id = GetNextProducerID();
162   PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
163 
164   bool smb_scraping_enabled = smb_scraping_enabled_;
165   switch (smb_scraping_mode) {
166     case ProducerSMBScrapingMode::kDefault:
167       break;
168     case ProducerSMBScrapingMode::kEnabled:
169       smb_scraping_enabled = true;
170       break;
171     case ProducerSMBScrapingMode::kDisabled:
172       smb_scraping_enabled = false;
173       break;
174   }
175 
176   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
177       id, uid, this, task_runner_, producer, producer_name, in_process,
178       smb_scraping_enabled));
179   auto it_and_inserted = producers_.emplace(id, endpoint.get());
180   PERFETTO_DCHECK(it_and_inserted.second);
181   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
182   task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
183 
184   return std::move(endpoint);
185 }
186 
DisconnectProducer(ProducerID id)187 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
188   PERFETTO_DCHECK_THREAD(thread_checker_);
189   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
190   PERFETTO_DCHECK(producers_.count(id));
191 
192   // Scrape remaining chunks for this producer to ensure we don't lose data.
193   if (auto* producer = GetProducer(id)) {
194     for (auto& session_id_and_session : tracing_sessions_)
195       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
196   }
197 
198   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
199     auto next = it;
200     next++;
201     if (it->second.producer_id == id)
202       UnregisterDataSource(id, it->second.descriptor.name());
203     it = next;
204   }
205 
206   producers_.erase(id);
207   UpdateMemoryGuardrail();
208 }
209 
GetProducer(ProducerID id) const210 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
211     ProducerID id) const {
212   PERFETTO_DCHECK_THREAD(thread_checker_);
213   auto it = producers_.find(id);
214   if (it == producers_.end())
215     return nullptr;
216   return it->second;
217 }
218 
219 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)220 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
221   PERFETTO_DCHECK_THREAD(thread_checker_);
222   PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
223   std::unique_ptr<ConsumerEndpointImpl> endpoint(
224       new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
225   auto it_and_inserted = consumers_.emplace(endpoint.get());
226   PERFETTO_DCHECK(it_and_inserted.second);
227   task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_));
228   return std::move(endpoint);
229 }
230 
DisconnectConsumer(ConsumerEndpointImpl * consumer)231 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
232   PERFETTO_DCHECK_THREAD(thread_checker_);
233   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
234   PERFETTO_DCHECK(consumers_.count(consumer));
235 
236   // TODO(primiano) : Check that this is safe (what happens if there are
237   // ReadBuffers() calls posted in the meantime? They need to become noop).
238   if (consumer->tracing_session_id_)
239     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
240   consumers_.erase(consumer);
241 
242   // At this point no more pointers to |consumer| should be around.
243   PERFETTO_DCHECK(!std::any_of(
244       tracing_sessions_.begin(), tracing_sessions_.end(),
245       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
246         return kv.second.consumer_maybe_null == consumer;
247       }));
248 }
249 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)250 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
251                                         const std::string& key) {
252   PERFETTO_DCHECK_THREAD(thread_checker_);
253   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
254   PERFETTO_DCHECK(consumers_.count(consumer));
255 
256   TracingSessionID tsid = consumer->tracing_session_id_;
257   TracingSession* tracing_session;
258   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
259     return false;
260 
261   if (GetDetachedSession(consumer->uid_, key)) {
262     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
263                   key.c_str());
264     return false;
265   }
266 
267   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
268   tracing_session->consumer_maybe_null = nullptr;
269   tracing_session->detach_key = key;
270   consumer->tracing_session_id_ = 0;
271   return true;
272 }
273 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)274 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
275                                         const std::string& key) {
276   PERFETTO_DCHECK_THREAD(thread_checker_);
277   PERFETTO_DLOG("Consumer %p attaching to session %s",
278                 reinterpret_cast<void*>(consumer), key.c_str());
279   PERFETTO_DCHECK(consumers_.count(consumer));
280 
281   if (consumer->tracing_session_id_) {
282     PERFETTO_ELOG(
283         "Cannot reattach consumer to session %s"
284         " while it already attached tracing session ID %" PRIu64,
285         key.c_str(), consumer->tracing_session_id_);
286     return false;
287   }
288 
289   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
290   if (!tracing_session) {
291     PERFETTO_ELOG(
292         "Failed to attach consumer, session '%s' not found for uid %d",
293         key.c_str(), static_cast<int>(consumer->uid_));
294     return false;
295   }
296 
297   consumer->tracing_session_id_ = tracing_session->id;
298   tracing_session->consumer_maybe_null = consumer;
299   tracing_session->detach_key.clear();
300   return true;
301 }
302 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)303 bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
304                                        const TraceConfig& cfg,
305                                        base::ScopedFile fd) {
306   PERFETTO_DCHECK_THREAD(thread_checker_);
307   PERFETTO_DLOG("Enabling tracing for consumer %p",
308                 reinterpret_cast<void*>(consumer));
309   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_SET)
310     lockdown_mode_ = true;
311   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR)
312     lockdown_mode_ = false;
313   TracingSession* tracing_session =
314       GetTracingSession(consumer->tracing_session_id_);
315   if (tracing_session) {
316     PERFETTO_DLOG(
317         "A Consumer is trying to EnableTracing() but another tracing session "
318         "is already active (forgot a call to FreeBuffers() ?)");
319     return false;
320   }
321 
322   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
323                                        ? kGuardrailsMaxTracingDurationMillis
324                                        : kMaxTracingDurationMillis;
325   if (cfg.duration_ms() > max_duration_ms) {
326     PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu32 " ms)",
327                   cfg.duration_ms(), max_duration_ms);
328     return false;
329   }
330 
331   const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
332                                   TraceConfig::TriggerConfig::UNSPECIFIED;
333   if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
334                              cfg.trigger_config().trigger_timeout_ms() >
335                                  kGuardrailsMaxTracingDurationMillis)) {
336     PERFETTO_ELOG(
337         "Traces with START_TRACING triggers must provide a positive "
338         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
339         cfg.trigger_config().trigger_timeout_ms());
340     return false;
341   }
342 
343   if (has_trigger_config && cfg.duration_ms() != 0) {
344     PERFETTO_ELOG(
345         "duration_ms was set, this must not be set for traces with triggers.");
346     return false;
347   }
348 
349   std::unordered_set<std::string> triggers;
350   for (const auto& trigger : cfg.trigger_config().triggers()) {
351     if (!triggers.insert(trigger.name()).second) {
352       PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
353       return false;
354     }
355   }
356 
357   if (cfg.enable_extra_guardrails()) {
358     if (cfg.deferred_start()) {
359       PERFETTO_ELOG(
360           "deferred_start=true is not supported in unsupervised traces");
361       return false;
362     }
363     uint64_t buf_size_sum = 0;
364     for (const auto& buf : cfg.buffers())
365       buf_size_sum += buf.size_kb();
366     if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
367       PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
368                     "kB  > %" PRIu32 " kB)",
369                     buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
370       return false;
371     }
372   }
373 
374   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
375     PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size());
376     return false;
377   }
378 
379   if (!cfg.unique_session_name().empty()) {
380     const std::string& name = cfg.unique_session_name();
381     for (auto& kv : tracing_sessions_) {
382       if (kv.second.config.unique_session_name() == name) {
383         PERFETTO_ELOG(
384             "A trace wtih this unique session name (%s) already exists",
385             name.c_str());
386         return false;
387       }
388     }
389   }
390 
391   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
392   // in a state where it stalls by design by having more TraceWriterImpl
393   // instances than free pages in the buffer. This is really a bug in
394   // trace_probes and the way it handles stalls in the shmem buffer.
395   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
396     PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
397                   tracing_sessions_.size());
398     return false;
399   }
400 
401   const TracingSessionID tsid = ++last_tracing_session_id_;
402   tracing_session =
403       &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
404            .first->second;
405 
406   if (cfg.write_into_file()) {
407     if (!fd) {
408       PERFETTO_ELOG(
409           "The TraceConfig had write_into_file==true but no fd was passed");
410       tracing_sessions_.erase(tsid);
411       return false;
412     }
413     tracing_session->write_into_file = std::move(fd);
414     uint32_t write_period_ms = cfg.file_write_period_ms();
415     if (write_period_ms == 0)
416       write_period_ms = kDefaultWriteIntoFilePeriodMs;
417     if (write_period_ms < min_write_period_ms_)
418       write_period_ms = min_write_period_ms_;
419     tracing_session->write_period_ms = write_period_ms;
420     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
421     tracing_session->bytes_written_into_file = 0;
422   }
423 
424   // Initialize the log buffers.
425   bool did_allocate_all_buffers = true;
426 
427   // Allocate the trace buffers. Also create a map to translate a consumer
428   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
429   // corresponding BufferID, which is a global ID namespace for the service and
430   // all producers.
431   size_t total_buf_size_kb = 0;
432   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
433   tracing_session->buffers_index.reserve(num_buffers);
434   for (size_t i = 0; i < num_buffers; i++) {
435     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
436     BufferID global_id = buffer_ids_.Allocate();
437     if (!global_id) {
438       did_allocate_all_buffers = false;  // We ran out of IDs.
439       break;
440     }
441     tracing_session->buffers_index.push_back(global_id);
442     const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
443     total_buf_size_kb += buffer_cfg.size_kb();
444     TraceBuffer::OverwritePolicy policy =
445         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
446             ? TraceBuffer::kDiscard
447             : TraceBuffer::kOverwrite;
448     auto it_and_inserted = buffers_.emplace(
449         global_id, TraceBuffer::Create(buf_size_bytes, policy));
450     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
451     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
452     if (!trace_buffer) {
453       did_allocate_all_buffers = false;
454       break;
455     }
456   }
457 
458   UpdateMemoryGuardrail();
459 
460   // This can happen if either:
461   // - All the kMaxTraceBufferID slots are taken.
462   // - OOM, or, more relistically, we exhausted virtual memory.
463   // In any case, free all the previously allocated buffers and abort.
464   // TODO(fmayer): add a test to cover this case, this is quite subtle.
465   if (!did_allocate_all_buffers) {
466     for (BufferID global_id : tracing_session->buffers_index) {
467       buffer_ids_.Free(global_id);
468       buffers_.erase(global_id);
469     }
470     tracing_sessions_.erase(tsid);
471     return false;
472   }
473 
474   consumer->tracing_session_id_ = tsid;
475 
476   // Setup the data sources on the producers without starting them.
477   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
478     // Scan all the registered data sources with a matching name.
479     auto range = data_sources_.equal_range(cfg_data_source.config().name());
480     for (auto it = range.first; it != range.second; it++) {
481       TraceConfig::ProducerConfig producer_config;
482       for (auto& config : cfg.producers()) {
483         if (GetProducer(it->second.producer_id)->name_ ==
484             config.producer_name()) {
485           producer_config = config;
486           break;
487         }
488       }
489       SetupDataSource(cfg_data_source, producer_config, it->second,
490                       tracing_session);
491     }
492   }
493 
494   bool has_start_trigger = false;
495   auto weak_this = weak_ptr_factory_.GetWeakPtr();
496   switch (cfg.trigger_config().trigger_mode()) {
497     case TraceConfig::TriggerConfig::UNSPECIFIED:
498       // no triggers are specified so this isn't a trace that is using triggers.
499       PERFETTO_DCHECK(!has_trigger_config);
500       break;
501     case TraceConfig::TriggerConfig::START_TRACING:
502       // For traces which use START_TRACE triggers we need to ensure that the
503       // tracing session will be cleaned up when it times out.
504       has_start_trigger = true;
505       task_runner_->PostDelayedTask(
506           [weak_this, tsid]() {
507             if (weak_this)
508               weak_this->OnStartTriggersTimeout(tsid);
509           },
510           cfg.trigger_config().trigger_timeout_ms());
511       break;
512     case TraceConfig::TriggerConfig::STOP_TRACING:
513       // Update the tracing_session's duration_ms to ensure that if no trigger
514       // is received the session will end and be cleaned up equal to the
515       // timeout.
516       //
517       // TODO(nuskos): Refactor this so that rather then modifying the config we
518       // have a field we look at on the tracing_session.
519       tracing_session->config.set_duration_ms(
520           cfg.trigger_config().trigger_timeout_ms());
521       break;
522   }
523 
524   tracing_session->state = TracingSession::CONFIGURED;
525   PERFETTO_LOG(
526       "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
527       "buffer size:%zu KB, total sessions:%zu",
528       cfg.data_sources().size(), tracing_session->config.duration_ms(),
529       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size());
530 
531   // Start the data sources, unless this is a case of early setup + fast
532   // triggering, either through TraceConfig.deferred_start or
533   // TraceConfig.trigger_config(). If both are specified which ever one occurs
534   // first will initiate the trace.
535   if (!cfg.deferred_start() && !has_start_trigger)
536     return StartTracing(tsid);
537 
538   return true;
539 }
540 
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)541 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
542                                            const TraceConfig& updated_cfg) {
543   PERFETTO_DCHECK_THREAD(thread_checker_);
544   TracingSession* tracing_session =
545       GetTracingSession(consumer->tracing_session_id_);
546   PERFETTO_DCHECK(tracing_session);
547 
548   if ((tracing_session->state != TracingSession::STARTED) &&
549       (tracing_session->state != TracingSession::CONFIGURED)) {
550     PERFETTO_ELOG(
551         "ChangeTraceConfig() was called for a tracing session which isn't "
552         "running.");
553     return;
554   }
555 
556   // We only support updating producer_name_filter (and pass-through configs)
557   // for now; null out any changeable fields and make sure the rest are
558   // identical.
559   TraceConfig new_config_copy(updated_cfg);
560   for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
561     ds_cfg.clear_producer_name_filter();
562   }
563 
564   TraceConfig current_config_copy(tracing_session->config);
565   for (auto& ds_cfg : *current_config_copy.mutable_data_sources())
566     ds_cfg.clear_producer_name_filter();
567 
568   if (new_config_copy != current_config_copy) {
569     PERFETTO_LOG(
570         "ChangeTraceConfig() was called with a config containing unsupported "
571         "changes; only adding to the producer_name_filter is currently "
572         "supported and will have an effect.");
573   }
574 
575   for (TraceConfig::DataSource& cfg_data_source :
576        *tracing_session->config.mutable_data_sources()) {
577     // Find the updated producer_filter in the new config.
578     std::vector<std::string> new_producer_name_filter;
579     bool found_data_source = false;
580     for (auto it : updated_cfg.data_sources()) {
581       if (cfg_data_source.config().name() == it.config().name()) {
582         new_producer_name_filter = it.producer_name_filter();
583         found_data_source = true;
584         break;
585       }
586     }
587 
588     // Bail out if data source not present in the new config.
589     if (!found_data_source) {
590       PERFETTO_ELOG(
591           "ChangeTraceConfig() called without a current data source also "
592           "present in the new "
593           "config: %s",
594           cfg_data_source.config().name().c_str());
595       continue;
596     }
597 
598     // TODO(oysteine): Just replacing the filter means that if
599     // there are any filter entries which were present in the original config,
600     // but removed from the config passed to ChangeTraceConfig, any matching
601     // producers will keep producing but newly added producers after this
602     // point will never start.
603     *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
604 
605     // Scan all the registered data sources with a matching name.
606     auto range = data_sources_.equal_range(cfg_data_source.config().name());
607     for (auto it = range.first; it != range.second; it++) {
608       ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
609       PERFETTO_DCHECK(producer);
610 
611       // Check if the producer name of this data source is present
612       // in the name filter. We currently only support new filters, not removing
613       // old ones.
614       if (!new_producer_name_filter.empty() &&
615           std::find(new_producer_name_filter.begin(),
616                     new_producer_name_filter.end(),
617                     producer->name_) == new_producer_name_filter.end()) {
618         continue;
619       }
620 
621       bool already_setup = false;
622       auto& ds_instances = tracing_session->data_source_instances;
623       for (auto instance_it = ds_instances.begin();
624            instance_it != ds_instances.end(); ++instance_it) {
625         if (instance_it->first == it->second.producer_id &&
626             instance_it->second.data_source_name ==
627                 cfg_data_source.config().name()) {
628           already_setup = true;
629           break;
630         }
631       }
632 
633       if (already_setup)
634         continue;
635 
636       // If it wasn't previously setup, set it up now.
637       // (The per-producer config is optional).
638       TraceConfig::ProducerConfig producer_config;
639       for (auto& config : tracing_session->config.producers()) {
640         if (producer->name_ == config.producer_name()) {
641           producer_config = config;
642           break;
643         }
644       }
645 
646       DataSourceInstance* ds_inst = SetupDataSource(
647           cfg_data_source, producer_config, it->second, tracing_session);
648 
649       if (ds_inst && tracing_session->state == TracingSession::STARTED)
650         StartDataSourceInstance(producer, tracing_session, ds_inst);
651     }
652   }
653 }
654 
StartTracing(TracingSessionID tsid)655 bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
656   PERFETTO_DCHECK_THREAD(thread_checker_);
657   TracingSession* tracing_session = GetTracingSession(tsid);
658   if (!tracing_session) {
659     PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
660     return false;
661   }
662 
663   if (tracing_session->state != TracingSession::CONFIGURED) {
664     PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
665                   tracing_session->state);
666     return false;
667   }
668 
669   tracing_session->state = TracingSession::STARTED;
670 
671   if (!tracing_session->config.builtin_data_sources()
672            .disable_clock_snapshotting()) {
673     SnapshotClocks(&tracing_session->initial_clock_snapshot_,
674                    /*set_root_timestamp=*/true);
675   }
676 
677   // Trigger delayed task if the trace is time limited.
678   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
679   if (trace_duration_ms > 0) {
680     auto weak_this = weak_ptr_factory_.GetWeakPtr();
681     task_runner_->PostDelayedTask(
682         [weak_this, tsid] {
683           // Skip entirely the flush if the trace session doesn't exist anymore.
684           // This is to prevent misleading error messages to be logged.
685           if (!weak_this)
686             return;
687           auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
688           if (!tracing_session_ptr)
689             return;
690           // If this trace was using STOP_TRACING triggers and we've seen
691           // one, then the trigger overrides the normal timeout. In this
692           // case we just return and let the other task clean up this trace.
693           if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
694                   TraceConfig::TriggerConfig::STOP_TRACING &&
695               !tracing_session_ptr->received_triggers.empty())
696             return;
697           // In all other cases (START_TRACING or no triggers) we flush
698           // after |trace_duration_ms| unconditionally.
699           weak_this->FlushAndDisableTracing(tsid);
700         },
701         trace_duration_ms);
702   }
703 
704   // Start the periodic drain tasks if we should to save the trace into a file.
705   if (tracing_session->config.write_into_file()) {
706     auto weak_this = weak_ptr_factory_.GetWeakPtr();
707     task_runner_->PostDelayedTask(
708         [weak_this, tsid] {
709           if (weak_this)
710             weak_this->ReadBuffers(tsid, nullptr);
711         },
712         tracing_session->delay_to_next_write_period_ms());
713   }
714 
715   // Start the periodic flush tasks if the config specified a flush period.
716   if (tracing_session->config.flush_period_ms())
717     PeriodicFlushTask(tsid, /*post_next_only=*/true);
718 
719   // Start the periodic incremental state clear tasks if the config specified a
720   // period.
721   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
722     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
723   }
724 
725   for (auto& kv : tracing_session->data_source_instances) {
726     ProducerID producer_id = kv.first;
727     DataSourceInstance& data_source = kv.second;
728     ProducerEndpointImpl* producer = GetProducer(producer_id);
729     if (!producer) {
730       PERFETTO_DFATAL("Producer does not exist.");
731       continue;
732     }
733     StartDataSourceInstance(producer, tracing_session, &data_source);
734   }
735   return true;
736 }
737 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)738 void TracingServiceImpl::StartDataSourceInstance(
739     ProducerEndpointImpl* producer,
740     TracingSession* tracing_session,
741     TracingServiceImpl::DataSourceInstance* instance) {
742   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
743   if (instance->will_notify_on_start) {
744     instance->state = DataSourceInstance::STARTING;
745   } else {
746     instance->state = DataSourceInstance::STARTED;
747   }
748   if (tracing_session->consumer_maybe_null) {
749     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
750         *producer, *instance);
751   }
752   producer->StartDataSource(instance->instance_id, instance->config);
753 }
754 
755 // DisableTracing just stops the data sources but doesn't free up any buffer.
756 // This is to allow the consumer to freeze the buffers (by stopping the trace)
757 // and then drain the buffers. The actual teardown of the TracingSession happens
758 // in FreeBuffers().
DisableTracing(TracingSessionID tsid,bool disable_immediately)759 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
760                                         bool disable_immediately) {
761   PERFETTO_DCHECK_THREAD(thread_checker_);
762   TracingSession* tracing_session = GetTracingSession(tsid);
763   if (!tracing_session) {
764     // Can happen if the consumer calls this before EnableTracing() or after
765     // FreeBuffers().
766     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
767     return;
768   }
769 
770   switch (tracing_session->state) {
771     // Spurious call to DisableTracing() while already disabled, nothing to do.
772     case TracingSession::DISABLED:
773       PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
774       return;
775 
776     // This is either:
777     // A) The case of a graceful DisableTracing() call followed by a call to
778     //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
779     //    to forcefully transition in the disabled state without waiting for the
780     //    outstanding acks because the buffers are going to be destroyed soon.
781     // B) A spurious call, iff |disable_immediately| == false, in which case
782     //    there is nothing to do.
783     case TracingSession::DISABLING_WAITING_STOP_ACKS:
784       PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
785       if (disable_immediately)
786         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
787       return;
788 
789     // Continues below.
790     case TracingSession::CONFIGURED:
791       // If the session didn't even start there is no need to orchestrate a
792       // graceful stop of data sources.
793       disable_immediately = true;
794       break;
795 
796     // This is the nominal case, continues below.
797     case TracingSession::STARTED:
798       break;
799   }
800 
801   for (auto& data_source_inst : tracing_session->data_source_instances) {
802     const ProducerID producer_id = data_source_inst.first;
803     DataSourceInstance& instance = data_source_inst.second;
804     const DataSourceInstanceID ds_inst_id = instance.instance_id;
805     ProducerEndpointImpl* producer = GetProducer(producer_id);
806     PERFETTO_DCHECK(producer);
807     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
808                     instance.state == DataSourceInstance::STARTING ||
809                     instance.state == DataSourceInstance::STARTED);
810     if (instance.will_notify_on_stop && !disable_immediately) {
811       instance.state = DataSourceInstance::STOPPING;
812     } else {
813       instance.state = DataSourceInstance::STOPPED;
814     }
815     if (tracing_session->consumer_maybe_null) {
816       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
817           *producer, instance);
818     }
819     producer->StopDataSource(ds_inst_id);
820   }
821 
822   // Either this request is flagged with |disable_immediately| or there are no
823   // data sources that are requesting a final handshake. In both cases just mark
824   // the session as disabled immediately, notify the consumer and flush the
825   // trace file (if used).
826   if (tracing_session->AllDataSourceInstancesStopped())
827     return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
828 
829   tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
830   auto weak_this = weak_ptr_factory_.GetWeakPtr();
831   auto timeout_ms = override_data_source_test_timeout_ms_for_testing
832                         ? override_data_source_test_timeout_ms_for_testing
833                         : kDataSourceStopTimeoutMs;
834   task_runner_->PostDelayedTask(
835       [weak_this, tsid] {
836         if (weak_this)
837           weak_this->OnDisableTracingTimeout(tsid);
838       },
839       timeout_ms);
840 
841   // Deliberately NOT removing the session from |tracing_session_|, it's still
842   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
843 }
844 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)845 void TracingServiceImpl::NotifyDataSourceStarted(
846     ProducerID producer_id,
847     DataSourceInstanceID instance_id) {
848   PERFETTO_DCHECK_THREAD(thread_checker_);
849   for (auto& kv : tracing_sessions_) {
850     TracingSession& tracing_session = kv.second;
851     DataSourceInstance* instance =
852         tracing_session.GetDataSourceInstance(producer_id, instance_id);
853 
854     if (!instance)
855       continue;
856 
857     if (instance->state != DataSourceInstance::STARTING) {
858       PERFETTO_ELOG("Data source instance in incorrect state.");
859       continue;
860     }
861 
862     instance->state = DataSourceInstance::STARTED;
863 
864     ProducerEndpointImpl* producer = GetProducer(producer_id);
865     PERFETTO_DCHECK(producer);
866     if (tracing_session.consumer_maybe_null) {
867       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
868           *producer, *instance);
869     }
870   }  // for (tracing_session)
871 }
872 
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)873 void TracingServiceImpl::NotifyDataSourceStopped(
874     ProducerID producer_id,
875     DataSourceInstanceID instance_id) {
876   PERFETTO_DCHECK_THREAD(thread_checker_);
877   for (auto& kv : tracing_sessions_) {
878     TracingSession& tracing_session = kv.second;
879     DataSourceInstance* instance =
880         tracing_session.GetDataSourceInstance(producer_id, instance_id);
881 
882     if (!instance)
883       continue;
884 
885     if (instance->state != DataSourceInstance::STOPPING) {
886       PERFETTO_ELOG("Data source instance in incorrect state.");
887       continue;
888     }
889     PERFETTO_DCHECK(tracing_session.state ==
890                     TracingSession::DISABLING_WAITING_STOP_ACKS);
891 
892     instance->state = DataSourceInstance::STOPPED;
893 
894     ProducerEndpointImpl* producer = GetProducer(producer_id);
895     PERFETTO_DCHECK(producer);
896     if (tracing_session.consumer_maybe_null) {
897       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
898           *producer, *instance);
899     }
900 
901     if (!tracing_session.AllDataSourceInstancesStopped())
902       continue;
903 
904     // All data sources acked the termination.
905     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
906   }  // for (tracing_session)
907 }
908 
ActivateTriggers(ProducerID producer_id,const std::vector<std::string> & triggers)909 void TracingServiceImpl::ActivateTriggers(
910     ProducerID producer_id,
911     const std::vector<std::string>& triggers) {
912   PERFETTO_DCHECK_THREAD(thread_checker_);
913   auto* producer = GetProducer(producer_id);
914   PERFETTO_DCHECK(producer);
915   for (const auto& trigger_name : triggers) {
916     for (auto& id_and_tracing_session : tracing_sessions_) {
917       auto& tracing_session = id_and_tracing_session.second;
918       TracingSessionID tsid = id_and_tracing_session.first;
919       auto iter = std::find_if(
920           tracing_session.config.trigger_config().triggers().begin(),
921           tracing_session.config.trigger_config().triggers().end(),
922           [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
923             return trigger.name() == trigger_name;
924           });
925       if (iter == tracing_session.config.trigger_config().triggers().end()) {
926         continue;
927       }
928 
929       // If this trigger requires a certain producer to have sent it
930       // (non-empty producer_name()) ensure the producer who sent this trigger
931       // matches.
932       if (!iter->producer_name_regex().empty() &&
933           !std::regex_match(producer->name_,
934                             std::regex(iter->producer_name_regex()))) {
935         continue;
936       }
937 
938       const bool triggers_already_received =
939           !tracing_session.received_triggers.empty();
940       tracing_session.received_triggers.push_back(
941           {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
942            producer->name_, producer->uid_});
943       auto weak_this = weak_ptr_factory_.GetWeakPtr();
944       switch (tracing_session.config.trigger_config().trigger_mode()) {
945         case TraceConfig::TriggerConfig::START_TRACING:
946           // If the session has already been triggered and moved past
947           // CONFIGURED then we don't need to repeat StartTracing. This would
948           // work fine (StartTracing would return false) but would add error
949           // logs.
950           if (tracing_session.state != TracingSession::CONFIGURED)
951             break;
952 
953           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
954                         " with duration of %" PRIu32 "ms.",
955                         iter->name().c_str(), tsid, iter->stop_delay_ms());
956           // We override the trace duration to be the trigger's requested
957           // value, this ensures that the trace will end after this amount
958           // of time has passed.
959           tracing_session.config.set_duration_ms(iter->stop_delay_ms());
960           StartTracing(tsid);
961           break;
962         case TraceConfig::TriggerConfig::STOP_TRACING:
963           // Only stop the trace once to avoid confusing log messages. I.E.
964           // when we've already hit the first trigger we've already Posted the
965           // task to FlushAndDisable. So all future triggers will just break
966           // out.
967           if (triggers_already_received)
968             break;
969 
970           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
971                         " with duration of %" PRIu32 "ms.",
972                         iter->name().c_str(), tsid, iter->stop_delay_ms());
973           // Now that we've seen a trigger we need to stop, flush, and disable
974           // this session after the configured |stop_delay_ms|.
975           task_runner_->PostDelayedTask(
976               [weak_this, tsid] {
977                 // Skip entirely the flush if the trace session doesn't exist
978                 // anymore. This is to prevent misleading error messages to be
979                 // logged.
980                 if (weak_this && weak_this->GetTracingSession(tsid))
981                   weak_this->FlushAndDisableTracing(tsid);
982               },
983               // If this trigger is zero this will immediately executable and
984               // will happen shortly.
985               iter->stop_delay_ms());
986           break;
987         case TraceConfig::TriggerConfig::UNSPECIFIED:
988           PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
989           break;
990       }
991     }
992   }
993 }
994 
995 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
996 // conditions all data sources should have acked the stop and this will early
997 // out.
OnDisableTracingTimeout(TracingSessionID tsid)998 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
999   PERFETTO_DCHECK_THREAD(thread_checker_);
1000   TracingSession* tracing_session = GetTracingSession(tsid);
1001   if (!tracing_session ||
1002       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1003     return;  // Tracing session was successfully disabled.
1004   }
1005 
1006   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1007                 tsid);
1008   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1009   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1010 }
1011 
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1012 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1013     TracingSession* tracing_session) {
1014   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1015   for (auto& inst_kv : tracing_session->data_source_instances) {
1016     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1017       continue;
1018     inst_kv.second.state = DataSourceInstance::STOPPED;
1019     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1020     PERFETTO_DCHECK(producer);
1021     if (tracing_session->consumer_maybe_null) {
1022       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1023           *producer, inst_kv.second);
1024     }
1025   }
1026   tracing_session->state = TracingSession::DISABLED;
1027 
1028   // Scrape any remaining chunks that weren't flushed by the producers.
1029   for (auto& producer_id_and_producer : producers_)
1030     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1031 
1032   if (tracing_session->write_into_file) {
1033     tracing_session->write_period_ms = 0;
1034     ReadBuffers(tracing_session->id, nullptr);
1035   }
1036 
1037   if (tracing_session->consumer_maybe_null)
1038     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
1039 }
1040 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback)1041 void TracingServiceImpl::Flush(TracingSessionID tsid,
1042                                uint32_t timeout_ms,
1043                                ConsumerEndpoint::FlushCallback callback) {
1044   PERFETTO_DCHECK_THREAD(thread_checker_);
1045   TracingSession* tracing_session = GetTracingSession(tsid);
1046   if (!tracing_session) {
1047     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1048     return;
1049   }
1050 
1051   if (!timeout_ms)
1052     timeout_ms = tracing_session->flush_timeout_ms();
1053 
1054   if (tracing_session->pending_flushes.size() > 1000) {
1055     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1056                   tracing_session->pending_flushes.size());
1057     callback(false);
1058     return;
1059   }
1060 
1061   FlushRequestID flush_request_id = ++last_flush_request_id_;
1062   PendingFlush& pending_flush =
1063       tracing_session->pending_flushes
1064           .emplace_hint(tracing_session->pending_flushes.end(),
1065                         flush_request_id, PendingFlush(std::move(callback)))
1066           ->second;
1067 
1068   // Send a flush request to each producer involved in the tracing session. In
1069   // order to issue a flush request we have to build a map of all data source
1070   // instance ids enabled for each producer.
1071   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1072   for (const auto& data_source_inst : tracing_session->data_source_instances) {
1073     const ProducerID producer_id = data_source_inst.first;
1074     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1075     flush_map[producer_id].push_back(ds_inst_id);
1076   }
1077 
1078   for (const auto& kv : flush_map) {
1079     ProducerID producer_id = kv.first;
1080     ProducerEndpointImpl* producer = GetProducer(producer_id);
1081     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1082     producer->Flush(flush_request_id, data_sources);
1083     pending_flush.producers.insert(producer_id);
1084   }
1085 
1086   // If there are no producers to flush (realistically this happens only in
1087   // some tests) fire OnFlushTimeout() straight away, without waiting.
1088   if (flush_map.empty())
1089     timeout_ms = 0;
1090 
1091   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1092   task_runner_->PostDelayedTask(
1093       [weak_this, tsid, flush_request_id] {
1094         if (weak_this)
1095           weak_this->OnFlushTimeout(tsid, flush_request_id);
1096       },
1097       timeout_ms);
1098 }
1099 
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1100 void TracingServiceImpl::NotifyFlushDoneForProducer(
1101     ProducerID producer_id,
1102     FlushRequestID flush_request_id) {
1103   for (auto& kv : tracing_sessions_) {
1104     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1105     auto& pending_flushes = kv.second.pending_flushes;
1106     auto end_it = pending_flushes.upper_bound(flush_request_id);
1107     for (auto it = pending_flushes.begin(); it != end_it;) {
1108       PendingFlush& pending_flush = it->second;
1109       pending_flush.producers.erase(producer_id);
1110       if (pending_flush.producers.empty()) {
1111         auto weak_this = weak_ptr_factory_.GetWeakPtr();
1112         TracingSessionID tsid = kv.first;
1113         auto callback = std::move(pending_flush.callback);
1114         task_runner_->PostTask([weak_this, tsid, callback]() {
1115           if (weak_this) {
1116             weak_this->CompleteFlush(tsid, std::move(callback),
1117                                      /*success=*/true);
1118           }
1119         });
1120         it = pending_flushes.erase(it);
1121       } else {
1122         it++;
1123       }
1124     }  // for (pending_flushes)
1125   }    // for (tracing_session)
1126 }
1127 
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1128 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1129                                         FlushRequestID flush_request_id) {
1130   TracingSession* tracing_session = GetTracingSession(tsid);
1131   if (!tracing_session)
1132     return;
1133   auto it = tracing_session->pending_flushes.find(flush_request_id);
1134   if (it == tracing_session->pending_flushes.end())
1135     return;  // Nominal case: flush was completed and acked on time.
1136 
1137   // If there were no producers to flush, consider it a success.
1138   bool success = it->second.producers.empty();
1139 
1140   auto callback = std::move(it->second.callback);
1141   tracing_session->pending_flushes.erase(it);
1142   CompleteFlush(tsid, std::move(callback), success);
1143 }
1144 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1145 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1146                                        ConsumerEndpoint::FlushCallback callback,
1147                                        bool success) {
1148   TracingSession* tracing_session = GetTracingSession(tsid);
1149   if (tracing_session) {
1150     // Producers may not have been able to flush all their data, even if they
1151     // indicated flush completion. If possible, also collect uncommitted chunks
1152     // to make sure we have everything they wrote so far.
1153     for (auto& producer_id_and_producer : producers_) {
1154       ScrapeSharedMemoryBuffers(tracing_session,
1155                                 producer_id_and_producer.second);
1156     }
1157   }
1158   callback(success);
1159 }
1160 
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)1161 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1162     TracingSession* tracing_session,
1163     ProducerEndpointImpl* producer) {
1164   if (!producer->smb_scraping_enabled_)
1165     return;
1166 
1167   // Can't copy chunks if we don't know about any trace writers.
1168   if (producer->writers_.empty())
1169     return;
1170 
1171   // Performance optimization: On flush or session disconnect, this method is
1172   // called for each producer. If the producer doesn't participate in the
1173   // session, there's no need to scape its chunks right now. We can tell if a
1174   // producer participates in the session by checking if the producer is allowed
1175   // to write into the session's log buffers.
1176   const auto& session_buffers = tracing_session->buffers_index;
1177   bool producer_in_session =
1178       std::any_of(session_buffers.begin(), session_buffers.end(),
1179                   [producer](BufferID buffer_id) {
1180                     return producer->allowed_target_buffers_.count(buffer_id);
1181                   });
1182   if (!producer_in_session)
1183     return;
1184 
1185   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1186 
1187   // Find and copy any uncommitted chunks from the SMB.
1188   //
1189   // In nominal conditions, the page layout of the used SMB pages should never
1190   // change because the service is the only one who is supposed to modify used
1191   // pages (to make them free again).
1192   //
1193   // However, the code here needs to deal with the case of a malicious producer
1194   // altering the SMB in unpredictable ways. Thankfully the SMB size is
1195   // immutable, so a chunk will always point to some valid memory, even if the
1196   // producer alters the intended layout and chunk header concurrently.
1197   // Ultimately a malicious producer altering the SMB's chunk layout while we
1198   // are iterating in this function is not any different from the case of a
1199   // malicious producer asking to commit a chunk made of random data, which is
1200   // something this class has to deal with regardless.
1201   //
1202   // The only legitimate mutations that can happen from sane producers,
1203   // concurrently to this function, are:
1204   //   A. free pages being partitioned,
1205   //   B. free chunks being migrated to kChunkBeingWritten,
1206   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
1207 
1208   SharedMemoryABI* abi = &producer->shmem_abi_;
1209   // num_pages() is immutable after the SMB is initialized and cannot be changed
1210   // even by a producer even if malicious.
1211   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1212     uint32_t layout = abi->GetPageLayout(page_idx);
1213 
1214     uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
1215     // Skip empty pages.
1216     if (used_chunks == 0)
1217       continue;
1218 
1219     // Scrape the chunks that are currently used. These should be either in
1220     // state kChunkBeingWritten or kChunkComplete.
1221     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1222       if (!(used_chunks & 1))
1223         continue;
1224 
1225       SharedMemoryABI::ChunkState state =
1226           SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1227       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1228                       state == SharedMemoryABI::kChunkComplete);
1229       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1230 
1231       SharedMemoryABI::Chunk chunk =
1232           abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1233 
1234       uint16_t packet_count;
1235       uint8_t flags;
1236       // GetPacketCountAndFlags has acquire_load semantics.
1237       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1238 
1239       // It only makes sense to copy an incomplete chunk if there's at least
1240       // one full packet available. (The producer may not have completed the
1241       // last packet in it yet, so we need at least 2.)
1242       if (!chunk_complete && packet_count < 2)
1243         continue;
1244 
1245       // At this point, it is safe to access the remaining header fields of
1246       // the chunk. Even if the chunk was only just transferred from
1247       // kChunkFree into kChunkBeingWritten state, the header should be
1248       // written completely once the packet count increased above 1 (it was
1249       // reset to 0 by the service when the chunk was freed).
1250 
1251       WriterID writer_id = chunk.writer_id();
1252       base::Optional<BufferID> target_buffer_id =
1253           producer->buffer_id_for_writer(writer_id);
1254 
1255       // We can only scrape this chunk if we know which log buffer to copy it
1256       // into.
1257       if (!target_buffer_id)
1258         continue;
1259 
1260       // Skip chunks that don't belong to the requested tracing session.
1261       bool target_buffer_belongs_to_session =
1262           std::find(session_buffers.begin(), session_buffers.end(),
1263                     *target_buffer_id) != session_buffers.end();
1264       if (!target_buffer_belongs_to_session)
1265         continue;
1266 
1267       uint32_t chunk_id =
1268           chunk.header()->chunk_id.load(std::memory_order_relaxed);
1269 
1270       CopyProducerPageIntoLogBuffer(
1271           producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
1272           packet_count, flags, chunk_complete, chunk.payload_begin(),
1273           chunk.payload_size());
1274     }
1275   }
1276 }
1277 
FlushAndDisableTracing(TracingSessionID tsid)1278 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1279   PERFETTO_DCHECK_THREAD(thread_checker_);
1280   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1281   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1282   Flush(tsid, 0, [weak_this, tsid](bool success) {
1283     PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
1284                   success, tsid);
1285     if (!weak_this)
1286       return;
1287     TracingSession* session = weak_this->GetTracingSession(tsid);
1288     if (session->consumer_maybe_null) {
1289       // If the consumer is still attached, just disable the session but give it
1290       // a chance to read the contents.
1291       weak_this->DisableTracing(tsid);
1292     } else {
1293       // If the consumer detached, destroy the session. If the consumer did
1294       // start the session in long-tracing mode, the service will have saved
1295       // the contents to the passed file. If not, the contents will be
1296       // destroyed.
1297       weak_this->FreeBuffers(tsid);
1298     }
1299   });
1300 }
1301 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)1302 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1303                                            bool post_next_only) {
1304   PERFETTO_DCHECK_THREAD(thread_checker_);
1305   TracingSession* tracing_session = GetTracingSession(tsid);
1306   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1307     return;
1308 
1309   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1310   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1311   task_runner_->PostDelayedTask(
1312       [weak_this, tsid] {
1313         if (weak_this)
1314           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1315       },
1316       flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
1317 
1318   if (post_next_only)
1319     return;
1320 
1321   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1322   Flush(tsid, 0, [](bool success) {
1323     if (!success)
1324       PERFETTO_ELOG("Periodic flush timed out");
1325   });
1326 }
1327 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)1328 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1329     TracingSessionID tsid,
1330     bool post_next_only) {
1331   PERFETTO_DCHECK_THREAD(thread_checker_);
1332   TracingSession* tracing_session = GetTracingSession(tsid);
1333   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1334     return;
1335 
1336   uint32_t clear_period_ms =
1337       tracing_session->config.incremental_state_config().clear_period_ms();
1338   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1339   task_runner_->PostDelayedTask(
1340       [weak_this, tsid] {
1341         if (weak_this)
1342           weak_this->PeriodicClearIncrementalStateTask(
1343               tsid, /*post_next_only=*/false);
1344       },
1345       clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));
1346 
1347   if (post_next_only)
1348     return;
1349 
1350   PERFETTO_DLOG(
1351       "Performing periodic incremental state clear for trace session %" PRIu64,
1352       tsid);
1353 
1354   // Queue the IPCs to producers with active data sources that opted in.
1355   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1356   for (const auto& kv : tracing_session->data_source_instances) {
1357     ProducerID producer_id = kv.first;
1358     const DataSourceInstance& data_source = kv.second;
1359     if (data_source.handles_incremental_state_clear)
1360       clear_map[producer_id].push_back(data_source.instance_id);
1361   }
1362 
1363   for (const auto& kv : clear_map) {
1364     ProducerID producer_id = kv.first;
1365     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1366     ProducerEndpointImpl* producer = GetProducer(producer_id);
1367     if (!producer) {
1368       PERFETTO_DFATAL("Producer does not exist.");
1369       continue;
1370     }
1371     producer->ClearIncrementalState(data_sources);
1372   }
1373 }
1374 
1375 // Note: when this is called to write into a file passed when starting tracing
1376 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
1377 // to send the trace data back over IPC).
ReadBuffers(TracingSessionID tsid,ConsumerEndpointImpl * consumer)1378 void TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
1379                                      ConsumerEndpointImpl* consumer) {
1380   PERFETTO_DCHECK_THREAD(thread_checker_);
1381   TracingSession* tracing_session = GetTracingSession(tsid);
1382   if (!tracing_session) {
1383     // This will be hit systematically from the PostDelayedTask when directly
1384     // writing into the file (in which case consumer == nullptr). Suppress the
1385     // log in this case as it's just spam.
1386     if (consumer)
1387       PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
1388     return;  // TODO(primiano): signal failure?
1389   }
1390 
1391   // When a tracing session is waiting for a trigger it is considered empty. If
1392   // a tracing session finishes and moves into DISABLED without ever receiving a
1393   // trigger the trace should never return any data. This includes the synthetic
1394   // packets like TraceConfig and Clock snapshots. So we bail out early and let
1395   // the consumer know there is no data.
1396   if (!tracing_session->config.trigger_config().triggers().empty() &&
1397       tracing_session->received_triggers.empty()) {
1398     if (consumer)
1399       consumer->consumer_->OnTraceData({}, /* has_more = */ false);
1400     PERFETTO_DLOG(
1401         "ReadBuffers(): tracing session has not received a trigger yet.");
1402     return;
1403   }
1404 
1405   // This can happen if the file is closed by a previous task because it reaches
1406   // |max_file_size_bytes|.
1407   if (!tracing_session->write_into_file && !consumer)
1408     return;
1409 
1410   if (tracing_session->write_into_file && consumer) {
1411     // If the consumer enabled tracing and asked to save the contents into the
1412     // passed file makes little sense to also try to read the buffers over IPC,
1413     // as that would just steal data from the periodic draining task.
1414     PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
1415     return;
1416   }
1417 
1418   std::vector<TracePacket> packets;
1419   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
1420 
1421   std::move(tracing_session->initial_clock_snapshot_.begin(),
1422             tracing_session->initial_clock_snapshot_.end(),
1423             std::back_inserter(packets));
1424   tracing_session->initial_clock_snapshot_.clear();
1425 
1426   base::TimeMillis now = base::GetWallTimeMs();
1427   if (now >= tracing_session->last_snapshot_time + kSnapshotsInterval) {
1428     tracing_session->last_snapshot_time = now;
1429     SnapshotSyncMarker(&packets);
1430     SnapshotStats(tracing_session, &packets);
1431 
1432     if (!tracing_session->config.builtin_data_sources()
1433              .disable_clock_snapshotting()) {
1434       // We don't want to put a root timestamp in this snapshot as the packet
1435       // may be very out of order with respect to the actual trace packets
1436       // since consuming the trace may happen at any point after it starts.
1437       SnapshotClocks(&packets, /*set_root_timestamp=*/false);
1438     }
1439   }
1440   if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
1441     MaybeEmitTraceConfig(tracing_session, &packets);
1442     MaybeEmitReceivedTriggers(tracing_session, &packets);
1443   }
1444   if (!tracing_session->config.builtin_data_sources().disable_system_info())
1445     MaybeEmitSystemInfo(tracing_session, &packets);
1446 
1447   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
1448   size_t total_slices = 0;   // SUM(#slices in |packets|).
1449 
1450   // Add up size for packets added by the Maybe* calls above.
1451   for (const TracePacket& packet : packets) {
1452     packets_bytes += packet.size();
1453     total_slices += packet.slices().size();
1454   }
1455 
1456   // This is a rough threshold to determine how much to read from the buffer in
1457   // each task. This is to avoid executing a single huge sending task for too
1458   // long and risk to hit the watchdog. This is *not* an upper bound: we just
1459   // stop accumulating new packets and PostTask *after* we cross this threshold.
1460   // This constant essentially balances the PostTask and IPC overhead vs the
1461   // responsiveness of the service. An extremely small value will cause one IPC
1462   // and one PostTask for each slice but will keep the service extremely
1463   // responsive. An extremely large value will batch the send for the full
1464   // buffer in one large task, will hit the blocking send() once the socket
1465   // buffers are full and hang the service for a bit (until the consumer
1466   // catches up).
1467   static constexpr size_t kApproxBytesPerTask = 32768;
1468   bool did_hit_threshold = false;
1469 
1470   // TODO(primiano): Extend the ReadBuffers API to allow reading only some
1471   // buffers, not all of them in one go.
1472   for (size_t buf_idx = 0;
1473        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
1474        buf_idx++) {
1475     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
1476     if (tbuf_iter == buffers_.end()) {
1477       PERFETTO_DFATAL("Buffer not found.");
1478       continue;
1479     }
1480     TraceBuffer& tbuf = *tbuf_iter->second;
1481     tbuf.BeginRead();
1482     while (!did_hit_threshold) {
1483       TracePacket packet;
1484       TraceBuffer::PacketSequenceProperties sequence_properties{};
1485       bool previous_packet_dropped;
1486       if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
1487                                     &previous_packet_dropped)) {
1488         break;
1489       }
1490       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
1491       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
1492       PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
1493       PERFETTO_DCHECK(packet.size() > 0);
1494       if (!PacketStreamValidator::Validate(packet.slices())) {
1495         PERFETTO_DLOG("Dropping invalid packet");
1496         continue;
1497       }
1498 
1499       // Append a slice with the trusted field data. This can't be spoofed
1500       // because above we validated that the existing slices don't contain any
1501       // trusted fields. For added safety we append instead of prepending
1502       // because according to protobuf semantics, if the same field is
1503       // encountered multiple times the last instance takes priority. Note that
1504       // truncated packets are also rejected, so the producer can't give us a
1505       // partial packet (e.g., a truncated string) which only becomes valid when
1506       // the trusted data is appended here.
1507       protos::TrustedPacket trusted_packet;
1508       trusted_packet.set_trusted_uid(
1509           static_cast<int32_t>(sequence_properties.producer_uid_trusted));
1510       trusted_packet.set_trusted_packet_sequence_id(
1511           tracing_session->GetPacketSequenceID(
1512               sequence_properties.producer_id_trusted,
1513               sequence_properties.writer_id));
1514       if (previous_packet_dropped)
1515         trusted_packet.set_previous_packet_dropped(previous_packet_dropped);
1516       static constexpr size_t kTrustedBufSize = 16;
1517       Slice slice = Slice::Allocate(kTrustedBufSize);
1518       PERFETTO_CHECK(
1519           trusted_packet.SerializeToArray(slice.own_data(), kTrustedBufSize));
1520       slice.size = static_cast<size_t>(trusted_packet.GetCachedSize());
1521       PERFETTO_DCHECK(slice.size > 0 && slice.size <= kTrustedBufSize);
1522       packet.AddSlice(std::move(slice));
1523 
1524       // Append the packet (inclusive of the trusted uid) to |packets|.
1525       packets_bytes += packet.size();
1526       total_slices += packet.slices().size();
1527       did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
1528                           !tracing_session->write_into_file;
1529       packets.emplace_back(std::move(packet));
1530     }  // for(packets...)
1531   }    // for(buffers...)
1532 
1533   // If the caller asked us to write into a file by setting
1534   // |write_into_file| == true in the trace config, drain the packets read
1535   // (if any) into the given file descriptor.
1536   if (tracing_session->write_into_file) {
1537     const uint64_t max_size = tracing_session->max_file_size_bytes
1538                                   ? tracing_session->max_file_size_bytes
1539                                   : std::numeric_limits<size_t>::max();
1540 
1541     // When writing into a file, the file should look like a root trace.proto
1542     // message. Each packet should be prepended with a proto preamble stating
1543     // its field id (within trace.proto) and size. Hence the addition below.
1544     const size_t max_iovecs = total_slices + packets.size();
1545 
1546     size_t num_iovecs = 0;
1547     bool stop_writing_into_file = tracing_session->write_period_ms == 0;
1548     std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
1549     size_t num_iovecs_at_last_packet = 0;
1550     uint64_t bytes_about_to_be_written = 0;
1551     for (TracePacket& packet : packets) {
1552       std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
1553           packet.GetProtoPreamble();
1554       bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
1555       num_iovecs++;
1556       for (const Slice& slice : packet.slices()) {
1557         // writev() doesn't change the passed pointer. However, struct iovec
1558         // take a non-const ptr because it's the same struct used by readv().
1559         // Hence the const_cast here.
1560         char* start = static_cast<char*>(const_cast<void*>(slice.start));
1561         bytes_about_to_be_written += slice.size;
1562         iovecs[num_iovecs++] = {start, slice.size};
1563       }
1564 
1565       if (tracing_session->bytes_written_into_file +
1566               bytes_about_to_be_written >=
1567           max_size) {
1568         stop_writing_into_file = true;
1569         num_iovecs = num_iovecs_at_last_packet;
1570         break;
1571       }
1572 
1573       num_iovecs_at_last_packet = num_iovecs;
1574     }
1575     PERFETTO_DCHECK(num_iovecs <= max_iovecs);
1576     int fd = *tracing_session->write_into_file;
1577 
1578     uint64_t total_wr_size = 0;
1579 
1580     // writev() can take at most IOV_MAX entries per call. Batch them.
1581     constexpr size_t kIOVMax = IOV_MAX;
1582     for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
1583       int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
1584       ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
1585       if (wr_size <= 0) {
1586         PERFETTO_PLOG("writev() failed");
1587         stop_writing_into_file = true;
1588         break;
1589       }
1590       total_wr_size += static_cast<size_t>(wr_size);
1591     }
1592 
1593     tracing_session->bytes_written_into_file += total_wr_size;
1594 
1595     PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
1596                   (total_wr_size + 1023) / 1024, stop_writing_into_file);
1597     if (stop_writing_into_file) {
1598       // Ensure all data was written to the file before we close it.
1599       base::FlushFile(fd);
1600       tracing_session->write_into_file.reset();
1601       tracing_session->write_period_ms = 0;
1602       if (tracing_session->state == TracingSession::STARTED)
1603         DisableTracing(tsid);
1604       return;
1605     }
1606 
1607     auto weak_this = weak_ptr_factory_.GetWeakPtr();
1608     task_runner_->PostDelayedTask(
1609         [weak_this, tsid] {
1610           if (weak_this)
1611             weak_this->ReadBuffers(tsid, nullptr);
1612         },
1613         tracing_session->delay_to_next_write_period_ms());
1614     return;
1615   }  // if (tracing_session->write_into_file)
1616 
1617   const bool has_more = did_hit_threshold;
1618   if (has_more) {
1619     auto weak_consumer = consumer->GetWeakPtr();
1620     auto weak_this = weak_ptr_factory_.GetWeakPtr();
1621     task_runner_->PostTask([weak_this, weak_consumer, tsid] {
1622       if (!weak_this || !weak_consumer)
1623         return;
1624       weak_this->ReadBuffers(tsid, weak_consumer.get());
1625     });
1626   }
1627 
1628   // Keep this as tail call, just in case the consumer re-enters.
1629   consumer->consumer_->OnTraceData(std::move(packets), has_more);
1630 }
1631 
FreeBuffers(TracingSessionID tsid)1632 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
1633   PERFETTO_DCHECK_THREAD(thread_checker_);
1634   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
1635   TracingSession* tracing_session = GetTracingSession(tsid);
1636   if (!tracing_session) {
1637     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
1638     return;  // TODO(primiano): signal failure?
1639   }
1640   DisableTracing(tsid, /*disable_immediately=*/true);
1641 
1642   PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1643   tracing_session->data_source_instances.clear();
1644 
1645   for (auto& producer_entry : producers_) {
1646     ProducerEndpointImpl* producer = producer_entry.second;
1647     producer->OnFreeBuffers(tracing_session->buffers_index);
1648   }
1649 
1650   for (BufferID buffer_id : tracing_session->buffers_index) {
1651     buffer_ids_.Free(buffer_id);
1652     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
1653     buffers_.erase(buffer_id);
1654   }
1655   bool notify_traceur = tracing_session->config.notify_traceur();
1656   tracing_sessions_.erase(tsid);
1657   UpdateMemoryGuardrail();
1658 
1659   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
1660                tracing_sessions_.size());
1661 
1662 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
1663   static const char kTraceurProp[] = "sys.trace.trace_end_signal";
1664   if (notify_traceur && __system_property_set(kTraceurProp, "1"))
1665     PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
1666 #else
1667   base::ignore_result(notify_traceur);
1668 #endif
1669 }
1670 
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)1671 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
1672                                             const DataSourceDescriptor& desc) {
1673   PERFETTO_DCHECK_THREAD(thread_checker_);
1674   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
1675                 producer_id, desc.name().c_str());
1676 
1677   PERFETTO_DCHECK(!desc.name().empty());
1678   auto reg_ds = data_sources_.emplace(desc.name(),
1679                                       RegisteredDataSource{producer_id, desc});
1680 
1681   // If there are existing tracing sessions, we need to check if the new
1682   // data source is enabled by any of them.
1683   if (tracing_sessions_.empty())
1684     return;
1685 
1686   ProducerEndpointImpl* producer = GetProducer(producer_id);
1687   if (!producer) {
1688     PERFETTO_DFATAL("Producer not found.");
1689     return;
1690   }
1691 
1692   for (auto& iter : tracing_sessions_) {
1693     TracingSession& tracing_session = iter.second;
1694     if (tracing_session.state != TracingSession::STARTED &&
1695         tracing_session.state != TracingSession::CONFIGURED) {
1696       continue;
1697     }
1698 
1699     TraceConfig::ProducerConfig producer_config;
1700     for (auto& config : tracing_session.config.producers()) {
1701       if (producer->name_ == config.producer_name()) {
1702         producer_config = config;
1703         break;
1704       }
1705     }
1706     for (const TraceConfig::DataSource& cfg_data_source :
1707          tracing_session.config.data_sources()) {
1708       if (cfg_data_source.config().name() != desc.name())
1709         continue;
1710       DataSourceInstance* ds_inst = SetupDataSource(
1711           cfg_data_source, producer_config, reg_ds->second, &tracing_session);
1712       if (ds_inst && tracing_session.state == TracingSession::STARTED)
1713         StartDataSourceInstance(producer, &tracing_session, ds_inst);
1714     }
1715   }
1716 }
1717 
UnregisterDataSource(ProducerID producer_id,const std::string & name)1718 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
1719                                               const std::string& name) {
1720   PERFETTO_DCHECK_THREAD(thread_checker_);
1721   PERFETTO_CHECK(producer_id);
1722   ProducerEndpointImpl* producer = GetProducer(producer_id);
1723   PERFETTO_DCHECK(producer);
1724   for (auto& kv : tracing_sessions_) {
1725     auto& ds_instances = kv.second.data_source_instances;
1726     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
1727       if (it->first == producer_id && it->second.data_source_name == name) {
1728         DataSourceInstanceID ds_inst_id = it->second.instance_id;
1729         if (it->second.state != DataSourceInstance::STOPPED) {
1730           if (it->second.state != DataSourceInstance::STOPPING)
1731             producer->StopDataSource(ds_inst_id);
1732           // Mark the instance as stopped immediately, since we are
1733           // unregistering it below.
1734           if (it->second.state == DataSourceInstance::STOPPING)
1735             NotifyDataSourceStopped(producer_id, ds_inst_id);
1736         }
1737         it = ds_instances.erase(it);
1738       } else {
1739         ++it;
1740       }
1741     }  // for (data_source_instances)
1742   }    // for (tracing_session)
1743 
1744   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
1745     if (it->second.producer_id == producer_id &&
1746         it->second.descriptor.name() == name) {
1747       data_sources_.erase(it);
1748       return;
1749     }
1750   }
1751 
1752   PERFETTO_DFATAL(
1753       "Tried to unregister a non-existent data source \"%s\" for "
1754       "producer %" PRIu16,
1755       name.c_str(), producer_id);
1756 }
1757 
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)1758 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
1759     const TraceConfig::DataSource& cfg_data_source,
1760     const TraceConfig::ProducerConfig& producer_config,
1761     const RegisteredDataSource& data_source,
1762     TracingSession* tracing_session) {
1763   PERFETTO_DCHECK_THREAD(thread_checker_);
1764   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
1765   PERFETTO_DCHECK(producer);
1766   // An existing producer that is not ftrace could have registered itself as
1767   // ftrace, we must not enable it in that case.
1768   if (lockdown_mode_ && producer->uid_ != uid_) {
1769     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
1770     return nullptr;
1771   }
1772   // TODO(primiano): Add tests for registration ordering
1773   // (data sources vs consumers).
1774   // TODO: This logic is duplicated in ChangeTraceConfig, consider refactoring
1775   // it. Meanwhile update both.
1776   if (!cfg_data_source.producer_name_filter().empty()) {
1777     if (std::find(cfg_data_source.producer_name_filter().begin(),
1778                   cfg_data_source.producer_name_filter().end(),
1779                   producer->name_) ==
1780         cfg_data_source.producer_name_filter().end()) {
1781       PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
1782                     cfg_data_source.config().name().c_str(),
1783                     producer->name_.c_str());
1784       return nullptr;
1785     }
1786   }
1787 
1788   auto relative_buffer_id = cfg_data_source.config().target_buffer();
1789   if (relative_buffer_id >= tracing_session->num_buffers()) {
1790     PERFETTO_LOG(
1791         "The TraceConfig for DataSource %s specified a target_buffer out of "
1792         "bound (%d). Skipping it.",
1793         cfg_data_source.config().name().c_str(), relative_buffer_id);
1794     return nullptr;
1795   }
1796 
1797   // Create a copy of the DataSourceConfig specified in the trace config. This
1798   // will be passed to the producer after translating the |target_buffer| id.
1799   // The |target_buffer| parameter passed by the consumer in the trace config is
1800   // relative to the buffers declared in the same trace config. This has to be
1801   // translated to the global BufferID before passing it to the producers, which
1802   // don't know anything about tracing sessions and consumers.
1803 
1804   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
1805   auto insert_iter = tracing_session->data_source_instances.emplace(
1806       std::piecewise_construct,  //
1807       std::forward_as_tuple(producer->id_),
1808       std::forward_as_tuple(
1809           inst_id,
1810           cfg_data_source.config(),  //  Deliberate copy.
1811           data_source.descriptor.name(),
1812           data_source.descriptor.will_notify_on_start(),
1813           data_source.descriptor.will_notify_on_stop(),
1814           data_source.descriptor.handles_incremental_state_clear()));
1815   DataSourceInstance* ds_instance = &insert_iter->second;
1816 
1817   // New data source instance starts out in CONFIGURED state.
1818   if (tracing_session->consumer_maybe_null) {
1819     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1820         *producer, *ds_instance);
1821   }
1822 
1823   DataSourceConfig& ds_config = ds_instance->config;
1824   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
1825   ds_config.set_enable_extra_guardrails(
1826       tracing_session->config.enable_extra_guardrails());
1827   ds_config.set_tracing_session_id(tracing_session->id);
1828   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
1829   PERFETTO_DCHECK(global_id);
1830   ds_config.set_target_buffer(global_id);
1831 
1832   PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
1833                 ds_config.name().c_str(), global_id);
1834   if (!producer->shared_memory()) {
1835     // Determine the SMB page size. Must be an integer multiple of 4k.
1836     size_t page_size = std::min<size_t>(producer_config.page_size_kb() * 1024,
1837                                         SharedMemoryABI::kMaxPageSize);
1838     if (page_size < base::kPageSize || page_size % base::kPageSize != 0)
1839       page_size = kDefaultShmPageSize;
1840     producer->shared_buffer_page_size_kb_ = page_size / 1024;
1841 
1842     // Determine the SMB size. Must be an integer multiple of the SMB page size.
1843     // The decisional tree is as follows:
1844     // 1. Give priority to what defined in the trace config.
1845     // 2. If unset give priority to the hint passed by the producer.
1846     // 3. Keep within bounds and ensure it's a multiple of the page size.
1847     size_t shm_size = producer_config.shm_size_kb() * 1024;
1848     if (shm_size == 0)
1849       shm_size = producer->shmem_size_hint_bytes_;
1850     shm_size = std::min<size_t>(shm_size, kMaxShmSize);
1851     if (shm_size < page_size || shm_size % page_size)
1852       shm_size = kDefaultShmSize;
1853 
1854     // TODO(primiano): right now Create() will suicide in case of OOM if the
1855     // mmap fails. We should instead gracefully fail the request and tell the
1856     // client to go away.
1857     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
1858     producer->SetSharedMemory(std::move(shared_memory));
1859     producer->OnTracingSetup();
1860     UpdateMemoryGuardrail();
1861   }
1862   producer->SetupDataSource(inst_id, ds_config);
1863   return ds_instance;
1864 }
1865 
1866 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
1867 // might be lying / returning garbage contents. |src| and |size| can be trusted
1868 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,uid_t producer_uid_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)1869 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
1870     ProducerID producer_id_trusted,
1871     uid_t producer_uid_trusted,
1872     WriterID writer_id,
1873     ChunkID chunk_id,
1874     BufferID buffer_id,
1875     uint16_t num_fragments,
1876     uint8_t chunk_flags,
1877     bool chunk_complete,
1878     const uint8_t* src,
1879     size_t size) {
1880   PERFETTO_DCHECK_THREAD(thread_checker_);
1881 
1882   ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
1883   if (!producer) {
1884     PERFETTO_DFATAL("Producer not found.");
1885     chunks_discarded_++;
1886     return;
1887   }
1888 
1889   TraceBuffer* buf = GetBufferByID(buffer_id);
1890   if (!buf) {
1891     PERFETTO_DLOG("Could not find target buffer %" PRIu16
1892                   " for producer %" PRIu16,
1893                   buffer_id, producer_id_trusted);
1894     chunks_discarded_++;
1895     return;
1896   }
1897 
1898   // Verify that the producer is actually allowed to write into the target
1899   // buffer specified in the request. This prevents a malicious producer from
1900   // injecting data into a log buffer that belongs to a tracing session the
1901   // producer is not part of.
1902   if (!producer->is_allowed_target_buffer(buffer_id)) {
1903     PERFETTO_ELOG("Producer %" PRIu16
1904                   " tried to write into forbidden target buffer %" PRIu16,
1905                   producer_id_trusted, buffer_id);
1906     PERFETTO_DFATAL("Forbidden target buffer");
1907     chunks_discarded_++;
1908     return;
1909   }
1910 
1911   // If the writer was registered by the producer, it should only write into the
1912   // buffer it was registered with.
1913   base::Optional<BufferID> associated_buffer =
1914       producer->buffer_id_for_writer(writer_id);
1915   if (associated_buffer && *associated_buffer != buffer_id) {
1916     PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
1917                   " was registered to write into target buffer %" PRIu16
1918                   ", but tried to write into buffer %" PRIu16,
1919                   writer_id, producer_id_trusted, *associated_buffer,
1920                   buffer_id);
1921     PERFETTO_DFATAL("Wrong target buffer");
1922     chunks_discarded_++;
1923     return;
1924   }
1925 
1926   buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
1927                           chunk_id, num_fragments, chunk_flags, chunk_complete,
1928                           src, size);
1929 }
1930 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)1931 void TracingServiceImpl::ApplyChunkPatches(
1932     ProducerID producer_id_trusted,
1933     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
1934   PERFETTO_DCHECK_THREAD(thread_checker_);
1935 
1936   for (const auto& chunk : chunks_to_patch) {
1937     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
1938     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
1939     TraceBuffer* buf =
1940         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
1941     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
1942                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
1943     if (!writer_id || writer_id > kMaxWriterID || !buf) {
1944       PERFETTO_ELOG(
1945           "Received invalid chunks_to_patch request from Producer: %" PRIu16
1946           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
1947           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
1948       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
1949       continue;
1950     }
1951 
1952     // Note, there's no need to validate that the producer is allowed to write
1953     // to the specified buffer ID (or that it's the correct buffer ID for a
1954     // registered TraceWriter). That's because TraceBuffer uses the producer ID
1955     // and writer ID to look up the chunk to patch. If the producer specifies an
1956     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
1957     // patches. Because the producer ID is trusted, there's also no way for a
1958     // malicious producer to patch another producer's data.
1959 
1960     // Speculate on the fact that there are going to be a limited amount of
1961     // patches per request, so we can allocate the |patches| array on the stack.
1962     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
1963     if (chunk.patches().size() > patches.size()) {
1964       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
1965                     patches.size());
1966       PERFETTO_DFATAL("Too many patches");
1967       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
1968       continue;
1969     }
1970 
1971     size_t i = 0;
1972     for (const auto& patch : chunk.patches()) {
1973       const std::string& patch_data = patch.data();
1974       if (patch_data.size() != patches[i].data.size()) {
1975         PERFETTO_ELOG("Received patch from producer: %" PRIu16
1976                       " of unexpected size %zu",
1977                       producer_id_trusted, patch_data.size());
1978         patches_discarded_++;
1979         continue;
1980       }
1981       patches[i].offset_untrusted = patch.offset();
1982       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
1983       i++;
1984     }
1985     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
1986                                &patches[0], i, chunk.has_more_patches());
1987   }
1988 }
1989 
GetDetachedSession(uid_t uid,const std::string & key)1990 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
1991     uid_t uid,
1992     const std::string& key) {
1993   PERFETTO_DCHECK_THREAD(thread_checker_);
1994   for (auto& kv : tracing_sessions_) {
1995     TracingSession* session = &kv.second;
1996     if (session->consumer_uid == uid && session->detach_key == key) {
1997       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
1998       return session;
1999     }
2000   }
2001   return nullptr;
2002 }
2003 
GetTracingSession(TracingSessionID tsid)2004 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2005     TracingSessionID tsid) {
2006   PERFETTO_DCHECK_THREAD(thread_checker_);
2007   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2008   if (it == tracing_sessions_.end())
2009     return nullptr;
2010   return &it->second;
2011 }
2012 
GetNextProducerID()2013 ProducerID TracingServiceImpl::GetNextProducerID() {
2014   PERFETTO_DCHECK_THREAD(thread_checker_);
2015   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
2016   do {
2017     ++last_producer_id_;
2018   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2019   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2020   return last_producer_id_;
2021 }
2022 
GetBufferByID(BufferID buffer_id)2023 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2024   auto buf_iter = buffers_.find(buffer_id);
2025   if (buf_iter == buffers_.end())
2026     return nullptr;
2027   return &*buf_iter->second;
2028 }
2029 
OnStartTriggersTimeout(TracingSessionID tsid)2030 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2031   // Skip entirely the flush if the trace session doesn't exist anymore.
2032   // This is to prevent misleading error messages to be logged.
2033   //
2034   // if the trace has started from the trigger we rely on
2035   // the |stop_delay_ms| from the trigger so don't flush and
2036   // disable if we've moved beyond a CONFIGURED state
2037   auto* tracing_session_ptr = GetTracingSession(tsid);
2038   if (tracing_session_ptr &&
2039       tracing_session_ptr->state == TracingSession::CONFIGURED) {
2040     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2041                   " since no triggers activated.",
2042                   tsid);
2043     // No data should be returned from ReadBuffers() regardless of if we
2044     // call FreeBuffers() or DisableTracing(). This is because in
2045     // STOP_TRACING we need this promise in either case, and using
2046     // DisableTracing() allows a graceful shutdown. Consumers can follow
2047     // their normal path and check the buffers through ReadBuffers() and
2048     // the code won't hang because the tracing session will still be
2049     // alive just disabled.
2050     DisableTracing(tsid);
2051   }
2052 }
2053 
UpdateMemoryGuardrail()2054 void TracingServiceImpl::UpdateMemoryGuardrail() {
2055 #if !PERFETTO_BUILDFLAG(PERFETTO_EMBEDDER_BUILD) && \
2056     !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
2057   uint64_t total_buffer_bytes = 0;
2058 
2059   // Sum up all the shared memory buffers.
2060   for (const auto& id_to_producer : producers_) {
2061     if (id_to_producer.second->shared_memory())
2062       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2063   }
2064 
2065   // Sum up all the trace buffers.
2066   for (const auto& id_to_buffer : buffers_) {
2067     total_buffer_bytes += id_to_buffer.second->size();
2068   }
2069 
2070   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2071   // interval.
2072   uint64_t guardrail = 32 * 1024 * 1024 + total_buffer_bytes;
2073   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2074 #endif
2075 }
2076 
SnapshotSyncMarker(std::vector<TracePacket> * packets)2077 void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
2078   // The sync markes is used to tokenize large traces efficiently.
2079   // See description in trace_packet.proto.
2080   if (sync_marker_packet_size_ == 0) {
2081     // Serialize the marker and the uid separately to guarantee that the marker
2082     // is serialzied at the end and is adjacent to the start of the next packet.
2083     int size_left = static_cast<int>(sizeof(sync_marker_packet_));
2084     uint8_t* dst = &sync_marker_packet_[0];
2085     protos::TrustedPacket packet;
2086     packet.set_trusted_uid(static_cast<int32_t>(uid_));
2087     packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2088     PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
2089     size_left -= packet.ByteSize();
2090     sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
2091     dst += sync_marker_packet_size_;
2092 
2093     packet.Clear();
2094     packet.set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
2095     PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
2096     sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
2097     PERFETTO_CHECK(sync_marker_packet_size_ <= sizeof(sync_marker_packet_));
2098   };
2099   packets->emplace_back();
2100   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
2101 }
2102 
SnapshotClocks(std::vector<TracePacket> * packets,bool set_root_timestamp)2103 void TracingServiceImpl::SnapshotClocks(std::vector<TracePacket>* packets,
2104                                         bool set_root_timestamp) {
2105   protos::TrustedPacket packet;
2106   protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot();
2107 
2108 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
2109     !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2110   struct {
2111     clockid_t id;
2112     protos::ClockSnapshot::Clock::Type type;
2113     struct timespec ts;
2114   } clocks[] = {
2115       {CLOCK_BOOTTIME, protos::ClockSnapshot::Clock::BOOTTIME, {0, 0}},
2116       {CLOCK_REALTIME_COARSE,
2117        protos::ClockSnapshot::Clock::REALTIME_COARSE,
2118        {0, 0}},
2119       {CLOCK_MONOTONIC_COARSE,
2120        protos::ClockSnapshot::Clock::MONOTONIC_COARSE,
2121        {0, 0}},
2122       {CLOCK_REALTIME, protos::ClockSnapshot::Clock::REALTIME, {0, 0}},
2123       {CLOCK_MONOTONIC, protos::ClockSnapshot::Clock::MONOTONIC, {0, 0}},
2124       {CLOCK_MONOTONIC_RAW,
2125        protos::ClockSnapshot::Clock::MONOTONIC_RAW,
2126        {0, 0}},
2127       {CLOCK_PROCESS_CPUTIME_ID,
2128        protos::ClockSnapshot::Clock::PROCESS_CPUTIME,
2129        {0, 0}},
2130       {CLOCK_THREAD_CPUTIME_ID,
2131        protos::ClockSnapshot::Clock::THREAD_CPUTIME,
2132        {0, 0}},
2133   };
2134   // First snapshot all the clocks as atomically as we can.
2135   for (auto& clock : clocks) {
2136     if (clock_gettime(clock.id, &clock.ts) == -1)
2137       PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
2138   }
2139   for (auto& clock : clocks) {
2140     if (set_root_timestamp &&
2141         clock.type == protos::ClockSnapshot::Clock::BOOTTIME) {
2142       packet.set_timestamp(
2143           static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
2144     };
2145     protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
2146     c->set_type(clock.type);
2147     c->set_timestamp(
2148         static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
2149   }
2150 #else   // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
2151   auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
2152   if (set_root_timestamp)
2153     packet.set_timestamp(wall_time_ns);
2154   protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
2155   c->set_type(protos::ClockSnapshot::Clock::MONOTONIC);
2156   c->set_timestamp(wall_time_ns);
2157 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
2158 
2159   packet.set_trusted_uid(static_cast<int32_t>(uid_));
2160   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2161   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2162   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2163   packets->emplace_back();
2164   packets->back().AddSlice(std::move(slice));
2165 }
2166 
SnapshotStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)2167 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
2168                                        std::vector<TracePacket>* packets) {
2169   protos::TrustedPacket packet;
2170   packet.set_trusted_uid(static_cast<int32_t>(uid_));
2171   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2172 
2173   protos::TraceStats* trace_stats = packet.mutable_trace_stats();
2174   GetTraceStats(tracing_session).ToProto(trace_stats);
2175   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2176   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2177   packets->emplace_back();
2178   packets->back().AddSlice(std::move(slice));
2179 }
2180 
GetTraceStats(TracingSession * tracing_session)2181 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
2182   TraceStats trace_stats;
2183   trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
2184   trace_stats.set_producers_seen(last_producer_id_);
2185   trace_stats.set_data_sources_registered(
2186       static_cast<uint32_t>(data_sources_.size()));
2187   trace_stats.set_data_sources_seen(last_data_source_instance_id_);
2188   trace_stats.set_tracing_sessions(
2189       static_cast<uint32_t>(tracing_sessions_.size()));
2190   trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
2191   trace_stats.set_chunks_discarded(chunks_discarded_);
2192   trace_stats.set_patches_discarded(patches_discarded_);
2193 
2194   for (BufferID buf_id : tracing_session->buffers_index) {
2195     TraceBuffer* buf = GetBufferByID(buf_id);
2196     if (!buf) {
2197       PERFETTO_DFATAL("Buffer not found.");
2198       continue;
2199     }
2200     *trace_stats.add_buffer_stats() = buf->stats();
2201   }  // for (buf in session).
2202   return trace_stats;
2203 }
2204 
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)2205 void TracingServiceImpl::MaybeEmitTraceConfig(
2206     TracingSession* tracing_session,
2207     std::vector<TracePacket>* packets) {
2208   if (tracing_session->did_emit_config)
2209     return;
2210   tracing_session->did_emit_config = true;
2211   protos::TrustedPacket packet;
2212   tracing_session->config.ToProto(packet.mutable_trace_config());
2213   packet.set_trusted_uid(static_cast<int32_t>(uid_));
2214   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2215   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2216   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2217   packets->emplace_back();
2218   packets->back().AddSlice(std::move(slice));
2219 }
2220 
MaybeEmitSystemInfo(TracingSession * tracing_session,std::vector<TracePacket> * packets)2221 void TracingServiceImpl::MaybeEmitSystemInfo(
2222     TracingSession* tracing_session,
2223     std::vector<TracePacket>* packets) {
2224   if (tracing_session->did_emit_system_info)
2225     return;
2226   tracing_session->did_emit_system_info = true;
2227   protos::TrustedPacket packet;
2228 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2229   protos::SystemInfo* info = packet.mutable_system_info();
2230   struct utsname uname_info;
2231   if (uname(&uname_info) == 0) {
2232     protos::Utsname* utsname_info = info->mutable_utsname();
2233     utsname_info->set_sysname(uname_info.sysname);
2234     utsname_info->set_version(uname_info.version);
2235     utsname_info->set_machine(uname_info.machine);
2236     utsname_info->set_release(uname_info.release);
2237   }
2238 #endif
2239   packet.set_trusted_uid(static_cast<int32_t>(uid_));
2240   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2241   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2242   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2243   packets->emplace_back();
2244   packets->back().AddSlice(std::move(slice));
2245 }
2246 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)2247 void TracingServiceImpl::MaybeEmitReceivedTriggers(
2248     TracingSession* tracing_session,
2249     std::vector<TracePacket>* packets) {
2250   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
2251                   tracing_session->received_triggers.size());
2252   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
2253        i < tracing_session->received_triggers.size(); ++i) {
2254     const auto& info = tracing_session->received_triggers[i];
2255     protos::TrustedPacket packet;
2256 
2257     protos::Trigger* trigger = packet.mutable_trigger();
2258     trigger->set_trigger_name(info.trigger_name);
2259     trigger->set_producer_name(info.producer_name);
2260     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
2261 
2262     packet.set_timestamp(info.boot_time_ns);
2263     packet.set_trusted_uid(static_cast<int32_t>(uid_));
2264     packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
2265     Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
2266     PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
2267     packets->emplace_back();
2268     packets->back().AddSlice(std::move(slice));
2269     ++tracing_session->num_triggers_emitted_into_trace;
2270   }
2271 }
2272 
2273 ////////////////////////////////////////////////////////////////////////////////
2274 // TracingServiceImpl::ConsumerEndpointImpl implementation
2275 ////////////////////////////////////////////////////////////////////////////////
2276 
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)2277 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
2278     TracingServiceImpl* service,
2279     base::TaskRunner* task_runner,
2280     Consumer* consumer,
2281     uid_t uid)
2282     : task_runner_(task_runner),
2283       service_(service),
2284       consumer_(consumer),
2285       uid_(uid),
2286       weak_ptr_factory_(this) {}
2287 
~ConsumerEndpointImpl()2288 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
2289   service_->DisconnectConsumer(this);
2290   consumer_->OnDisconnect();
2291 }
2292 
NotifyOnTracingDisabled()2293 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
2294   PERFETTO_DCHECK_THREAD(thread_checker_);
2295   auto weak_this = GetWeakPtr();
2296   task_runner_->PostTask([weak_this] {
2297     if (weak_this)
2298       weak_this->consumer_->OnTracingDisabled();
2299   });
2300 }
2301 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)2302 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
2303     const TraceConfig& cfg,
2304     base::ScopedFile fd) {
2305   PERFETTO_DCHECK_THREAD(thread_checker_);
2306   if (!service_->EnableTracing(this, cfg, std::move(fd)))
2307     NotifyOnTracingDisabled();
2308 }
2309 
ChangeTraceConfig(const TraceConfig & cfg)2310 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
2311     const TraceConfig& cfg) {
2312   if (!tracing_session_id_) {
2313     PERFETTO_LOG(
2314         "Consumer called ChangeTraceConfig() but tracing was "
2315         "not active");
2316     return;
2317   }
2318   service_->ChangeTraceConfig(this, cfg);
2319 }
2320 
StartTracing()2321 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
2322   PERFETTO_DCHECK_THREAD(thread_checker_);
2323   if (!tracing_session_id_) {
2324     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
2325     return;
2326   }
2327   service_->StartTracing(tracing_session_id_);
2328 }
2329 
DisableTracing()2330 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
2331   PERFETTO_DCHECK_THREAD(thread_checker_);
2332   if (!tracing_session_id_) {
2333     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
2334     return;
2335   }
2336   service_->DisableTracing(tracing_session_id_);
2337 }
2338 
ReadBuffers()2339 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
2340   PERFETTO_DCHECK_THREAD(thread_checker_);
2341   if (!tracing_session_id_) {
2342     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
2343     return;
2344   }
2345   service_->ReadBuffers(tracing_session_id_, this);
2346 }
2347 
FreeBuffers()2348 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
2349   PERFETTO_DCHECK_THREAD(thread_checker_);
2350   if (!tracing_session_id_) {
2351     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
2352     return;
2353   }
2354   service_->FreeBuffers(tracing_session_id_);
2355   tracing_session_id_ = 0;
2356 }
2357 
Flush(uint32_t timeout_ms,FlushCallback callback)2358 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
2359                                                      FlushCallback callback) {
2360   PERFETTO_DCHECK_THREAD(thread_checker_);
2361   if (!tracing_session_id_) {
2362     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
2363     return;
2364   }
2365   service_->Flush(tracing_session_id_, timeout_ms, callback);
2366 }
2367 
Detach(const std::string & key)2368 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
2369   PERFETTO_DCHECK_THREAD(thread_checker_);
2370   bool success = service_->DetachConsumer(this, key);
2371   auto weak_this = GetWeakPtr();
2372   task_runner_->PostTask([weak_this, success] {
2373     if (weak_this)
2374       weak_this->consumer_->OnDetach(success);
2375   });
2376 }
2377 
Attach(const std::string & key)2378 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
2379   PERFETTO_DCHECK_THREAD(thread_checker_);
2380   bool success = service_->AttachConsumer(this, key);
2381   auto weak_this = GetWeakPtr();
2382   task_runner_->PostTask([weak_this, success] {
2383     if (!weak_this)
2384       return;
2385     Consumer* consumer = weak_this->consumer_;
2386     TracingSession* session =
2387         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
2388     if (!session) {
2389       consumer->OnAttach(false, TraceConfig());
2390       return;
2391     }
2392     consumer->OnAttach(success, session->config);
2393   });
2394 }
2395 
GetTraceStats()2396 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
2397   PERFETTO_DCHECK_THREAD(thread_checker_);
2398   bool success = false;
2399   TraceStats stats;
2400   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2401   if (session) {
2402     success = true;
2403     stats = service_->GetTraceStats(session);
2404   }
2405   auto weak_this = GetWeakPtr();
2406   task_runner_->PostTask([weak_this, success, stats] {
2407     if (weak_this)
2408       weak_this->consumer_->OnTraceStats(success, stats);
2409   });
2410 }
2411 
ObserveEvents(uint32_t enabled_event_types)2412 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
2413     uint32_t enabled_event_types) {
2414   PERFETTO_DCHECK_THREAD(thread_checker_);
2415   enabled_observable_event_types_ = enabled_event_types;
2416 
2417   if (enabled_observable_event_types_ == ObservableEventType::kNone)
2418     return;
2419 
2420   PERFETTO_DCHECK(enabled_observable_event_types_ ==
2421                   ObservableEventType::kDataSourceInstances);
2422 
2423   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2424   if (!session)
2425     return;
2426 
2427   // Issue initial states
2428   for (const auto& kv : session->data_source_instances) {
2429     ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
2430     PERFETTO_DCHECK(producer);
2431     OnDataSourceInstanceStateChange(*producer, kv.second);
2432   }
2433 }
2434 
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)2435 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
2436     const ProducerEndpointImpl& producer,
2437     const DataSourceInstance& instance) {
2438   if (!(enabled_observable_event_types_ &
2439         ObservableEventType::kDataSourceInstances)) {
2440     return;
2441   }
2442 
2443   if (instance.state != DataSourceInstance::CONFIGURED &&
2444       instance.state != DataSourceInstance::STARTED &&
2445       instance.state != DataSourceInstance::STOPPED) {
2446     return;
2447   }
2448 
2449   auto* observable_events = AddObservableEvents();
2450   auto* change = observable_events->add_instance_state_changes();
2451   change->set_producer_name(producer.name_);
2452   change->set_data_source_name(instance.data_source_name);
2453   if (instance.state == DataSourceInstance::STARTED) {
2454     change->set_state(ObservableEvents::DataSourceInstanceStateChange::
2455                           DATA_SOURCE_INSTANCE_STATE_STARTED);
2456   } else {
2457     change->set_state(ObservableEvents::DataSourceInstanceStateChange::
2458                           DATA_SOURCE_INSTANCE_STATE_STOPPED);
2459   }
2460 }
2461 
2462 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
GetWeakPtr()2463 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
2464   PERFETTO_DCHECK_THREAD(thread_checker_);
2465   return weak_ptr_factory_.GetWeakPtr();
2466 }
2467 
2468 ObservableEvents*
AddObservableEvents()2469 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
2470   PERFETTO_DCHECK_THREAD(thread_checker_);
2471   if (!observable_events_) {
2472     observable_events_.reset(new ObservableEvents());
2473     auto weak_this = GetWeakPtr();
2474     task_runner_->PostTask([weak_this] {
2475       if (!weak_this)
2476         return;
2477 
2478       // Move into a temporary to allow reentrancy in OnObservableEvents.
2479       auto observable_events = std::move(weak_this->observable_events_);
2480       weak_this->consumer_->OnObservableEvents(*observable_events);
2481     });
2482   }
2483   return observable_events_.get();
2484 }
2485 
2486 ////////////////////////////////////////////////////////////////////////////////
2487 // TracingServiceImpl::ProducerEndpointImpl implementation
2488 ////////////////////////////////////////////////////////////////////////////////
2489 
ProducerEndpointImpl(ProducerID id,uid_t uid,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,bool in_process,bool smb_scraping_enabled)2490 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
2491     ProducerID id,
2492     uid_t uid,
2493     TracingServiceImpl* service,
2494     base::TaskRunner* task_runner,
2495     Producer* producer,
2496     const std::string& producer_name,
2497     bool in_process,
2498     bool smb_scraping_enabled)
2499     : id_(id),
2500       uid_(uid),
2501       service_(service),
2502       task_runner_(task_runner),
2503       producer_(producer),
2504       name_(producer_name),
2505       in_process_(in_process),
2506       smb_scraping_enabled_(smb_scraping_enabled),
2507       weak_ptr_factory_(this) {}
2508 
~ProducerEndpointImpl()2509 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
2510   service_->DisconnectProducer(id_);
2511   producer_->OnDisconnect();
2512 }
2513 
RegisterDataSource(const DataSourceDescriptor & desc)2514 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
2515     const DataSourceDescriptor& desc) {
2516   PERFETTO_DCHECK_THREAD(thread_checker_);
2517   if (desc.name().empty()) {
2518     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2519     return;
2520   }
2521 
2522   service_->RegisterDataSource(id_, desc);
2523 }
2524 
UnregisterDataSource(const std::string & name)2525 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
2526     const std::string& name) {
2527   PERFETTO_DCHECK_THREAD(thread_checker_);
2528   service_->UnregisterDataSource(id_, name);
2529 }
2530 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)2531 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
2532     uint32_t writer_id,
2533     uint32_t target_buffer) {
2534   PERFETTO_DCHECK_THREAD(thread_checker_);
2535   PERFETTO_DCHECK(!buffer_id_for_writer(static_cast<WriterID>(writer_id)));
2536   writers_[static_cast<WriterID>(writer_id)] =
2537       static_cast<BufferID>(target_buffer);
2538 }
2539 
UnregisterTraceWriter(uint32_t writer_id)2540 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
2541     uint32_t writer_id) {
2542   PERFETTO_DCHECK_THREAD(thread_checker_);
2543   PERFETTO_DCHECK(buffer_id_for_writer(static_cast<WriterID>(writer_id)));
2544   writers_.erase(static_cast<WriterID>(writer_id));
2545 }
2546 
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)2547 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
2548     const CommitDataRequest& req_untrusted,
2549     CommitDataCallback callback) {
2550   PERFETTO_DCHECK_THREAD(thread_checker_);
2551 
2552   if (!shared_memory_) {
2553     PERFETTO_DLOG(
2554         "Attempted to commit data before the shared memory was allocated.");
2555     return;
2556   }
2557   PERFETTO_DCHECK(shmem_abi_.is_valid());
2558   for (const auto& entry : req_untrusted.chunks_to_move()) {
2559     const uint32_t page_idx = entry.page();
2560     if (page_idx >= shmem_abi_.num_pages())
2561       continue;  // A buggy or malicious producer.
2562 
2563     SharedMemoryABI::Chunk chunk =
2564         shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
2565     if (!chunk.is_valid()) {
2566       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
2567                     entry.page(), entry.chunk());
2568       continue;
2569     }
2570 
2571     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
2572     // the ABI contract expects the producer to not touch the chunk anymore
2573     // (until the service marks that as free). This is why all the reads below
2574     // are just memory_order_relaxed. Also, the code here assumes that all this
2575     // data can be malicious and just gives up if anything is malformed.
2576     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
2577     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
2578     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
2579     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
2580     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
2581     uint16_t num_fragments = packets.count;
2582     uint8_t chunk_flags = packets.flags;
2583 
2584     service_->CopyProducerPageIntoLogBuffer(
2585         id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
2586         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
2587 
2588     // This one has release-store semantics.
2589     shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
2590   }  // for(chunks_to_move)
2591 
2592   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
2593 
2594   if (req_untrusted.flush_request_id()) {
2595     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
2596   }
2597 
2598   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
2599   // callback being invoked within the same callstack and not posted. If this
2600   // changes, the code there needs to be changed accordingly.
2601   if (callback)
2602     callback();
2603 }
2604 
SetSharedMemory(std::unique_ptr<SharedMemory> shared_memory)2605 void TracingServiceImpl::ProducerEndpointImpl::SetSharedMemory(
2606     std::unique_ptr<SharedMemory> shared_memory) {
2607   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
2608   shared_memory_ = std::move(shared_memory);
2609   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
2610                         shared_memory_->size(),
2611                         shared_buffer_page_size_kb() * 1024);
2612   if (in_process_) {
2613     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
2614         shared_memory_->start(), shared_memory_->size(),
2615         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
2616   }
2617 }
2618 
shared_memory() const2619 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
2620   PERFETTO_DCHECK_THREAD(thread_checker_);
2621   return shared_memory_.get();
2622 }
2623 
shared_buffer_page_size_kb() const2624 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
2625     const {
2626   return shared_buffer_page_size_kb_;
2627 }
2628 
ActivateTriggers(const std::vector<std::string> & triggers)2629 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
2630     const std::vector<std::string>& triggers) {
2631   service_->ActivateTriggers(id_, triggers);
2632 }
2633 
StopDataSource(DataSourceInstanceID ds_inst_id)2634 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
2635     DataSourceInstanceID ds_inst_id) {
2636   // TODO(primiano): When we'll support tearing down the SMB, at this point we
2637   // should send the Producer a TearDownTracing if all its data sources have
2638   // been disabled (see b/77532839 and aosp/655179 PS1).
2639   PERFETTO_DCHECK_THREAD(thread_checker_);
2640   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2641   task_runner_->PostTask([weak_this, ds_inst_id] {
2642     if (weak_this)
2643       weak_this->producer_->StopDataSource(ds_inst_id);
2644   });
2645 }
2646 
2647 SharedMemoryArbiter*
GetInProcessShmemArbiter()2648 TracingServiceImpl::ProducerEndpointImpl::GetInProcessShmemArbiter() {
2649   if (!inproc_shmem_arbiter_) {
2650     PERFETTO_FATAL(
2651         "The in-process SharedMemoryArbiter can only be used when "
2652         "CreateProducer has been called with in_process=true and after tracing "
2653         "has started.");
2654   }
2655 
2656   PERFETTO_DCHECK(in_process_);
2657   return inproc_shmem_arbiter_.get();
2658 }
2659 
2660 // Can be called on any thread.
2661 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id)2662 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
2663   return GetInProcessShmemArbiter()->CreateTraceWriter(buf_id);
2664 }
2665 
NotifyFlushComplete(FlushRequestID id)2666 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
2667     FlushRequestID id) {
2668   PERFETTO_DCHECK_THREAD(thread_checker_);
2669   return GetInProcessShmemArbiter()->NotifyFlushComplete(id);
2670 }
2671 
OnTracingSetup()2672 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
2673   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2674   task_runner_->PostTask([weak_this] {
2675     if (weak_this)
2676       weak_this->producer_->OnTracingSetup();
2677   });
2678 }
2679 
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources)2680 void TracingServiceImpl::ProducerEndpointImpl::Flush(
2681     FlushRequestID flush_request_id,
2682     const std::vector<DataSourceInstanceID>& data_sources) {
2683   PERFETTO_DCHECK_THREAD(thread_checker_);
2684   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2685   task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
2686     if (weak_this) {
2687       weak_this->producer_->Flush(flush_request_id, data_sources.data(),
2688                                   data_sources.size());
2689     }
2690   });
2691 }
2692 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)2693 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
2694     DataSourceInstanceID ds_id,
2695     const DataSourceConfig& config) {
2696   PERFETTO_DCHECK_THREAD(thread_checker_);
2697   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
2698   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2699   task_runner_->PostTask([weak_this, ds_id, config] {
2700     if (weak_this)
2701       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
2702   });
2703 }
2704 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)2705 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
2706     DataSourceInstanceID ds_id,
2707     const DataSourceConfig& config) {
2708   PERFETTO_DCHECK_THREAD(thread_checker_);
2709   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2710   task_runner_->PostTask([weak_this, ds_id, config] {
2711     if (weak_this)
2712       weak_this->producer_->StartDataSource(ds_id, std::move(config));
2713   });
2714 }
2715 
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)2716 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
2717     DataSourceInstanceID data_source_id) {
2718   PERFETTO_DCHECK_THREAD(thread_checker_);
2719   service_->NotifyDataSourceStarted(id_, data_source_id);
2720 }
2721 
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)2722 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
2723     DataSourceInstanceID data_source_id) {
2724   PERFETTO_DCHECK_THREAD(thread_checker_);
2725   service_->NotifyDataSourceStopped(id_, data_source_id);
2726 }
2727 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)2728 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
2729     const std::vector<BufferID>& target_buffers) {
2730   if (allowed_target_buffers_.empty())
2731     return;
2732   for (BufferID buffer : target_buffers)
2733     allowed_target_buffers_.erase(buffer);
2734 }
2735 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)2736 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
2737     const std::vector<DataSourceInstanceID>& data_sources) {
2738   PERFETTO_DCHECK_THREAD(thread_checker_);
2739   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2740   task_runner_->PostTask([weak_this, data_sources] {
2741     if (weak_this) {
2742       weak_this->producer_->ClearIncrementalState(data_sources.data(),
2743                                                   data_sources.size());
2744     }
2745   });
2746 }
2747 
2748 ////////////////////////////////////////////////////////////////////////////////
2749 // TracingServiceImpl::TracingSession implementation
2750 ////////////////////////////////////////////////////////////////////////////////
2751 
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config)2752 TracingServiceImpl::TracingSession::TracingSession(
2753     TracingSessionID session_id,
2754     ConsumerEndpointImpl* consumer,
2755     const TraceConfig& new_config)
2756     : id(session_id),
2757       consumer_maybe_null(consumer),
2758       consumer_uid(consumer->uid_),
2759       config(new_config) {}
2760 
2761 }  // namespace perfetto
2762