• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/tracing_service_impl.h"
18 
19 #include "perfetto/base/build_config.h"
20 
21 #include <errno.h>
22 #include <inttypes.h>
23 #include <limits.h>
24 #include <string.h>
25 #include <regex>
26 #include <unordered_set>
27 
28 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
29     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
30 #include <sys/uio.h>
31 #include <sys/utsname.h>
32 #include <unistd.h>
33 #endif
34 
35 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
36 #include <sys/system_properties.h>
37 #endif
38 
39 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) || \
40     PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) ||   \
41     PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
42 #define PERFETTO_HAS_CHMOD
43 #include <sys/stat.h>
44 #endif
45 
46 #include <algorithm>
47 
48 #include "perfetto/base/build_config.h"
49 #include "perfetto/base/task_runner.h"
50 #include "perfetto/ext/base/file_utils.h"
51 #include "perfetto/ext/base/metatrace.h"
52 #include "perfetto/ext/base/string_utils.h"
53 #include "perfetto/ext/base/utils.h"
54 #include "perfetto/ext/base/watchdog.h"
55 #include "perfetto/ext/tracing/core/consumer.h"
56 #include "perfetto/ext/tracing/core/observable_events.h"
57 #include "perfetto/ext/tracing/core/producer.h"
58 #include "perfetto/ext/tracing/core/shared_memory.h"
59 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
60 #include "perfetto/ext/tracing/core/trace_packet.h"
61 #include "perfetto/ext/tracing/core/trace_writer.h"
62 #include "perfetto/protozero/scattered_heap_buffer.h"
63 #include "perfetto/protozero/static_buffer.h"
64 #include "perfetto/tracing/core/data_source_descriptor.h"
65 #include "perfetto/tracing/core/tracing_service_capabilities.h"
66 #include "perfetto/tracing/core/tracing_service_state.h"
67 #include "src/tracing/core/packet_stream_validator.h"
68 #include "src/tracing/core/shared_memory_arbiter_impl.h"
69 #include "src/tracing/core/trace_buffer.h"
70 
71 #include "protos/perfetto/common/builtin_clock.gen.h"
72 #include "protos/perfetto/common/builtin_clock.pbzero.h"
73 #include "protos/perfetto/common/trace_stats.pbzero.h"
74 #include "protos/perfetto/config/trace_config.pbzero.h"
75 #include "protos/perfetto/trace/clock_snapshot.pbzero.h"
76 #include "protos/perfetto/trace/perfetto/tracing_service_event.pbzero.h"
77 #include "protos/perfetto/trace/system_info.pbzero.h"
78 #include "protos/perfetto/trace/trace_packet.pbzero.h"
79 #include "protos/perfetto/trace/trigger.pbzero.h"
80 
81 // General note: this class must assume that Producers are malicious and will
82 // try to crash / exploit this class. We can trust pointers because they come
83 // from the IPC layer, but we should never assume that the producer calls
84 // come in the right order or their arguments are sane / within bounds.
85 
86 namespace perfetto {
87 
88 namespace {
// File-local limits and defaults used by the tracing service.

// Upper bound on the number of buffers a single TraceConfig may request;
// EnableTracing() rejects configs that exceed it.
89 constexpr int kMaxBuffersPerConsumer = 128;
// NOTE(review): not referenced in the visible part of this file; presumably the
// default period for periodic snapshots - confirm against its users.
90 constexpr uint32_t kDefaultSnapshotsIntervalMs = 10 * 1000;
// Used by EnableTracing() when TraceConfig.file_write_period_ms is 0.
91 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
// Maximum number of tracing sessions that may exist at the same time, across
// all consumers.
92 constexpr int kMaxConcurrentTracingSessions = 15;
// Maximum number of concurrent tracing sessions owned by a single consumer uid.
93 constexpr int kMaxConcurrentTracingSessionsPerUid = 5;
// Per-uid limit applied instead of the one above when the consumer uid is 1066
// (AID_STATSD).
94 constexpr int kMaxConcurrentTracingSessionsForStatsdUid = 10;
// With enable_extra_guardrails, a trace is rejected if one with the same
// unique_session_name was recorded within this many seconds.
95 constexpr int64_t kMinSecondsBetweenTracesGuardrail = 5 * 60;
96 
97 constexpr uint32_t kMillisPerHour = 3600000;
// Longest duration_ms accepted when extra guardrails are off (7 days).
98 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
99 
100 // These apply only if enable_extra_guardrails is true.
// Cap on the sum of all buffer sizes requested by one config (128 MB in KB).
101 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 128 * 1024;
// Longest duration_ms accepted with guardrails on (24 hours). EnableTracing()
// also uses it as the upper bound for trigger_timeout_ms.
102 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
103 
104 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) || PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
// Local stand-in for the POSIX iovec struct. It is compiled only on Windows and
// NaCl (see the enclosing #if), where <sys/uio.h> is not included above, and is
// consumed by the writev() fallback defined right after it.
105 struct iovec {
106   void* iov_base;  // Start address of the block to write.
107   size_t iov_len;  // Size of the block, in bytes.
108 };
109 
110 // Simple implementation of writev. Note that this does not give the atomicity
111 // guarantees of a real writev, but we don't depend on these (we aren't writing
112 // to the same file from another thread).
writev(int fd,const struct iovec * iov,int iovcnt)113 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
114   ssize_t total_size = 0;
115   for (int i = 0; i < iovcnt; ++i) {
116     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
117     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
118       return -1;
119     total_size += current_size;
120   }
121   return total_size;
122 }
123 
124 #define IOV_MAX 1024  // Linux compatible limit.
125 
126 // uid checking is a NOP on Windows.
// Stub used on the platforms selected by the enclosing #if (Windows / NaCl):
// always reports uid 0. The service constructor stores this value in uid_.
getuid()127 uid_t getuid() {
128   return 0;
129 }
// Stub used on the platforms selected by the enclosing #if (Windows / NaCl):
// always reports effective uid 0. ConnectProducer() compares producer uids
// against this value when lockdown mode is on.
geteuid()130 uid_t geteuid() {
131   return 0;
132 }
133 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) ||
134         // PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
135 
136 // Partially encodes a CommitDataRequest in an int32 for the purposes of
137 // metatracing. Note that it encodes only the bottom 10 bits of the producer id
138 // (which is technically 16 bits wide).
139 //
140 // Format (by bit range):
141 // [   31 ][         30 ][             29:20 ][            19:10 ][        9:0]
142 // [unused][has flush id][num chunks to patch][num chunks to move][producer id]
EncodeCommitDataRequest(ProducerID producer_id,const CommitDataRequest & req_untrusted)143 static int32_t EncodeCommitDataRequest(ProducerID producer_id,
144                                        const CommitDataRequest& req_untrusted) {
145   uint32_t cmov = static_cast<uint32_t>(req_untrusted.chunks_to_move_size());
146   uint32_t cpatch = static_cast<uint32_t>(req_untrusted.chunks_to_patch_size());
147   uint32_t has_flush_id = req_untrusted.flush_request_id() != 0;
148 
149   uint32_t mask = (1 << 10) - 1;
150   uint32_t acc = 0;
151   acc |= has_flush_id << 30;
152   acc |= (cpatch & mask) << 20;
153   acc |= (cmov & mask) << 10;
154   acc |= (producer_id & mask);
155   return static_cast<int32_t>(acc);
156 }
157 
SerializeAndAppendPacket(std::vector<TracePacket> * packets,std::vector<uint8_t> packet)158 void SerializeAndAppendPacket(std::vector<TracePacket>* packets,
159                               std::vector<uint8_t> packet) {
160   Slice slice = Slice::Allocate(packet.size());
161   memcpy(slice.own_data(), packet.data(), packet.size());
162   packets->emplace_back();
163   packets->back().AddSlice(std::move(slice));
164 }
165 
EnsureValidShmSizes(size_t shm_size,size_t page_size)166 std::tuple<size_t /*shm_size*/, size_t /*page_size*/> EnsureValidShmSizes(
167     size_t shm_size,
168     size_t page_size) {
169   // Theoretically the max page size supported by the ABI is 64KB.
170   // However, the current implementation of TraceBuffer (the non-shared
171   // userspace buffer where the service copies data) supports at most
172   // 32K. Setting 64K "works" from the producer<>consumer viewpoint
173   // but then causes the data to be discarded when copying it into
174   // TraceBuffer.
175   constexpr size_t kMaxPageSize = 32 * 1024;
176   static_assert(kMaxPageSize <= SharedMemoryABI::kMaxPageSize, "");
177 
178   if (page_size == 0)
179     page_size = TracingServiceImpl::kDefaultShmPageSize;
180   if (shm_size == 0)
181     shm_size = TracingServiceImpl::kDefaultShmSize;
182 
183   page_size = std::min<size_t>(page_size, kMaxPageSize);
184   shm_size = std::min<size_t>(shm_size, TracingServiceImpl::kMaxShmSize);
185 
186   // Page size has to be multiple of system's page size.
187   bool page_size_is_valid = page_size >= base::kPageSize;
188   page_size_is_valid &= page_size % base::kPageSize == 0;
189 
190   // Only allow power of two numbers of pages, i.e. 1, 2, 4, 8 pages.
191   size_t num_pages = page_size / base::kPageSize;
192   page_size_is_valid &= (num_pages & (num_pages - 1)) == 0;
193 
194   if (!page_size_is_valid || shm_size < page_size ||
195       shm_size % page_size != 0) {
196     return std::make_tuple(TracingServiceImpl::kDefaultShmSize,
197                            TracingServiceImpl::kDefaultShmPageSize);
198   }
199   return std::make_tuple(shm_size, page_size);
200 }
201 
// Returns true if |name| passes the given filters.
//
// - If both |name_filter| and |name_regex_filter| are empty, no filter is set
//   and every name matches.
// - Otherwise |name| matches if it is equal to any entry of |name_filter|, or
//   if any pattern in |name_regex_filter| (POSIX extended syntax) matches the
//   whole name.
//
// The exact-name list is checked first so that no regex is compiled when the
// name is already accepted; building a std::regex is by far the most expensive
// step here.
bool NameMatchesFilter(const std::string& name,
                       const std::vector<std::string>& name_filter,
                       const std::vector<std::string>& name_regex_filter) {
  if (name_filter.empty() && name_regex_filter.empty())
    return true;

  // Cheap path: literal comparison against the exact-name list.
  if (std::find(name_filter.begin(), name_filter.end(), name) !=
      name_filter.end()) {
    return true;
  }

  // Slow path: compile each pattern and require a full match.
  return std::any_of(name_regex_filter.begin(), name_regex_filter.end(),
                     [&name](const std::string& pattern) {
                       return std::regex_match(
                           name, std::regex(pattern, std::regex::extended));
                     });
}
218 
219 // Used when write_into_file == true and output_path is not empty.
CreateTraceFile(const std::string & path)220 base::ScopedFile CreateTraceFile(const std::string& path) {
221 #if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
222   static const char kBase[] = "/data/misc/perfetto-traces/";
223   if (!base::StartsWith(path, kBase) || path.rfind('/') != strlen(kBase) - 1) {
224     PERFETTO_ELOG("Invalid output_path %s. On Android it must be within %s.",
225                   path.c_str(), kBase);
226     return base::ScopedFile();
227   }
228 #endif
229   // O_CREAT | O_EXCL will fail if the file exists already.
230   auto fd = base::OpenFile(path, O_RDWR | O_CREAT | O_EXCL, 0600);
231   if (!fd)
232     PERFETTO_PLOG("Failed to create %s", path.c_str());
233 #if defined(PERFETTO_HAS_CHMOD)
234   // Passing 0644 directly above won't work because of umask.
235   PERFETTO_CHECK(fchmod(*fd, 0644) == 0);
236 #endif
237   return fd;
238 }
239 
240 }  // namespace
241 
242 // Unlike the file-local constants above, these are declared in the header
// because tests use them. The statements below are their out-of-class
// definitions (presumably required so the members can be odr-used, e.g. bound
// to a const reference, in pre-C++17 builds - confirm before removing).
243 constexpr size_t TracingServiceImpl::kDefaultShmSize;
244 constexpr size_t TracingServiceImpl::kDefaultShmPageSize;
245 
246 constexpr size_t TracingServiceImpl::kMaxShmSize;
247 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
248 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
249 
250 // static
CreateInstance(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)251 std::unique_ptr<TracingService> TracingService::CreateInstance(
252     std::unique_ptr<SharedMemory::Factory> shm_factory,
253     base::TaskRunner* task_runner) {
254   return std::unique_ptr<TracingService>(
255       new TracingServiceImpl(std::move(shm_factory), task_runner));
256 }
257 
// Constructs the service. Takes ownership of |shm_factory| and keeps a raw
// pointer to |task_runner|, which must be non-null (DCHECKed). The uid of the
// current process is recorded in uid_, and the buffer id allocator is sized
// with kMaxTraceBufferID.
TracingServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,base::TaskRunner * task_runner)258 TracingServiceImpl::TracingServiceImpl(
259     std::unique_ptr<SharedMemory::Factory> shm_factory,
260     base::TaskRunner* task_runner)
261     : task_runner_(task_runner),
262       shm_factory_(std::move(shm_factory)),
263       uid_(getuid()),
264       buffer_ids_(kMaxTraceBufferID),
265       weak_ptr_factory_(this) {
266   PERFETTO_DCHECK(task_runner_);
267 }
268 
// Destructor. Performs no explicit teardown yet: connected producers are not
// notified or cleaned up here (see TODO below).
~TracingServiceImpl()269 TracingServiceImpl::~TracingServiceImpl() {
270   // TODO(fmayer): handle teardown of all Producer.
271 }
272 
273 std::unique_ptr<TracingService::ProducerEndpoint>
ConnectProducer(Producer * producer,uid_t uid,const std::string & producer_name,size_t shared_memory_size_hint_bytes,bool in_process,ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm)274 TracingServiceImpl::ConnectProducer(Producer* producer,
275                                     uid_t uid,
276                                     const std::string& producer_name,
277                                     size_t shared_memory_size_hint_bytes,
278                                     bool in_process,
279                                     ProducerSMBScrapingMode smb_scraping_mode,
280                                     size_t shared_memory_page_size_hint_bytes,
281                                     std::unique_ptr<SharedMemory> shm) {
282   PERFETTO_DCHECK_THREAD(thread_checker_);
283 
284   if (lockdown_mode_ && uid != geteuid()) {
285     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
286                   static_cast<unsigned long>(uid));
287     return nullptr;
288   }
289 
290   if (producers_.size() >= kMaxProducerID) {
291     PERFETTO_DFATAL("Too many producers.");
292     return nullptr;
293   }
294   const ProducerID id = GetNextProducerID();
295   PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
296 
297   bool smb_scraping_enabled = smb_scraping_enabled_;
298   switch (smb_scraping_mode) {
299     case ProducerSMBScrapingMode::kDefault:
300       break;
301     case ProducerSMBScrapingMode::kEnabled:
302       smb_scraping_enabled = true;
303       break;
304     case ProducerSMBScrapingMode::kDisabled:
305       smb_scraping_enabled = false;
306       break;
307   }
308 
309   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
310       id, uid, this, task_runner_, producer, producer_name, in_process,
311       smb_scraping_enabled));
312   auto it_and_inserted = producers_.emplace(id, endpoint.get());
313   PERFETTO_DCHECK(it_and_inserted.second);
314   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
315   endpoint->shmem_page_size_hint_bytes_ = shared_memory_page_size_hint_bytes;
316 
317   // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
318   // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
319   task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
320 
321   if (shm) {
322     // The producer supplied an SMB. This is used only by Chrome; in the most
323     // common cases the SMB is created by the service and passed via
324     // OnTracingSetup(). Verify that it is correctly sized before we attempt to
325     // use it. The transport layer has to verify the integrity of the SMB (e.g.
326     // ensure that the producer can't resize if after the fact).
327     size_t shm_size, page_size;
328     std::tie(shm_size, page_size) =
329         EnsureValidShmSizes(shm->size(), endpoint->shmem_page_size_hint_bytes_);
330     if (shm_size == shm->size() &&
331         page_size == endpoint->shmem_page_size_hint_bytes_) {
332       PERFETTO_DLOG(
333           "Adopting producer-provided SMB of %zu kB for producer \"%s\"",
334           shm_size / 1024, endpoint->name_.c_str());
335       endpoint->SetupSharedMemory(std::move(shm), page_size,
336                                   /*provided_by_producer=*/true);
337     } else {
338       PERFETTO_LOG(
339           "Discarding incorrectly sized producer-provided SMB for producer "
340           "\"%s\", falling back to service-provided SMB. Requested sizes: %zu "
341           "B total, %zu B page size; suggested corrected sizes: %zu B total, "
342           "%zu B page size",
343           endpoint->name_.c_str(), shm->size(),
344           endpoint->shmem_page_size_hint_bytes_, shm_size, page_size);
345       shm.reset();
346     }
347   }
348 
349   return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
350 }
351 
DisconnectProducer(ProducerID id)352 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
353   PERFETTO_DCHECK_THREAD(thread_checker_);
354   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
355   PERFETTO_DCHECK(producers_.count(id));
356 
357   // Scrape remaining chunks for this producer to ensure we don't lose data.
358   if (auto* producer = GetProducer(id)) {
359     for (auto& session_id_and_session : tracing_sessions_)
360       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
361   }
362 
363   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
364     auto next = it;
365     next++;
366     if (it->second.producer_id == id)
367       UnregisterDataSource(id, it->second.descriptor.name());
368     it = next;
369   }
370 
371   producers_.erase(id);
372   UpdateMemoryGuardrail();
373 }
374 
GetProducer(ProducerID id) const375 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
376     ProducerID id) const {
377   PERFETTO_DCHECK_THREAD(thread_checker_);
378   auto it = producers_.find(id);
379   if (it == producers_.end())
380     return nullptr;
381   return it->second;
382 }
383 
384 std::unique_ptr<TracingService::ConsumerEndpoint>
ConnectConsumer(Consumer * consumer,uid_t uid)385 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
386   PERFETTO_DCHECK_THREAD(thread_checker_);
387   PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
388   std::unique_ptr<ConsumerEndpointImpl> endpoint(
389       new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
390   auto it_and_inserted = consumers_.emplace(endpoint.get());
391   PERFETTO_DCHECK(it_and_inserted.second);
392   // Consumer might go away before we're able to send the connect notification,
393   // if that is the case just bail out.
394   auto weak_ptr = endpoint->GetWeakPtr();
395   task_runner_->PostTask([weak_ptr] {
396     if (!weak_ptr) {
397       return;
398     }
399     weak_ptr->consumer_->OnConnect();
400   });
401   return std::unique_ptr<ConsumerEndpoint>(std::move(endpoint));
402 }
403 
DisconnectConsumer(ConsumerEndpointImpl * consumer)404 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
405   PERFETTO_DCHECK_THREAD(thread_checker_);
406   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
407   PERFETTO_DCHECK(consumers_.count(consumer));
408 
409   // TODO(primiano) : Check that this is safe (what happens if there are
410   // ReadBuffers() calls posted in the meantime? They need to become noop).
411   if (consumer->tracing_session_id_)
412     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
413   consumers_.erase(consumer);
414 
415   // At this point no more pointers to |consumer| should be around.
416   PERFETTO_DCHECK(!std::any_of(
417       tracing_sessions_.begin(), tracing_sessions_.end(),
418       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
419         return kv.second.consumer_maybe_null == consumer;
420       }));
421 }
422 
DetachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)423 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
424                                         const std::string& key) {
425   PERFETTO_DCHECK_THREAD(thread_checker_);
426   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
427   PERFETTO_DCHECK(consumers_.count(consumer));
428 
429   TracingSessionID tsid = consumer->tracing_session_id_;
430   TracingSession* tracing_session;
431   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
432     return false;
433 
434   if (GetDetachedSession(consumer->uid_, key)) {
435     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
436                   key.c_str());
437     return false;
438   }
439 
440   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
441   tracing_session->consumer_maybe_null = nullptr;
442   tracing_session->detach_key = key;
443   consumer->tracing_session_id_ = 0;
444   return true;
445 }
446 
AttachConsumer(ConsumerEndpointImpl * consumer,const std::string & key)447 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
448                                         const std::string& key) {
449   PERFETTO_DCHECK_THREAD(thread_checker_);
450   PERFETTO_DLOG("Consumer %p attaching to session %s",
451                 reinterpret_cast<void*>(consumer), key.c_str());
452   PERFETTO_DCHECK(consumers_.count(consumer));
453 
454   if (consumer->tracing_session_id_) {
455     PERFETTO_ELOG(
456         "Cannot reattach consumer to session %s"
457         " while it already attached tracing session ID %" PRIu64,
458         key.c_str(), consumer->tracing_session_id_);
459     return false;
460   }
461 
462   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
463   if (!tracing_session) {
464     PERFETTO_ELOG(
465         "Failed to attach consumer, session '%s' not found for uid %d",
466         key.c_str(), static_cast<int>(consumer->uid_));
467     return false;
468   }
469 
470   consumer->tracing_session_id_ = tracing_session->id;
471   tracing_session->consumer_maybe_null = consumer;
472   tracing_session->detach_key.clear();
473   return true;
474 }
475 
EnableTracing(ConsumerEndpointImpl * consumer,const TraceConfig & cfg,base::ScopedFile fd)476 bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
477                                        const TraceConfig& cfg,
478                                        base::ScopedFile fd) {
479   PERFETTO_DCHECK_THREAD(thread_checker_);
480   PERFETTO_DLOG("Enabling tracing for consumer %p",
481                 reinterpret_cast<void*>(consumer));
482   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_SET)
483     lockdown_mode_ = true;
484   if (cfg.lockdown_mode() == TraceConfig::LOCKDOWN_CLEAR)
485     lockdown_mode_ = false;
486   TracingSession* tracing_session =
487       GetTracingSession(consumer->tracing_session_id_);
488   if (tracing_session) {
489     PERFETTO_DLOG(
490         "A Consumer is trying to EnableTracing() but another tracing session "
491         "is already active (forgot a call to FreeBuffers() ?)");
492     return false;
493   }
494 
495   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
496                                        ? kGuardrailsMaxTracingDurationMillis
497                                        : kMaxTracingDurationMillis;
498   if (cfg.duration_ms() > max_duration_ms) {
499     PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu32 " ms)",
500                   cfg.duration_ms(), max_duration_ms);
501     return false;
502   }
503 
504   const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
505                                   TraceConfig::TriggerConfig::UNSPECIFIED;
506   if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
507                              cfg.trigger_config().trigger_timeout_ms() >
508                                  kGuardrailsMaxTracingDurationMillis)) {
509     PERFETTO_ELOG(
510         "Traces with START_TRACING triggers must provide a positive "
511         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
512         cfg.trigger_config().trigger_timeout_ms());
513     return false;
514   }
515 
516   if (has_trigger_config && cfg.duration_ms() != 0) {
517     PERFETTO_ELOG(
518         "duration_ms was set, this must not be set for traces with triggers.");
519     return false;
520   }
521 
522   std::unordered_set<std::string> triggers;
523   for (const auto& trigger : cfg.trigger_config().triggers()) {
524     if (!triggers.insert(trigger.name()).second) {
525       PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
526       return false;
527     }
528   }
529 
530   if (cfg.enable_extra_guardrails()) {
531     if (cfg.deferred_start()) {
532       PERFETTO_ELOG(
533           "deferred_start=true is not supported in unsupervised traces");
534       return false;
535     }
536     uint64_t buf_size_sum = 0;
537     for (const auto& buf : cfg.buffers())
538       buf_size_sum += buf.size_kb();
539     if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
540       PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
541                     "kB  > %" PRIu32 " kB)",
542                     buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
543       return false;
544     }
545   }
546 
547   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
548     PERFETTO_ELOG("Too many buffers configured (%d)", cfg.buffers_size());
549     return false;
550   }
551 
552   if (!cfg.unique_session_name().empty()) {
553     const std::string& name = cfg.unique_session_name();
554     for (auto& kv : tracing_sessions_) {
555       if (kv.second.config.unique_session_name() == name) {
556         PERFETTO_ELOG(
557             "A trace with this unique session name (%s) already exists",
558             name.c_str());
559         return false;
560       }
561     }
562   }
563 
564   if (cfg.enable_extra_guardrails()) {
565     // unique_session_name can be empty
566     const std::string& name = cfg.unique_session_name();
567     int64_t now_s = base::GetBootTimeS().count();
568 
569     // Remove any entries where the time limit has passed so this map doesn't
570     // grow indefinitely:
571     std::map<std::string, int64_t>& sessions = session_to_last_trace_s_;
572     for (auto it = sessions.cbegin(); it != sessions.cend();) {
573       if (now_s - it->second > kMinSecondsBetweenTracesGuardrail) {
574         it = sessions.erase(it);
575       } else {
576         ++it;
577       }
578     }
579 
580     int64_t& previous_s = session_to_last_trace_s_[name];
581     if (previous_s == 0) {
582       previous_s = now_s;
583     } else {
584       PERFETTO_ELOG(
585           "A trace with unique session name \"%s\" began less than %" PRId64
586           "s ago (%" PRId64 "s)",
587           name.c_str(), kMinSecondsBetweenTracesGuardrail, now_s - previous_s);
588       return false;
589     }
590   }
591 
592   const long sessions_for_uid = std::count_if(
593       tracing_sessions_.begin(), tracing_sessions_.end(),
594       [consumer](const decltype(tracing_sessions_)::value_type& s) {
595         return s.second.consumer_uid == consumer->uid_;
596       });
597 
598   int per_uid_limit = kMaxConcurrentTracingSessionsPerUid;
599   if (consumer->uid_ == 1066 /* AID_STATSD*/) {
600     per_uid_limit = kMaxConcurrentTracingSessionsForStatsdUid;
601   }
602   if (sessions_for_uid >= per_uid_limit) {
603     PERFETTO_ELOG(
604         "Too many concurrent tracing sesions (%ld) for uid %d limit is %d",
605         sessions_for_uid, static_cast<int>(consumer->uid_), per_uid_limit);
606     return false;
607   }
608 
609   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
610   // in a state where it stalls by design by having more TraceWriterImpl
611   // instances than free pages in the buffer. This is really a bug in
612   // trace_probes and the way it handles stalls in the shmem buffer.
613   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
614     PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
615                   tracing_sessions_.size());
616     return false;
617   }
618 
619   const TracingSessionID tsid = ++last_tracing_session_id_;
620   tracing_session =
621       &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
622            .first->second;
623 
624   if (cfg.write_into_file()) {
625     if (!fd ^ !cfg.output_path().empty()) {
626       PERFETTO_ELOG(
627           "When write_into_file==true either a FD needs to be passed or "
628           "output_path must be populated (but not both)");
629       tracing_sessions_.erase(tsid);
630       return false;
631     }
632     if (!cfg.output_path().empty()) {
633       fd = CreateTraceFile(cfg.output_path());
634       if (!fd) {
635         tracing_sessions_.erase(tsid);
636         return false;
637       }
638     }
639     tracing_session->write_into_file = std::move(fd);
640     uint32_t write_period_ms = cfg.file_write_period_ms();
641     if (write_period_ms == 0)
642       write_period_ms = kDefaultWriteIntoFilePeriodMs;
643     if (write_period_ms < min_write_period_ms_)
644       write_period_ms = min_write_period_ms_;
645     tracing_session->write_period_ms = write_period_ms;
646     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
647     tracing_session->bytes_written_into_file = 0;
648   }
649 
650   // Initialize the log buffers.
651   bool did_allocate_all_buffers = true;
652 
653   // Allocate the trace buffers. Also create a map to translate a consumer
654   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
655   // corresponding BufferID, which is a global ID namespace for the service and
656   // all producers.
657   size_t total_buf_size_kb = 0;
658   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
659   tracing_session->buffers_index.reserve(num_buffers);
660   for (size_t i = 0; i < num_buffers; i++) {
661     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
662     BufferID global_id = buffer_ids_.Allocate();
663     if (!global_id) {
664       did_allocate_all_buffers = false;  // We ran out of IDs.
665       break;
666     }
667     tracing_session->buffers_index.push_back(global_id);
668     const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
669     total_buf_size_kb += buffer_cfg.size_kb();
670     TraceBuffer::OverwritePolicy policy =
671         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
672             ? TraceBuffer::kDiscard
673             : TraceBuffer::kOverwrite;
674     auto it_and_inserted = buffers_.emplace(
675         global_id, TraceBuffer::Create(buf_size_bytes, policy));
676     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
677     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
678     if (!trace_buffer) {
679       did_allocate_all_buffers = false;
680       break;
681     }
682   }
683 
684   UpdateMemoryGuardrail();
685 
686   // This can happen if either:
687   // - All the kMaxTraceBufferID slots are taken.
688   // - OOM, or, more relistically, we exhausted virtual memory.
689   // In any case, free all the previously allocated buffers and abort.
690   // TODO(fmayer): add a test to cover this case, this is quite subtle.
691   if (!did_allocate_all_buffers) {
692     for (BufferID global_id : tracing_session->buffers_index) {
693       buffer_ids_.Free(global_id);
694       buffers_.erase(global_id);
695     }
696     tracing_sessions_.erase(tsid);
697     return false;
698   }
699 
700   consumer->tracing_session_id_ = tsid;
701 
702   // Setup the data sources on the producers without starting them.
703   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
704     // Scan all the registered data sources with a matching name.
705     auto range = data_sources_.equal_range(cfg_data_source.config().name());
706     for (auto it = range.first; it != range.second; it++) {
707       TraceConfig::ProducerConfig producer_config;
708       for (auto& config : cfg.producers()) {
709         if (GetProducer(it->second.producer_id)->name_ ==
710             config.producer_name()) {
711           producer_config = config;
712           break;
713         }
714       }
715       SetupDataSource(cfg_data_source, producer_config, it->second,
716                       tracing_session);
717     }
718   }
719 
720   bool has_start_trigger = false;
721   auto weak_this = weak_ptr_factory_.GetWeakPtr();
722   switch (cfg.trigger_config().trigger_mode()) {
723     case TraceConfig::TriggerConfig::UNSPECIFIED:
724       // no triggers are specified so this isn't a trace that is using triggers.
725       PERFETTO_DCHECK(!has_trigger_config);
726       break;
727     case TraceConfig::TriggerConfig::START_TRACING:
728       // For traces which use START_TRACE triggers we need to ensure that the
729       // tracing session will be cleaned up when it times out.
730       has_start_trigger = true;
731       task_runner_->PostDelayedTask(
732           [weak_this, tsid]() {
733             if (weak_this)
734               weak_this->OnStartTriggersTimeout(tsid);
735           },
736           cfg.trigger_config().trigger_timeout_ms());
737       break;
738     case TraceConfig::TriggerConfig::STOP_TRACING:
739       // Update the tracing_session's duration_ms to ensure that if no trigger
740       // is received the session will end and be cleaned up equal to the
741       // timeout.
742       //
743       // TODO(nuskos): Refactor this so that rather then modifying the config we
744       // have a field we look at on the tracing_session.
745       tracing_session->config.set_duration_ms(
746           cfg.trigger_config().trigger_timeout_ms());
747       break;
748   }
749 
750   tracing_session->state = TracingSession::CONFIGURED;
751   PERFETTO_LOG(
752       "Configured tracing session %" PRIu64
753       ", #sources:%zu, duration:%d ms, #buffers:%d, total "
754       "buffer size:%zu KB, total sessions:%zu, uid:%d session name: \"%s\"",
755       tsid, cfg.data_sources().size(), tracing_session->config.duration_ms(),
756       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size(),
757       static_cast<unsigned int>(consumer->uid_),
758       cfg.unique_session_name().c_str());
759 
760   // Start the data sources, unless this is a case of early setup + fast
761   // triggering, either through TraceConfig.deferred_start or
762   // TraceConfig.trigger_config(). If both are specified which ever one occurs
763   // first will initiate the trace.
764   if (!cfg.deferred_start() && !has_start_trigger)
765     return StartTracing(tsid);
766 
767   return true;
768 }
769 
ChangeTraceConfig(ConsumerEndpointImpl * consumer,const TraceConfig & updated_cfg)770 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
771                                            const TraceConfig& updated_cfg) {
772   PERFETTO_DCHECK_THREAD(thread_checker_);
773   TracingSession* tracing_session =
774       GetTracingSession(consumer->tracing_session_id_);
775   PERFETTO_DCHECK(tracing_session);
776 
777   if ((tracing_session->state != TracingSession::STARTED) &&
778       (tracing_session->state != TracingSession::CONFIGURED)) {
779     PERFETTO_ELOG(
780         "ChangeTraceConfig() was called for a tracing session which isn't "
781         "running.");
782     return;
783   }
784 
785   // We only support updating producer_name_{,regex}_filter (and pass-through
786   // configs) for now; null out any changeable fields and make sure the rest are
787   // identical.
788   TraceConfig new_config_copy(updated_cfg);
789   for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
790     ds_cfg.clear_producer_name_filter();
791     ds_cfg.clear_producer_name_regex_filter();
792   }
793 
794   TraceConfig current_config_copy(tracing_session->config);
795   for (auto& ds_cfg : *current_config_copy.mutable_data_sources()) {
796     ds_cfg.clear_producer_name_filter();
797     ds_cfg.clear_producer_name_regex_filter();
798   }
799 
800   if (new_config_copy != current_config_copy) {
801     PERFETTO_LOG(
802         "ChangeTraceConfig() was called with a config containing unsupported "
803         "changes; only adding to the producer_name_{,regex}_filter is "
804         "currently supported and will have an effect.");
805   }
806 
807   for (TraceConfig::DataSource& cfg_data_source :
808        *tracing_session->config.mutable_data_sources()) {
809     // Find the updated producer_filter in the new config.
810     std::vector<std::string> new_producer_name_filter;
811     std::vector<std::string> new_producer_name_regex_filter;
812     bool found_data_source = false;
813     for (auto it : updated_cfg.data_sources()) {
814       if (cfg_data_source.config().name() == it.config().name()) {
815         new_producer_name_filter = it.producer_name_filter();
816         new_producer_name_regex_filter = it.producer_name_regex_filter();
817         found_data_source = true;
818         break;
819       }
820     }
821 
822     // Bail out if data source not present in the new config.
823     if (!found_data_source) {
824       PERFETTO_ELOG(
825           "ChangeTraceConfig() called without a current data source also "
826           "present in the new config: %s",
827           cfg_data_source.config().name().c_str());
828       continue;
829     }
830 
831     // TODO(oysteine): Just replacing the filter means that if
832     // there are any filter entries which were present in the original config,
833     // but removed from the config passed to ChangeTraceConfig, any matching
834     // producers will keep producing but newly added producers after this
835     // point will never start.
836     *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
837     *cfg_data_source.mutable_producer_name_regex_filter() =
838         new_producer_name_regex_filter;
839 
840     // Scan all the registered data sources with a matching name.
841     auto range = data_sources_.equal_range(cfg_data_source.config().name());
842     for (auto it = range.first; it != range.second; it++) {
843       ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
844       PERFETTO_DCHECK(producer);
845 
846       // Check if the producer name of this data source is present
847       // in the name filters. We currently only support new filters, not
848       // removing old ones.
849       if (!NameMatchesFilter(producer->name_, new_producer_name_filter,
850                              new_producer_name_regex_filter)) {
851         continue;
852       }
853 
854       bool already_setup = false;
855       auto& ds_instances = tracing_session->data_source_instances;
856       for (auto instance_it = ds_instances.begin();
857            instance_it != ds_instances.end(); ++instance_it) {
858         if (instance_it->first == it->second.producer_id &&
859             instance_it->second.data_source_name ==
860                 cfg_data_source.config().name()) {
861           already_setup = true;
862           break;
863         }
864       }
865 
866       if (already_setup)
867         continue;
868 
869       // If it wasn't previously setup, set it up now.
870       // (The per-producer config is optional).
871       TraceConfig::ProducerConfig producer_config;
872       for (auto& config : tracing_session->config.producers()) {
873         if (producer->name_ == config.producer_name()) {
874           producer_config = config;
875           break;
876         }
877       }
878 
879       DataSourceInstance* ds_inst = SetupDataSource(
880           cfg_data_source, producer_config, it->second, tracing_session);
881 
882       if (ds_inst && tracing_session->state == TracingSession::STARTED)
883         StartDataSourceInstance(producer, tracing_session, ds_inst);
884     }
885   }
886 }
887 
StartTracing(TracingSessionID tsid)888 bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
889   PERFETTO_DCHECK_THREAD(thread_checker_);
890   TracingSession* tracing_session = GetTracingSession(tsid);
891   if (!tracing_session) {
892     PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
893     return false;
894   }
895 
896   if (tracing_session->state != TracingSession::CONFIGURED) {
897     PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
898                   tracing_session->state);
899     return false;
900   }
901 
902   tracing_session->state = TracingSession::STARTED;
903 
904   // Periodically snapshot clocks, stats, sync markers while the trace is
905   // active. The snapshots are emitted on the future ReadBuffers() calls, which
906   // means that:
907   //  (a) If we're streaming to a file (or to a consumer) while tracing, we
908   //      write snapshots periodically into the trace.
909   //  (b) If ReadBuffers() is only called after tracing ends, we emit the latest
910   //      snapshot into the trace. For clock snapshots, we keep track of a
911   //      snapshot recorded at the beginning of the session as well as the most
912   //      recent snapshot that showed significant new drift between different
913   //      clocks. This way, we emit useful snapshots for both stop-when-full and
914   //      ring-buffer tracing modes.
915   PeriodicSnapshotTask(tracing_session, /*is_initial_snapshot=*/true);
916 
917   // Trigger delayed task if the trace is time limited.
918   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
919   if (trace_duration_ms > 0) {
920     auto weak_this = weak_ptr_factory_.GetWeakPtr();
921     task_runner_->PostDelayedTask(
922         [weak_this, tsid] {
923           // Skip entirely the flush if the trace session doesn't exist anymore.
924           // This is to prevent misleading error messages to be logged.
925           if (!weak_this)
926             return;
927           auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
928           if (!tracing_session_ptr)
929             return;
930           // If this trace was using STOP_TRACING triggers and we've seen
931           // one, then the trigger overrides the normal timeout. In this
932           // case we just return and let the other task clean up this trace.
933           if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
934                   TraceConfig::TriggerConfig::STOP_TRACING &&
935               !tracing_session_ptr->received_triggers.empty())
936             return;
937           // In all other cases (START_TRACING or no triggers) we flush
938           // after |trace_duration_ms| unconditionally.
939           weak_this->FlushAndDisableTracing(tsid);
940         },
941         trace_duration_ms);
942   }
943 
944   // Start the periodic drain tasks if we should to save the trace into a file.
945   if (tracing_session->config.write_into_file()) {
946     auto weak_this = weak_ptr_factory_.GetWeakPtr();
947     task_runner_->PostDelayedTask(
948         [weak_this, tsid] {
949           if (weak_this)
950             weak_this->ReadBuffers(tsid, nullptr);
951         },
952         tracing_session->delay_to_next_write_period_ms());
953   }
954 
955   // Start the periodic flush tasks if the config specified a flush period.
956   if (tracing_session->config.flush_period_ms())
957     PeriodicFlushTask(tsid, /*post_next_only=*/true);
958 
959   // Start the periodic incremental state clear tasks if the config specified a
960   // period.
961   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
962     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
963   }
964 
965   for (auto& kv : tracing_session->data_source_instances) {
966     ProducerID producer_id = kv.first;
967     DataSourceInstance& data_source = kv.second;
968     ProducerEndpointImpl* producer = GetProducer(producer_id);
969     if (!producer) {
970       PERFETTO_DFATAL("Producer does not exist.");
971       continue;
972     }
973     StartDataSourceInstance(producer, tracing_session, &data_source);
974   }
975   return true;
976 }
977 
StartDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,TracingServiceImpl::DataSourceInstance * instance)978 void TracingServiceImpl::StartDataSourceInstance(
979     ProducerEndpointImpl* producer,
980     TracingSession* tracing_session,
981     TracingServiceImpl::DataSourceInstance* instance) {
982   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
983   if (instance->will_notify_on_start) {
984     instance->state = DataSourceInstance::STARTING;
985   } else {
986     instance->state = DataSourceInstance::STARTED;
987   }
988   if (tracing_session->consumer_maybe_null) {
989     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
990         *producer, *instance);
991   }
992   producer->StartDataSource(instance->instance_id, instance->config);
993 
994   // If all data sources are started, notify the consumer.
995   if (instance->state == DataSourceInstance::STARTED)
996     MaybeNotifyAllDataSourcesStarted(tracing_session);
997 }
998 
999 // DisableTracing just stops the data sources but doesn't free up any buffer.
1000 // This is to allow the consumer to freeze the buffers (by stopping the trace)
1001 // and then drain the buffers. The actual teardown of the TracingSession happens
1002 // in FreeBuffers().
DisableTracing(TracingSessionID tsid,bool disable_immediately)1003 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
1004                                         bool disable_immediately) {
1005   PERFETTO_DCHECK_THREAD(thread_checker_);
1006   TracingSession* tracing_session = GetTracingSession(tsid);
1007   if (!tracing_session) {
1008     // Can happen if the consumer calls this before EnableTracing() or after
1009     // FreeBuffers().
1010     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
1011     return;
1012   }
1013 
1014   switch (tracing_session->state) {
1015     // Spurious call to DisableTracing() while already disabled, nothing to do.
1016     case TracingSession::DISABLED:
1017       PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1018       return;
1019 
1020     // This is either:
1021     // A) The case of a graceful DisableTracing() call followed by a call to
1022     //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
1023     //    to forcefully transition in the disabled state without waiting for the
1024     //    outstanding acks because the buffers are going to be destroyed soon.
1025     // B) A spurious call, iff |disable_immediately| == false, in which case
1026     //    there is nothing to do.
1027     case TracingSession::DISABLING_WAITING_STOP_ACKS:
1028       PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1029       if (disable_immediately)
1030         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1031       return;
1032 
1033     // Continues below.
1034     case TracingSession::CONFIGURED:
1035       // If the session didn't even start there is no need to orchestrate a
1036       // graceful stop of data sources.
1037       disable_immediately = true;
1038       break;
1039 
1040     // This is the nominal case, continues below.
1041     case TracingSession::STARTED:
1042       break;
1043   }
1044 
1045   for (auto& data_source_inst : tracing_session->data_source_instances) {
1046     const ProducerID producer_id = data_source_inst.first;
1047     DataSourceInstance& instance = data_source_inst.second;
1048     ProducerEndpointImpl* producer = GetProducer(producer_id);
1049     PERFETTO_DCHECK(producer);
1050     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
1051                     instance.state == DataSourceInstance::STARTING ||
1052                     instance.state == DataSourceInstance::STARTED);
1053     StopDataSourceInstance(producer, tracing_session, &instance,
1054                            disable_immediately);
1055   }
1056 
1057   // Either this request is flagged with |disable_immediately| or there are no
1058   // data sources that are requesting a final handshake. In both cases just mark
1059   // the session as disabled immediately, notify the consumer and flush the
1060   // trace file (if used).
1061   if (tracing_session->AllDataSourceInstancesStopped())
1062     return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1063 
1064   tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
1065   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1066   task_runner_->PostDelayedTask(
1067       [weak_this, tsid] {
1068         if (weak_this)
1069           weak_this->OnDisableTracingTimeout(tsid);
1070       },
1071       tracing_session->data_source_stop_timeout_ms());
1072 
1073   // Deliberately NOT removing the session from |tracing_session_|, it's still
1074   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
1075 }
1076 
NotifyDataSourceStarted(ProducerID producer_id,DataSourceInstanceID instance_id)1077 void TracingServiceImpl::NotifyDataSourceStarted(
1078     ProducerID producer_id,
1079     DataSourceInstanceID instance_id) {
1080   PERFETTO_DCHECK_THREAD(thread_checker_);
1081   for (auto& kv : tracing_sessions_) {
1082     TracingSession& tracing_session = kv.second;
1083     DataSourceInstance* instance =
1084         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1085 
1086     if (!instance)
1087       continue;
1088 
1089     // If the tracing session was already stopped, ignore this notification.
1090     if (tracing_session.state != TracingSession::STARTED)
1091       continue;
1092 
1093     if (instance->state != DataSourceInstance::STARTING) {
1094       PERFETTO_ELOG("Started data source instance in incorrect state: %d",
1095                     instance->state);
1096       continue;
1097     }
1098 
1099     instance->state = DataSourceInstance::STARTED;
1100 
1101     ProducerEndpointImpl* producer = GetProducer(producer_id);
1102     PERFETTO_DCHECK(producer);
1103     if (tracing_session.consumer_maybe_null) {
1104       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1105           *producer, *instance);
1106     }
1107 
1108     // If all data sources are started, notify the consumer.
1109     MaybeNotifyAllDataSourcesStarted(&tracing_session);
1110   }  // for (tracing_session)
1111 }
1112 
MaybeNotifyAllDataSourcesStarted(TracingSession * tracing_session)1113 void TracingServiceImpl::MaybeNotifyAllDataSourcesStarted(
1114     TracingSession* tracing_session) {
1115   if (!tracing_session->consumer_maybe_null)
1116     return;
1117 
1118   if (!tracing_session->AllDataSourceInstancesStarted())
1119     return;
1120 
1121   // In some rare cases, we can get in this state more than once. Consider the
1122   // following scenario: 3 data sources are registered -> trace starts ->
1123   // all 3 data sources ack -> OnAllDataSourcesStarted() is called.
1124   // Imagine now that a 4th data source registers while the trace is ongoing.
1125   // This would hit the AllDataSourceInstancesStarted() condition again.
1126   // In this case, however, we don't want to re-notify the consumer again.
1127   // That would be unexpected (even if, perhaps, technically correct) and
1128   // trigger bugs in the consumer.
1129   if (tracing_session->did_notify_all_data_source_started)
1130     return;
1131 
1132   PERFETTO_DLOG("All data sources started");
1133   tracing_session->did_notify_all_data_source_started = true;
1134   tracing_session->time_all_data_source_started = base::GetBootTimeNs();
1135   tracing_session->consumer_maybe_null->OnAllDataSourcesStarted();
1136 }
1137 
NotifyDataSourceStopped(ProducerID producer_id,DataSourceInstanceID instance_id)1138 void TracingServiceImpl::NotifyDataSourceStopped(
1139     ProducerID producer_id,
1140     DataSourceInstanceID instance_id) {
1141   PERFETTO_DCHECK_THREAD(thread_checker_);
1142   for (auto& kv : tracing_sessions_) {
1143     TracingSession& tracing_session = kv.second;
1144     DataSourceInstance* instance =
1145         tracing_session.GetDataSourceInstance(producer_id, instance_id);
1146 
1147     if (!instance)
1148       continue;
1149 
1150     if (instance->state != DataSourceInstance::STOPPING) {
1151       PERFETTO_ELOG("Stopped data source instance in incorrect state: %d",
1152                     instance->state);
1153       continue;
1154     }
1155 
1156     instance->state = DataSourceInstance::STOPPED;
1157 
1158     ProducerEndpointImpl* producer = GetProducer(producer_id);
1159     PERFETTO_DCHECK(producer);
1160     if (tracing_session.consumer_maybe_null) {
1161       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
1162           *producer, *instance);
1163     }
1164 
1165     if (!tracing_session.AllDataSourceInstancesStopped())
1166       continue;
1167 
1168     if (tracing_session.state != TracingSession::DISABLING_WAITING_STOP_ACKS)
1169       continue;
1170 
1171     // All data sources acked the termination.
1172     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
1173   }  // for (tracing_session)
1174 }
1175 
ActivateTriggers(ProducerID producer_id,const std::vector<std::string> & triggers)1176 void TracingServiceImpl::ActivateTriggers(
1177     ProducerID producer_id,
1178     const std::vector<std::string>& triggers) {
1179   PERFETTO_DCHECK_THREAD(thread_checker_);
1180   auto* producer = GetProducer(producer_id);
1181   PERFETTO_DCHECK(producer);
1182   for (const auto& trigger_name : triggers) {
1183     for (auto& id_and_tracing_session : tracing_sessions_) {
1184       auto& tracing_session = id_and_tracing_session.second;
1185       TracingSessionID tsid = id_and_tracing_session.first;
1186       auto iter = std::find_if(
1187           tracing_session.config.trigger_config().triggers().begin(),
1188           tracing_session.config.trigger_config().triggers().end(),
1189           [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
1190             return trigger.name() == trigger_name;
1191           });
1192       if (iter == tracing_session.config.trigger_config().triggers().end()) {
1193         continue;
1194       }
1195 
1196       // If this trigger requires a certain producer to have sent it
1197       // (non-empty producer_name()) ensure the producer who sent this trigger
1198       // matches.
1199       if (!iter->producer_name_regex().empty() &&
1200           !std::regex_match(
1201               producer->name_,
1202               std::regex(iter->producer_name_regex(), std::regex::extended))) {
1203         continue;
1204       }
1205 
1206       const bool triggers_already_received =
1207           !tracing_session.received_triggers.empty();
1208       tracing_session.received_triggers.push_back(
1209           {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
1210            producer->name_, producer->uid_});
1211       auto weak_this = weak_ptr_factory_.GetWeakPtr();
1212       switch (tracing_session.config.trigger_config().trigger_mode()) {
1213         case TraceConfig::TriggerConfig::START_TRACING:
1214           // If the session has already been triggered and moved past
1215           // CONFIGURED then we don't need to repeat StartTracing. This would
1216           // work fine (StartTracing would return false) but would add error
1217           // logs.
1218           if (tracing_session.state != TracingSession::CONFIGURED)
1219             break;
1220 
1221           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
1222                         " with duration of %" PRIu32 "ms.",
1223                         iter->name().c_str(), tsid, iter->stop_delay_ms());
1224           // We override the trace duration to be the trigger's requested
1225           // value, this ensures that the trace will end after this amount
1226           // of time has passed.
1227           tracing_session.config.set_duration_ms(iter->stop_delay_ms());
1228           StartTracing(tsid);
1229           break;
1230         case TraceConfig::TriggerConfig::STOP_TRACING:
1231           // Only stop the trace once to avoid confusing log messages. I.E.
1232           // when we've already hit the first trigger we've already Posted the
1233           // task to FlushAndDisable. So all future triggers will just break
1234           // out.
1235           if (triggers_already_received)
1236             break;
1237 
1238           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
1239                         " with duration of %" PRIu32 "ms.",
1240                         iter->name().c_str(), tsid, iter->stop_delay_ms());
1241           // Now that we've seen a trigger we need to stop, flush, and disable
1242           // this session after the configured |stop_delay_ms|.
1243           task_runner_->PostDelayedTask(
1244               [weak_this, tsid] {
1245                 // Skip entirely the flush if the trace session doesn't exist
1246                 // anymore. This is to prevent misleading error messages to be
1247                 // logged.
1248                 if (weak_this && weak_this->GetTracingSession(tsid))
1249                   weak_this->FlushAndDisableTracing(tsid);
1250               },
1251               // If this trigger is zero this will immediately executable and
1252               // will happen shortly.
1253               iter->stop_delay_ms());
1254           break;
1255         case TraceConfig::TriggerConfig::UNSPECIFIED:
1256           PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
1257           break;
1258       }
1259     }
1260   }
1261 }
1262 
1263 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
1264 // conditions all data sources should have acked the stop and this will early
1265 // out.
OnDisableTracingTimeout(TracingSessionID tsid)1266 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
1267   PERFETTO_DCHECK_THREAD(thread_checker_);
1268   TracingSession* tracing_session = GetTracingSession(tsid);
1269   if (!tracing_session ||
1270       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
1271     return;  // Tracing session was successfully disabled.
1272   }
1273 
1274   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
1275                 tsid);
1276   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
1277   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
1278 }
1279 
DisableTracingNotifyConsumerAndFlushFile(TracingSession * tracing_session)1280 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
1281     TracingSession* tracing_session) {
1282   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
1283   for (auto& inst_kv : tracing_session->data_source_instances) {
1284     if (inst_kv.second.state == DataSourceInstance::STOPPED)
1285       continue;
1286     inst_kv.second.state = DataSourceInstance::STOPPED;
1287     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
1288     PERFETTO_DCHECK(producer);
1289     if (tracing_session->consumer_maybe_null) {
1290       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
1291           *producer, inst_kv.second);
1292     }
1293   }
1294   tracing_session->state = TracingSession::DISABLED;
1295 
1296   // Scrape any remaining chunks that weren't flushed by the producers.
1297   for (auto& producer_id_and_producer : producers_)
1298     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
1299 
1300   if (tracing_session->write_into_file) {
1301     tracing_session->write_period_ms = 0;
1302     ReadBuffers(tracing_session->id, nullptr);
1303   }
1304 
1305   if (tracing_session->consumer_maybe_null)
1306     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
1307 }
1308 
Flush(TracingSessionID tsid,uint32_t timeout_ms,ConsumerEndpoint::FlushCallback callback)1309 void TracingServiceImpl::Flush(TracingSessionID tsid,
1310                                uint32_t timeout_ms,
1311                                ConsumerEndpoint::FlushCallback callback) {
1312   PERFETTO_DCHECK_THREAD(thread_checker_);
1313   TracingSession* tracing_session = GetTracingSession(tsid);
1314   if (!tracing_session) {
1315     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
1316     return;
1317   }
1318 
1319   if (!timeout_ms)
1320     timeout_ms = tracing_session->flush_timeout_ms();
1321 
1322   if (tracing_session->pending_flushes.size() > 1000) {
1323     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
1324                   tracing_session->pending_flushes.size());
1325     callback(false);
1326     return;
1327   }
1328 
1329   FlushRequestID flush_request_id = ++last_flush_request_id_;
1330   PendingFlush& pending_flush =
1331       tracing_session->pending_flushes
1332           .emplace_hint(tracing_session->pending_flushes.end(),
1333                         flush_request_id, PendingFlush(std::move(callback)))
1334           ->second;
1335 
1336   // Send a flush request to each producer involved in the tracing session. In
1337   // order to issue a flush request we have to build a map of all data source
1338   // instance ids enabled for each producer.
1339   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
1340   for (const auto& data_source_inst : tracing_session->data_source_instances) {
1341     const ProducerID producer_id = data_source_inst.first;
1342     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
1343     flush_map[producer_id].push_back(ds_inst_id);
1344   }
1345 
1346   for (const auto& kv : flush_map) {
1347     ProducerID producer_id = kv.first;
1348     ProducerEndpointImpl* producer = GetProducer(producer_id);
1349     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1350     producer->Flush(flush_request_id, data_sources);
1351     pending_flush.producers.insert(producer_id);
1352   }
1353 
1354   // If there are no producers to flush (realistically this happens only in
1355   // some tests) fire OnFlushTimeout() straight away, without waiting.
1356   if (flush_map.empty())
1357     timeout_ms = 0;
1358 
1359   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1360   task_runner_->PostDelayedTask(
1361       [weak_this, tsid, flush_request_id] {
1362         if (weak_this)
1363           weak_this->OnFlushTimeout(tsid, flush_request_id);
1364       },
1365       timeout_ms);
1366 }
1367 
NotifyFlushDoneForProducer(ProducerID producer_id,FlushRequestID flush_request_id)1368 void TracingServiceImpl::NotifyFlushDoneForProducer(
1369     ProducerID producer_id,
1370     FlushRequestID flush_request_id) {
1371   for (auto& kv : tracing_sessions_) {
1372     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
1373     auto& pending_flushes = kv.second.pending_flushes;
1374     auto end_it = pending_flushes.upper_bound(flush_request_id);
1375     for (auto it = pending_flushes.begin(); it != end_it;) {
1376       PendingFlush& pending_flush = it->second;
1377       pending_flush.producers.erase(producer_id);
1378       if (pending_flush.producers.empty()) {
1379         auto weak_this = weak_ptr_factory_.GetWeakPtr();
1380         TracingSessionID tsid = kv.first;
1381         auto callback = std::move(pending_flush.callback);
1382         task_runner_->PostTask([weak_this, tsid, callback]() {
1383           if (weak_this) {
1384             weak_this->CompleteFlush(tsid, std::move(callback),
1385                                      /*success=*/true);
1386           }
1387         });
1388         it = pending_flushes.erase(it);
1389       } else {
1390         it++;
1391       }
1392     }  // for (pending_flushes)
1393   }    // for (tracing_session)
1394 }
1395 
OnFlushTimeout(TracingSessionID tsid,FlushRequestID flush_request_id)1396 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
1397                                         FlushRequestID flush_request_id) {
1398   TracingSession* tracing_session = GetTracingSession(tsid);
1399   if (!tracing_session)
1400     return;
1401   auto it = tracing_session->pending_flushes.find(flush_request_id);
1402   if (it == tracing_session->pending_flushes.end())
1403     return;  // Nominal case: flush was completed and acked on time.
1404 
1405   // If there were no producers to flush, consider it a success.
1406   bool success = it->second.producers.empty();
1407 
1408   auto callback = std::move(it->second.callback);
1409   tracing_session->pending_flushes.erase(it);
1410   CompleteFlush(tsid, std::move(callback), success);
1411 }
1412 
CompleteFlush(TracingSessionID tsid,ConsumerEndpoint::FlushCallback callback,bool success)1413 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
1414                                        ConsumerEndpoint::FlushCallback callback,
1415                                        bool success) {
1416   TracingSession* tracing_session = GetTracingSession(tsid);
1417   if (tracing_session) {
1418     // Producers may not have been able to flush all their data, even if they
1419     // indicated flush completion. If possible, also collect uncommitted chunks
1420     // to make sure we have everything they wrote so far.
1421     for (auto& producer_id_and_producer : producers_) {
1422       ScrapeSharedMemoryBuffers(tracing_session,
1423                                 producer_id_and_producer.second);
1424     }
1425   }
1426   callback(success);
1427 }
1428 
ScrapeSharedMemoryBuffers(TracingSession * tracing_session,ProducerEndpointImpl * producer)1429 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
1430     TracingSession* tracing_session,
1431     ProducerEndpointImpl* producer) {
1432   if (!producer->smb_scraping_enabled_)
1433     return;
1434 
1435   // Can't copy chunks if we don't know about any trace writers.
1436   if (producer->writers_.empty())
1437     return;
1438 
1439   // Performance optimization: On flush or session disconnect, this method is
1440   // called for each producer. If the producer doesn't participate in the
1441   // session, there's no need to scape its chunks right now. We can tell if a
1442   // producer participates in the session by checking if the producer is allowed
1443   // to write into the session's log buffers.
1444   const auto& session_buffers = tracing_session->buffers_index;
1445   bool producer_in_session =
1446       std::any_of(session_buffers.begin(), session_buffers.end(),
1447                   [producer](BufferID buffer_id) {
1448                     return producer->allowed_target_buffers_.count(buffer_id);
1449                   });
1450   if (!producer_in_session)
1451     return;
1452 
1453   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
1454 
1455   // Find and copy any uncommitted chunks from the SMB.
1456   //
1457   // In nominal conditions, the page layout of the used SMB pages should never
1458   // change because the service is the only one who is supposed to modify used
1459   // pages (to make them free again).
1460   //
1461   // However, the code here needs to deal with the case of a malicious producer
1462   // altering the SMB in unpredictable ways. Thankfully the SMB size is
1463   // immutable, so a chunk will always point to some valid memory, even if the
1464   // producer alters the intended layout and chunk header concurrently.
1465   // Ultimately a malicious producer altering the SMB's chunk layout while we
1466   // are iterating in this function is not any different from the case of a
1467   // malicious producer asking to commit a chunk made of random data, which is
1468   // something this class has to deal with regardless.
1469   //
1470   // The only legitimate mutations that can happen from sane producers,
1471   // concurrently to this function, are:
1472   //   A. free pages being partitioned,
1473   //   B. free chunks being migrated to kChunkBeingWritten,
1474   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
1475 
1476   SharedMemoryABI* abi = &producer->shmem_abi_;
1477   // num_pages() is immutable after the SMB is initialized and cannot be changed
1478   // even by a producer even if malicious.
1479   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
1480     uint32_t layout = abi->GetPageLayout(page_idx);
1481 
1482     uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
1483     // Skip empty pages.
1484     if (used_chunks == 0)
1485       continue;
1486 
1487     // Scrape the chunks that are currently used. These should be either in
1488     // state kChunkBeingWritten or kChunkComplete.
1489     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
1490       if (!(used_chunks & 1))
1491         continue;
1492 
1493       SharedMemoryABI::ChunkState state =
1494           SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
1495       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
1496                       state == SharedMemoryABI::kChunkComplete);
1497       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
1498 
1499       SharedMemoryABI::Chunk chunk =
1500           abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
1501 
1502       uint16_t packet_count;
1503       uint8_t flags;
1504       // GetPacketCountAndFlags has acquire_load semantics.
1505       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
1506 
1507       // It only makes sense to copy an incomplete chunk if there's at least
1508       // one full packet available. (The producer may not have completed the
1509       // last packet in it yet, so we need at least 2.)
1510       if (!chunk_complete && packet_count < 2)
1511         continue;
1512 
1513       // At this point, it is safe to access the remaining header fields of
1514       // the chunk. Even if the chunk was only just transferred from
1515       // kChunkFree into kChunkBeingWritten state, the header should be
1516       // written completely once the packet count increased above 1 (it was
1517       // reset to 0 by the service when the chunk was freed).
1518 
1519       WriterID writer_id = chunk.writer_id();
1520       base::Optional<BufferID> target_buffer_id =
1521           producer->buffer_id_for_writer(writer_id);
1522 
1523       // We can only scrape this chunk if we know which log buffer to copy it
1524       // into.
1525       if (!target_buffer_id)
1526         continue;
1527 
1528       // Skip chunks that don't belong to the requested tracing session.
1529       bool target_buffer_belongs_to_session =
1530           std::find(session_buffers.begin(), session_buffers.end(),
1531                     *target_buffer_id) != session_buffers.end();
1532       if (!target_buffer_belongs_to_session)
1533         continue;
1534 
1535       uint32_t chunk_id =
1536           chunk.header()->chunk_id.load(std::memory_order_relaxed);
1537 
1538       CopyProducerPageIntoLogBuffer(
1539           producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
1540           packet_count, flags, chunk_complete, chunk.payload_begin(),
1541           chunk.payload_size());
1542     }
1543   }
1544 }
1545 
FlushAndDisableTracing(TracingSessionID tsid)1546 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
1547   PERFETTO_DCHECK_THREAD(thread_checker_);
1548   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
1549   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1550   Flush(tsid, 0, [weak_this, tsid](bool success) {
1551     PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
1552                   success, tsid);
1553     if (!weak_this)
1554       return;
1555     TracingSession* session = weak_this->GetTracingSession(tsid);
1556     if (session->consumer_maybe_null) {
1557       // If the consumer is still attached, just disable the session but give it
1558       // a chance to read the contents.
1559       weak_this->DisableTracing(tsid);
1560     } else {
1561       // If the consumer detached, destroy the session. If the consumer did
1562       // start the session in long-tracing mode, the service will have saved
1563       // the contents to the passed file. If not, the contents will be
1564       // destroyed.
1565       weak_this->FreeBuffers(tsid);
1566     }
1567   });
1568 }
1569 
PeriodicFlushTask(TracingSessionID tsid,bool post_next_only)1570 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
1571                                            bool post_next_only) {
1572   PERFETTO_DCHECK_THREAD(thread_checker_);
1573   TracingSession* tracing_session = GetTracingSession(tsid);
1574   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1575     return;
1576 
1577   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
1578   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1579   task_runner_->PostDelayedTask(
1580       [weak_this, tsid] {
1581         if (weak_this)
1582           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
1583       },
1584       flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
1585 
1586   if (post_next_only)
1587     return;
1588 
1589   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
1590   Flush(tsid, 0, [](bool success) {
1591     if (!success)
1592       PERFETTO_ELOG("Periodic flush timed out");
1593   });
1594 }
1595 
PeriodicClearIncrementalStateTask(TracingSessionID tsid,bool post_next_only)1596 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
1597     TracingSessionID tsid,
1598     bool post_next_only) {
1599   PERFETTO_DCHECK_THREAD(thread_checker_);
1600   TracingSession* tracing_session = GetTracingSession(tsid);
1601   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
1602     return;
1603 
1604   uint32_t clear_period_ms =
1605       tracing_session->config.incremental_state_config().clear_period_ms();
1606   auto weak_this = weak_ptr_factory_.GetWeakPtr();
1607   task_runner_->PostDelayedTask(
1608       [weak_this, tsid] {
1609         if (weak_this)
1610           weak_this->PeriodicClearIncrementalStateTask(
1611               tsid, /*post_next_only=*/false);
1612       },
1613       clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));
1614 
1615   if (post_next_only)
1616     return;
1617 
1618   PERFETTO_DLOG(
1619       "Performing periodic incremental state clear for trace session %" PRIu64,
1620       tsid);
1621 
1622   // Queue the IPCs to producers with active data sources that opted in.
1623   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
1624   for (const auto& kv : tracing_session->data_source_instances) {
1625     ProducerID producer_id = kv.first;
1626     const DataSourceInstance& data_source = kv.second;
1627     if (data_source.handles_incremental_state_clear)
1628       clear_map[producer_id].push_back(data_source.instance_id);
1629   }
1630 
1631   for (const auto& kv : clear_map) {
1632     ProducerID producer_id = kv.first;
1633     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
1634     ProducerEndpointImpl* producer = GetProducer(producer_id);
1635     if (!producer) {
1636       PERFETTO_DFATAL("Producer does not exist.");
1637       continue;
1638     }
1639     producer->ClearIncrementalState(data_sources);
1640   }
1641 }
1642 
1643 // Note: when this is called to write into a file passed when starting tracing
1644 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
1645 // to send the trace data back over IPC).
ReadBuffers(TracingSessionID tsid,ConsumerEndpointImpl * consumer)1646 bool TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
1647                                      ConsumerEndpointImpl* consumer) {
1648   PERFETTO_DCHECK_THREAD(thread_checker_);
1649   TracingSession* tracing_session = GetTracingSession(tsid);
1650   if (!tracing_session) {
1651     // This will be hit systematically from the PostDelayedTask when directly
1652     // writing into the file (in which case consumer == nullptr). Suppress the
1653     // log in this case as it's just spam.
1654     if (consumer) {
1655       PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
1656     }
1657     return false;
1658   }
1659 
1660   // When a tracing session is waiting for a trigger it is considered empty. If
1661   // a tracing session finishes and moves into DISABLED without ever receiving a
1662   // trigger the trace should never return any data. This includes the synthetic
1663   // packets like TraceConfig and Clock snapshots. So we bail out early and let
1664   // the consumer know there is no data.
1665   if (!tracing_session->config.trigger_config().triggers().empty() &&
1666       tracing_session->received_triggers.empty()) {
1667     PERFETTO_DLOG(
1668         "ReadBuffers(): tracing session has not received a trigger yet.");
1669     return false;
1670   }
1671 
1672   // This can happen if the file is closed by a previous task because it reaches
1673   // |max_file_size_bytes|.
1674   if (!tracing_session->write_into_file && !consumer)
1675     return false;
1676 
1677   if (tracing_session->write_into_file && consumer) {
1678     // If the consumer enabled tracing and asked to save the contents into the
1679     // passed file makes little sense to also try to read the buffers over IPC,
1680     // as that would just steal data from the periodic draining task.
1681     PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
1682     return false;
1683   }
1684 
1685   std::vector<TracePacket> packets;
1686   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
1687 
1688   if (!tracing_session->initial_clock_snapshot_.empty()) {
1689     EmitClockSnapshot(tracing_session,
1690                       std::move(tracing_session->initial_clock_snapshot_),
1691                       /*set_root_timestamp=*/true, &packets);
1692   }
1693 
1694   if (!tracing_session->last_clock_snapshot_.empty()) {
1695     // We don't want to put a root timestamp in periodic clock snapshot packets
1696     // as they may be emitted very out of order with respect to the actual trace
1697     // packets, since consuming the trace may happen at any point after it
1698     // starts.
1699     EmitClockSnapshot(tracing_session,
1700                       std::move(tracing_session->last_clock_snapshot_),
1701                       /*set_root_timestamp=*/false, &packets);
1702   }
1703 
1704   if (tracing_session->should_emit_sync_marker) {
1705     SnapshotSyncMarker(&packets);
1706     tracing_session->should_emit_sync_marker = false;
1707   }
1708 
1709   if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
1710     MaybeEmitTraceConfig(tracing_session, &packets);
1711     MaybeEmitReceivedTriggers(tracing_session, &packets);
1712   }
1713   if (!tracing_session->config.builtin_data_sources().disable_system_info())
1714     MaybeEmitSystemInfo(tracing_session, &packets);
1715   if (!tracing_session->config.builtin_data_sources().disable_service_events())
1716     MaybeEmitServiceEvents(tracing_session, &packets);
1717 
1718   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
1719   size_t total_slices = 0;   // SUM(#slices in |packets|).
1720 
1721   // Add up size for packets added by the Maybe* calls above.
1722   for (const TracePacket& packet : packets) {
1723     packets_bytes += packet.size();
1724     total_slices += packet.slices().size();
1725   }
1726 
1727   // This is a rough threshold to determine how much to read from the buffer in
1728   // each task. This is to avoid executing a single huge sending task for too
1729   // long and risk to hit the watchdog. This is *not* an upper bound: we just
1730   // stop accumulating new packets and PostTask *after* we cross this threshold.
1731   // This constant essentially balances the PostTask and IPC overhead vs the
1732   // responsiveness of the service. An extremely small value will cause one IPC
1733   // and one PostTask for each slice but will keep the service extremely
1734   // responsive. An extremely large value will batch the send for the full
1735   // buffer in one large task, will hit the blocking send() once the socket
1736   // buffers are full and hang the service for a bit (until the consumer
1737   // catches up).
1738   static constexpr size_t kApproxBytesPerTask = 32768;
1739   bool did_hit_threshold = false;
1740 
1741   // TODO(primiano): Extend the ReadBuffers API to allow reading only some
1742   // buffers, not all of them in one go.
1743   for (size_t buf_idx = 0;
1744        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
1745        buf_idx++) {
1746     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
1747     if (tbuf_iter == buffers_.end()) {
1748       PERFETTO_DFATAL("Buffer not found.");
1749       continue;
1750     }
1751     TraceBuffer& tbuf = *tbuf_iter->second;
1752     tbuf.BeginRead();
1753     while (!did_hit_threshold) {
1754       TracePacket packet;
1755       TraceBuffer::PacketSequenceProperties sequence_properties{};
1756       bool previous_packet_dropped;
1757       if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
1758                                     &previous_packet_dropped)) {
1759         break;
1760       }
1761       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
1762       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
1763       PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
1764       PERFETTO_DCHECK(packet.size() > 0);
1765       if (!PacketStreamValidator::Validate(packet.slices())) {
1766         tracing_session->invalid_packets++;
1767         PERFETTO_DLOG("Dropping invalid packet");
1768         continue;
1769       }
1770 
1771       // Append a slice with the trusted field data. This can't be spoofed
1772       // because above we validated that the existing slices don't contain any
1773       // trusted fields. For added safety we append instead of prepending
1774       // because according to protobuf semantics, if the same field is
1775       // encountered multiple times the last instance takes priority. Note that
1776       // truncated packets are also rejected, so the producer can't give us a
1777       // partial packet (e.g., a truncated string) which only becomes valid when
1778       // the trusted data is appended here.
1779       Slice slice = Slice::Allocate(32);
1780       protozero::StaticBuffered<protos::pbzero::TracePacket> trusted_packet(
1781           slice.own_data(), slice.size);
1782       trusted_packet->set_trusted_uid(
1783           static_cast<int32_t>(sequence_properties.producer_uid_trusted));
1784       trusted_packet->set_trusted_packet_sequence_id(
1785           tracing_session->GetPacketSequenceID(
1786               sequence_properties.producer_id_trusted,
1787               sequence_properties.writer_id));
1788       if (previous_packet_dropped)
1789         trusted_packet->set_previous_packet_dropped(previous_packet_dropped);
1790       slice.size = trusted_packet.Finalize();
1791       packet.AddSlice(std::move(slice));
1792 
1793       // Append the packet (inclusive of the trusted uid) to |packets|.
1794       packets_bytes += packet.size();
1795       total_slices += packet.slices().size();
1796       did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
1797                           !tracing_session->write_into_file;
1798       packets.emplace_back(std::move(packet));
1799     }  // for(packets...)
1800   }    // for(buffers...)
1801 
1802   const bool has_more = did_hit_threshold;
1803 
1804   // Only emit the stats when there is no more trace data is available to read.
1805   // That way, any problems that occur while reading from the buffers are
1806   // reflected in the emitted stats. This is particularly important for use
1807   // cases where ReadBuffers is only ever called after the tracing session is
1808   // stopped.
1809   if (!has_more && tracing_session->should_emit_stats) {
1810     size_t prev_packets_size = packets.size();
1811     SnapshotStats(tracing_session, &packets);
1812     tracing_session->should_emit_stats = false;
1813 
1814     // Add sizes of packets emitted by SnapshotStats.
1815     for (size_t i = prev_packets_size; i < packets.size(); ++i) {
1816       packets_bytes += packets[i].size();
1817       total_slices += packets[i].slices().size();
1818     }
1819   }
1820 
1821   // If the caller asked us to write into a file by setting
1822   // |write_into_file| == true in the trace config, drain the packets read
1823   // (if any) into the given file descriptor.
1824   if (tracing_session->write_into_file) {
1825     const uint64_t max_size = tracing_session->max_file_size_bytes
1826                                   ? tracing_session->max_file_size_bytes
1827                                   : std::numeric_limits<size_t>::max();
1828 
1829     // When writing into a file, the file should look like a root trace.proto
1830     // message. Each packet should be prepended with a proto preamble stating
1831     // its field id (within trace.proto) and size. Hence the addition below.
1832     const size_t max_iovecs = total_slices + packets.size();
1833 
1834     size_t num_iovecs = 0;
1835     bool stop_writing_into_file = tracing_session->write_period_ms == 0;
1836     std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
1837     size_t num_iovecs_at_last_packet = 0;
1838     uint64_t bytes_about_to_be_written = 0;
1839     for (TracePacket& packet : packets) {
1840       std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
1841           packet.GetProtoPreamble();
1842       bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
1843       num_iovecs++;
1844       for (const Slice& slice : packet.slices()) {
1845         // writev() doesn't change the passed pointer. However, struct iovec
1846         // take a non-const ptr because it's the same struct used by readv().
1847         // Hence the const_cast here.
1848         char* start = static_cast<char*>(const_cast<void*>(slice.start));
1849         bytes_about_to_be_written += slice.size;
1850         iovecs[num_iovecs++] = {start, slice.size};
1851       }
1852 
1853       if (tracing_session->bytes_written_into_file +
1854               bytes_about_to_be_written >=
1855           max_size) {
1856         stop_writing_into_file = true;
1857         num_iovecs = num_iovecs_at_last_packet;
1858         break;
1859       }
1860 
1861       num_iovecs_at_last_packet = num_iovecs;
1862     }
1863     PERFETTO_DCHECK(num_iovecs <= max_iovecs);
1864     int fd = *tracing_session->write_into_file;
1865 
1866     uint64_t total_wr_size = 0;
1867 
1868     // writev() can take at most IOV_MAX entries per call. Batch them.
1869     constexpr size_t kIOVMax = IOV_MAX;
1870     for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
1871       int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
1872       ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
1873       if (wr_size <= 0) {
1874         PERFETTO_PLOG("writev() failed");
1875         stop_writing_into_file = true;
1876         break;
1877       }
1878       total_wr_size += static_cast<size_t>(wr_size);
1879     }
1880 
1881     tracing_session->bytes_written_into_file += total_wr_size;
1882 
1883     PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
1884                   (total_wr_size + 1023) / 1024, stop_writing_into_file);
1885     if (stop_writing_into_file) {
1886       // Ensure all data was written to the file before we close it.
1887       base::FlushFile(fd);
1888       tracing_session->write_into_file.reset();
1889       tracing_session->write_period_ms = 0;
1890       if (tracing_session->state == TracingSession::STARTED)
1891         DisableTracing(tsid);
1892       return true;
1893     }
1894 
1895     auto weak_this = weak_ptr_factory_.GetWeakPtr();
1896     task_runner_->PostDelayedTask(
1897         [weak_this, tsid] {
1898           if (weak_this)
1899             weak_this->ReadBuffers(tsid, nullptr);
1900         },
1901         tracing_session->delay_to_next_write_period_ms());
1902     return true;
1903   }  // if (tracing_session->write_into_file)
1904 
1905   if (has_more) {
1906     auto weak_consumer = consumer->GetWeakPtr();
1907     auto weak_this = weak_ptr_factory_.GetWeakPtr();
1908     task_runner_->PostTask([weak_this, weak_consumer, tsid] {
1909       if (!weak_this || !weak_consumer)
1910         return;
1911       weak_this->ReadBuffers(tsid, weak_consumer.get());
1912     });
1913   }
1914 
1915   // Keep this as tail call, just in case the consumer re-enters.
1916   consumer->consumer_->OnTraceData(std::move(packets), has_more);
1917   return true;
1918 }
1919 
FreeBuffers(TracingSessionID tsid)1920 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
1921   PERFETTO_DCHECK_THREAD(thread_checker_);
1922   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
1923   TracingSession* tracing_session = GetTracingSession(tsid);
1924   if (!tracing_session) {
1925     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
1926     return;  // TODO(primiano): signal failure?
1927   }
1928   DisableTracing(tsid, /*disable_immediately=*/true);
1929 
1930   PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
1931   tracing_session->data_source_instances.clear();
1932 
1933   for (auto& producer_entry : producers_) {
1934     ProducerEndpointImpl* producer = producer_entry.second;
1935     producer->OnFreeBuffers(tracing_session->buffers_index);
1936   }
1937 
1938   for (BufferID buffer_id : tracing_session->buffers_index) {
1939     buffer_ids_.Free(buffer_id);
1940     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
1941     buffers_.erase(buffer_id);
1942   }
1943   bool notify_traceur = tracing_session->config.notify_traceur();
1944   tracing_sessions_.erase(tsid);
1945   UpdateMemoryGuardrail();
1946 
1947   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
1948                tracing_sessions_.size());
1949 
1950 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
1951   static const char kTraceurProp[] = "sys.trace.trace_end_signal";
1952   if (notify_traceur && __system_property_set(kTraceurProp, "1"))
1953     PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
1954 #else
1955   base::ignore_result(notify_traceur);
1956 #endif
1957 }
1958 
RegisterDataSource(ProducerID producer_id,const DataSourceDescriptor & desc)1959 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
1960                                             const DataSourceDescriptor& desc) {
1961   PERFETTO_DCHECK_THREAD(thread_checker_);
1962   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
1963                 producer_id, desc.name().c_str());
1964 
1965   PERFETTO_DCHECK(!desc.name().empty());
1966   auto reg_ds = data_sources_.emplace(desc.name(),
1967                                       RegisteredDataSource{producer_id, desc});
1968 
1969   // If there are existing tracing sessions, we need to check if the new
1970   // data source is enabled by any of them.
1971   if (tracing_sessions_.empty())
1972     return;
1973 
1974   ProducerEndpointImpl* producer = GetProducer(producer_id);
1975   if (!producer) {
1976     PERFETTO_DFATAL("Producer not found.");
1977     return;
1978   }
1979 
1980   for (auto& iter : tracing_sessions_) {
1981     TracingSession& tracing_session = iter.second;
1982     if (tracing_session.state != TracingSession::STARTED &&
1983         tracing_session.state != TracingSession::CONFIGURED) {
1984       continue;
1985     }
1986 
1987     TraceConfig::ProducerConfig producer_config;
1988     for (auto& config : tracing_session.config.producers()) {
1989       if (producer->name_ == config.producer_name()) {
1990         producer_config = config;
1991         break;
1992       }
1993     }
1994     for (const TraceConfig::DataSource& cfg_data_source :
1995          tracing_session.config.data_sources()) {
1996       if (cfg_data_source.config().name() != desc.name())
1997         continue;
1998       DataSourceInstance* ds_inst = SetupDataSource(
1999           cfg_data_source, producer_config, reg_ds->second, &tracing_session);
2000       if (ds_inst && tracing_session.state == TracingSession::STARTED)
2001         StartDataSourceInstance(producer, &tracing_session, ds_inst);
2002     }
2003   }
2004 }
2005 
StopDataSourceInstance(ProducerEndpointImpl * producer,TracingSession * tracing_session,DataSourceInstance * instance,bool disable_immediately)2006 void TracingServiceImpl::StopDataSourceInstance(ProducerEndpointImpl* producer,
2007                                                 TracingSession* tracing_session,
2008                                                 DataSourceInstance* instance,
2009                                                 bool disable_immediately) {
2010   const DataSourceInstanceID ds_inst_id = instance->instance_id;
2011   if (instance->will_notify_on_stop && !disable_immediately) {
2012     instance->state = DataSourceInstance::STOPPING;
2013   } else {
2014     instance->state = DataSourceInstance::STOPPED;
2015   }
2016   if (tracing_session->consumer_maybe_null) {
2017     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2018         *producer, *instance);
2019   }
2020   producer->StopDataSource(ds_inst_id);
2021 }
2022 
UnregisterDataSource(ProducerID producer_id,const std::string & name)2023 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
2024                                               const std::string& name) {
2025   PERFETTO_DCHECK_THREAD(thread_checker_);
2026   PERFETTO_DLOG("Producer %" PRIu16 " unregistered data source \"%s\"",
2027                 producer_id, name.c_str());
2028   PERFETTO_CHECK(producer_id);
2029   ProducerEndpointImpl* producer = GetProducer(producer_id);
2030   PERFETTO_DCHECK(producer);
2031   for (auto& kv : tracing_sessions_) {
2032     auto& ds_instances = kv.second.data_source_instances;
2033     bool removed = false;
2034     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
2035       if (it->first == producer_id && it->second.data_source_name == name) {
2036         DataSourceInstanceID ds_inst_id = it->second.instance_id;
2037         if (it->second.state != DataSourceInstance::STOPPED) {
2038           if (it->second.state != DataSourceInstance::STOPPING)
2039             StopDataSourceInstance(producer, &kv.second, &it->second,
2040                                    /* disable_immediately = */ false);
2041           // Mark the instance as stopped immediately, since we are
2042           // unregistering it below.
2043           if (it->second.state == DataSourceInstance::STOPPING)
2044             NotifyDataSourceStopped(producer_id, ds_inst_id);
2045         }
2046         it = ds_instances.erase(it);
2047         removed = true;
2048       } else {
2049         ++it;
2050       }
2051     }  // for (data_source_instances)
2052     if (removed)
2053       MaybeNotifyAllDataSourcesStarted(&kv.second);
2054   }  // for (tracing_session)
2055 
2056   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
2057     if (it->second.producer_id == producer_id &&
2058         it->second.descriptor.name() == name) {
2059       data_sources_.erase(it);
2060       return;
2061     }
2062   }
2063 
2064   PERFETTO_DFATAL(
2065       "Tried to unregister a non-existent data source \"%s\" for "
2066       "producer %" PRIu16,
2067       name.c_str(), producer_id);
2068 }
2069 
SetupDataSource(const TraceConfig::DataSource & cfg_data_source,const TraceConfig::ProducerConfig & producer_config,const RegisteredDataSource & data_source,TracingSession * tracing_session)2070 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
2071     const TraceConfig::DataSource& cfg_data_source,
2072     const TraceConfig::ProducerConfig& producer_config,
2073     const RegisteredDataSource& data_source,
2074     TracingSession* tracing_session) {
2075   PERFETTO_DCHECK_THREAD(thread_checker_);
2076   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
2077   PERFETTO_DCHECK(producer);
2078   // An existing producer that is not ftrace could have registered itself as
2079   // ftrace, we must not enable it in that case.
2080   if (lockdown_mode_ && producer->uid_ != uid_) {
2081     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
2082     return nullptr;
2083   }
2084   // TODO(primiano): Add tests for registration ordering (data sources vs
2085   // consumers).
2086   if (!NameMatchesFilter(producer->name_,
2087                          cfg_data_source.producer_name_filter(),
2088                          cfg_data_source.producer_name_regex_filter())) {
2089     PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
2090                   cfg_data_source.config().name().c_str(),
2091                   producer->name_.c_str());
2092     return nullptr;
2093   }
2094 
2095   auto relative_buffer_id = cfg_data_source.config().target_buffer();
2096   if (relative_buffer_id >= tracing_session->num_buffers()) {
2097     PERFETTO_LOG(
2098         "The TraceConfig for DataSource %s specified a target_buffer out of "
2099         "bound (%d). Skipping it.",
2100         cfg_data_source.config().name().c_str(), relative_buffer_id);
2101     return nullptr;
2102   }
2103 
2104   // Create a copy of the DataSourceConfig specified in the trace config. This
2105   // will be passed to the producer after translating the |target_buffer| id.
2106   // The |target_buffer| parameter passed by the consumer in the trace config is
2107   // relative to the buffers declared in the same trace config. This has to be
2108   // translated to the global BufferID before passing it to the producers, which
2109   // don't know anything about tracing sessions and consumers.
2110 
2111   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
2112   auto insert_iter = tracing_session->data_source_instances.emplace(
2113       std::piecewise_construct,  //
2114       std::forward_as_tuple(producer->id_),
2115       std::forward_as_tuple(
2116           inst_id,
2117           cfg_data_source.config(),  //  Deliberate copy.
2118           data_source.descriptor.name(),
2119           data_source.descriptor.will_notify_on_start(),
2120           data_source.descriptor.will_notify_on_stop(),
2121           data_source.descriptor.handles_incremental_state_clear()));
2122   DataSourceInstance* ds_instance = &insert_iter->second;
2123 
2124   // New data source instance starts out in CONFIGURED state.
2125   if (tracing_session->consumer_maybe_null) {
2126     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
2127         *producer, *ds_instance);
2128   }
2129 
2130   DataSourceConfig& ds_config = ds_instance->config;
2131   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
2132   ds_config.set_stop_timeout_ms(tracing_session->data_source_stop_timeout_ms());
2133   ds_config.set_enable_extra_guardrails(
2134       tracing_session->config.enable_extra_guardrails());
2135   ds_config.set_tracing_session_id(tracing_session->id);
2136   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
2137   PERFETTO_DCHECK(global_id);
2138   ds_config.set_target_buffer(global_id);
2139 
2140   PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
2141                 ds_config.name().c_str(), global_id);
2142   if (!producer->shared_memory()) {
2143     // Determine the SMB page size. Must be an integer multiple of 4k.
2144     // As for the SMB size below, the decision tree is as follows:
2145     // 1. Give priority to what is defined in the trace config.
2146     // 2. If unset give priority to the hint passed by the producer.
2147     // 3. Keep within bounds and ensure it's a multiple of 4k.
2148     size_t page_size = producer_config.page_size_kb() * 1024;
2149     if (page_size == 0)
2150       page_size = producer->shmem_page_size_hint_bytes_;
2151 
2152     // Determine the SMB size. Must be an integer multiple of the SMB page size.
2153     // The decision tree is as follows:
2154     // 1. Give priority to what defined in the trace config.
2155     // 2. If unset give priority to the hint passed by the producer.
2156     // 3. Keep within bounds and ensure it's a multiple of the page size.
2157     size_t shm_size = producer_config.shm_size_kb() * 1024;
2158     if (shm_size == 0)
2159       shm_size = producer->shmem_size_hint_bytes_;
2160 
2161     auto valid_sizes = EnsureValidShmSizes(shm_size, page_size);
2162     if (valid_sizes != std::tie(shm_size, page_size)) {
2163       PERFETTO_DLOG(
2164           "Invalid configured SMB sizes: shm_size %zu page_size %zu. Falling "
2165           "back to shm_size %zu page_size %zu.",
2166           shm_size, page_size, std::get<0>(valid_sizes),
2167           std::get<1>(valid_sizes));
2168     }
2169     std::tie(shm_size, page_size) = valid_sizes;
2170 
2171     // TODO(primiano): right now Create() will suicide in case of OOM if the
2172     // mmap fails. We should instead gracefully fail the request and tell the
2173     // client to go away.
2174     PERFETTO_DLOG("Creating SMB of %zu KB for producer \"%s\"", shm_size / 1024,
2175                   producer->name_.c_str());
2176     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
2177     producer->SetupSharedMemory(std::move(shared_memory), page_size,
2178                                 /*provided_by_producer=*/false);
2179   }
2180   producer->SetupDataSource(inst_id, ds_config);
2181   return ds_instance;
2182 }
2183 
2184 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
2185 // might be lying / returning garbage contents. |src| and |size| can be trusted
2186 // in terms of being a valid pointer, but not the contents.
CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,uid_t producer_uid_trusted,WriterID writer_id,ChunkID chunk_id,BufferID buffer_id,uint16_t num_fragments,uint8_t chunk_flags,bool chunk_complete,const uint8_t * src,size_t size)2187 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
2188     ProducerID producer_id_trusted,
2189     uid_t producer_uid_trusted,
2190     WriterID writer_id,
2191     ChunkID chunk_id,
2192     BufferID buffer_id,
2193     uint16_t num_fragments,
2194     uint8_t chunk_flags,
2195     bool chunk_complete,
2196     const uint8_t* src,
2197     size_t size) {
2198   PERFETTO_DCHECK_THREAD(thread_checker_);
2199 
2200   ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
2201   if (!producer) {
2202     PERFETTO_DFATAL("Producer not found.");
2203     chunks_discarded_++;
2204     return;
2205   }
2206 
2207   TraceBuffer* buf = GetBufferByID(buffer_id);
2208   if (!buf) {
2209     PERFETTO_DLOG("Could not find target buffer %" PRIu16
2210                   " for producer %" PRIu16,
2211                   buffer_id, producer_id_trusted);
2212     chunks_discarded_++;
2213     return;
2214   }
2215 
2216   // Verify that the producer is actually allowed to write into the target
2217   // buffer specified in the request. This prevents a malicious producer from
2218   // injecting data into a log buffer that belongs to a tracing session the
2219   // producer is not part of.
2220   if (!producer->is_allowed_target_buffer(buffer_id)) {
2221     PERFETTO_ELOG("Producer %" PRIu16
2222                   " tried to write into forbidden target buffer %" PRIu16,
2223                   producer_id_trusted, buffer_id);
2224     PERFETTO_DFATAL("Forbidden target buffer");
2225     chunks_discarded_++;
2226     return;
2227   }
2228 
2229   // If the writer was registered by the producer, it should only write into the
2230   // buffer it was registered with.
2231   base::Optional<BufferID> associated_buffer =
2232       producer->buffer_id_for_writer(writer_id);
2233   if (associated_buffer && *associated_buffer != buffer_id) {
2234     PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
2235                   " was registered to write into target buffer %" PRIu16
2236                   ", but tried to write into buffer %" PRIu16,
2237                   writer_id, producer_id_trusted, *associated_buffer,
2238                   buffer_id);
2239     PERFETTO_DFATAL("Wrong target buffer");
2240     chunks_discarded_++;
2241     return;
2242   }
2243 
2244   buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
2245                           chunk_id, num_fragments, chunk_flags, chunk_complete,
2246                           src, size);
2247 }
2248 
ApplyChunkPatches(ProducerID producer_id_trusted,const std::vector<CommitDataRequest::ChunkToPatch> & chunks_to_patch)2249 void TracingServiceImpl::ApplyChunkPatches(
2250     ProducerID producer_id_trusted,
2251     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
2252   PERFETTO_DCHECK_THREAD(thread_checker_);
2253 
2254   for (const auto& chunk : chunks_to_patch) {
2255     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
2256     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
2257     TraceBuffer* buf =
2258         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
2259     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
2260                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
2261     if (!writer_id || writer_id > kMaxWriterID || !buf) {
2262       // This can genuinely happen when the trace is stopped. The producers
2263       // might see the stop signal with some delay and try to keep sending
2264       // patches left soon after.
2265       PERFETTO_DLOG(
2266           "Received invalid chunks_to_patch request from Producer: %" PRIu16
2267           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
2268           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
2269       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2270       continue;
2271     }
2272 
2273     // Note, there's no need to validate that the producer is allowed to write
2274     // to the specified buffer ID (or that it's the correct buffer ID for a
2275     // registered TraceWriter). That's because TraceBuffer uses the producer ID
2276     // and writer ID to look up the chunk to patch. If the producer specifies an
2277     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
2278     // patches. Because the producer ID is trusted, there's also no way for a
2279     // malicious producer to patch another producer's data.
2280 
2281     // Speculate on the fact that there are going to be a limited amount of
2282     // patches per request, so we can allocate the |patches| array on the stack.
2283     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
2284     if (chunk.patches().size() > patches.size()) {
2285       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
2286                     patches.size());
2287       PERFETTO_DFATAL("Too many patches");
2288       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
2289       continue;
2290     }
2291 
2292     size_t i = 0;
2293     for (const auto& patch : chunk.patches()) {
2294       const std::string& patch_data = patch.data();
2295       if (patch_data.size() != patches[i].data.size()) {
2296         PERFETTO_ELOG("Received patch from producer: %" PRIu16
2297                       " of unexpected size %zu",
2298                       producer_id_trusted, patch_data.size());
2299         patches_discarded_++;
2300         continue;
2301       }
2302       patches[i].offset_untrusted = patch.offset();
2303       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
2304       i++;
2305     }
2306     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
2307                                &patches[0], i, chunk.has_more_patches());
2308   }
2309 }
2310 
GetDetachedSession(uid_t uid,const std::string & key)2311 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
2312     uid_t uid,
2313     const std::string& key) {
2314   PERFETTO_DCHECK_THREAD(thread_checker_);
2315   for (auto& kv : tracing_sessions_) {
2316     TracingSession* session = &kv.second;
2317     if (session->consumer_uid == uid && session->detach_key == key) {
2318       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
2319       return session;
2320     }
2321   }
2322   return nullptr;
2323 }
2324 
GetTracingSession(TracingSessionID tsid)2325 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
2326     TracingSessionID tsid) {
2327   PERFETTO_DCHECK_THREAD(thread_checker_);
2328   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
2329   if (it == tracing_sessions_.end())
2330     return nullptr;
2331   return &it->second;
2332 }
2333 
GetNextProducerID()2334 ProducerID TracingServiceImpl::GetNextProducerID() {
2335   PERFETTO_DCHECK_THREAD(thread_checker_);
2336   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
2337   do {
2338     ++last_producer_id_;
2339   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
2340   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
2341   return last_producer_id_;
2342 }
2343 
GetBufferByID(BufferID buffer_id)2344 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
2345   auto buf_iter = buffers_.find(buffer_id);
2346   if (buf_iter == buffers_.end())
2347     return nullptr;
2348   return &*buf_iter->second;
2349 }
2350 
OnStartTriggersTimeout(TracingSessionID tsid)2351 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
2352   // Skip entirely the flush if the trace session doesn't exist anymore.
2353   // This is to prevent misleading error messages to be logged.
2354   //
2355   // if the trace has started from the trigger we rely on
2356   // the |stop_delay_ms| from the trigger so don't flush and
2357   // disable if we've moved beyond a CONFIGURED state
2358   auto* tracing_session_ptr = GetTracingSession(tsid);
2359   if (tracing_session_ptr &&
2360       tracing_session_ptr->state == TracingSession::CONFIGURED) {
2361     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
2362                   " since no triggers activated.",
2363                   tsid);
2364     // No data should be returned from ReadBuffers() regardless of if we
2365     // call FreeBuffers() or DisableTracing(). This is because in
2366     // STOP_TRACING we need this promise in either case, and using
2367     // DisableTracing() allows a graceful shutdown. Consumers can follow
2368     // their normal path and check the buffers through ReadBuffers() and
2369     // the code won't hang because the tracing session will still be
2370     // alive just disabled.
2371     DisableTracing(tsid);
2372   }
2373 }
2374 
UpdateMemoryGuardrail()2375 void TracingServiceImpl::UpdateMemoryGuardrail() {
2376 #if PERFETTO_BUILDFLAG(PERFETTO_WATCHDOG)
2377   uint64_t total_buffer_bytes = 0;
2378 
2379   // Sum up all the shared memory buffers.
2380   for (const auto& id_to_producer : producers_) {
2381     if (id_to_producer.second->shared_memory())
2382       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
2383   }
2384 
2385   // Sum up all the trace buffers.
2386   for (const auto& id_to_buffer : buffers_) {
2387     total_buffer_bytes += id_to_buffer.second->size();
2388   }
2389 
2390   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
2391   // interval.
2392   uint64_t guardrail = base::kWatchdogDefaultMemorySlack + total_buffer_bytes;
2393   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
2394 #endif
2395 }
2396 
PeriodicSnapshotTask(TracingSession * tracing_session,bool is_initial_snapshot)2397 void TracingServiceImpl::PeriodicSnapshotTask(TracingSession* tracing_session,
2398                                               bool is_initial_snapshot) {
2399   tracing_session->should_emit_sync_marker = true;
2400   tracing_session->should_emit_stats = true;
2401 
2402   if (!tracing_session->config.builtin_data_sources()
2403            .disable_clock_snapshotting()) {
2404     if (is_initial_snapshot)
2405       SnapshotClocks(&tracing_session->initial_clock_snapshot_);
2406     SnapshotClocks(&tracing_session->last_clock_snapshot_);
2407   }
2408 
2409   uint32_t interval_ms =
2410       tracing_session->config.builtin_data_sources().snapshot_interval_ms();
2411   if (!interval_ms)
2412     interval_ms = kDefaultSnapshotsIntervalMs;
2413 
2414   TracingSessionID tsid = tracing_session->id;
2415   auto weak_this = weak_ptr_factory_.GetWeakPtr();
2416   task_runner_->PostDelayedTask(
2417       [weak_this, tsid] {
2418         if (!weak_this)
2419           return;
2420         auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
2421         if (!tracing_session_ptr)
2422           return;
2423         if (tracing_session_ptr->state != TracingSession::STARTED)
2424           return;
2425         weak_this->PeriodicSnapshotTask(tracing_session_ptr,
2426                                         /*is_initial_snapshot=*/false);
2427       },
2428       interval_ms - (base::GetWallTimeMs().count() % interval_ms));
2429 }
2430 
SnapshotSyncMarker(std::vector<TracePacket> * packets)2431 void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
2432   // The sync marks are used to tokenize large traces efficiently.
2433   // See description in trace_packet.proto.
2434   if (sync_marker_packet_size_ == 0) {
2435     // The marker ABI expects that the marker is written after the uid.
2436     // Protozero guarantees that fields are written in the same order of the
2437     // calls. The ResynchronizeTraceStreamUsingSyncMarker test verifies the ABI.
2438     protozero::StaticBuffered<protos::pbzero::TracePacket> packet(
2439         &sync_marker_packet_[0], sizeof(sync_marker_packet_));
2440     packet->set_trusted_uid(static_cast<int32_t>(uid_));
2441     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2442 
2443     // Keep this last.
2444     packet->set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
2445     sync_marker_packet_size_ = packet.Finalize();
2446   }
2447   packets->emplace_back();
2448   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
2449 }
2450 
SnapshotClocks(TracingSession::ClockSnapshotData * snapshot_data)2451 void TracingServiceImpl::SnapshotClocks(
2452     TracingSession::ClockSnapshotData* snapshot_data) {
2453   // Minimum drift that justifies replacing a prior clock snapshot that hasn't
2454   // been emitted into the trace yet (see comment below).
2455   static constexpr int64_t kSignificantDriftNs = 10 * 1000 * 1000;  // 10 ms
2456 
2457   TracingSession::ClockSnapshotData new_snapshot_data;
2458 
2459 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
2460     !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&    \
2461     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2462   struct {
2463     clockid_t id;
2464     protos::pbzero::BuiltinClock type;
2465     struct timespec ts;
2466   } clocks[] = {
2467       {CLOCK_BOOTTIME, protos::pbzero::BUILTIN_CLOCK_BOOTTIME, {0, 0}},
2468       {CLOCK_REALTIME_COARSE,
2469        protos::pbzero::BUILTIN_CLOCK_REALTIME_COARSE,
2470        {0, 0}},
2471       {CLOCK_MONOTONIC_COARSE,
2472        protos::pbzero::BUILTIN_CLOCK_MONOTONIC_COARSE,
2473        {0, 0}},
2474       {CLOCK_REALTIME, protos::pbzero::BUILTIN_CLOCK_REALTIME, {0, 0}},
2475       {CLOCK_MONOTONIC, protos::pbzero::BUILTIN_CLOCK_MONOTONIC, {0, 0}},
2476       {CLOCK_MONOTONIC_RAW,
2477        protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW,
2478        {0, 0}},
2479   };
2480   // First snapshot all the clocks as atomically as we can.
2481   for (auto& clock : clocks) {
2482     if (clock_gettime(clock.id, &clock.ts) == -1)
2483       PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
2484   }
2485   for (auto& clock : clocks) {
2486     new_snapshot_data.push_back(std::make_pair(
2487         static_cast<uint32_t>(clock.type),
2488         static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())));
2489   }
2490 #else   // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
2491         // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
2492         // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2493   auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
2494   // The default trace clock is boot time, so we always need to emit a path to
2495   // it. However since we don't actually have a boot time source on these
2496   // platforms, pretend that wall time equals boot time.
2497   new_snapshot_data.push_back(
2498       std::make_pair(protos::pbzero::BUILTIN_CLOCK_BOOTTIME, wall_time_ns));
2499   new_snapshot_data.push_back(
2500       std::make_pair(protos::pbzero::BUILTIN_CLOCK_MONOTONIC, wall_time_ns));
2501 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) &&
2502         // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) &&
2503         // !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2504 
2505   // If we're about to update a session's latest clock snapshot that hasn't been
2506   // emitted into the trace yet, check whether the clocks have drifted enough to
2507   // warrant overriding the current snapshot values. The older snapshot would be
2508   // valid for a larger part of the currently buffered trace data because the
2509   // clock sync protocol in trace processor uses the latest clock <= timestamp
2510   // to translate times (see https://perfetto.dev/docs/concepts/clock-sync), so
2511   // we try to keep it if we can.
2512   if (!snapshot_data->empty()) {
2513     PERFETTO_DCHECK(snapshot_data->size() == new_snapshot_data.size());
2514     PERFETTO_DCHECK((*snapshot_data)[0].first ==
2515                     protos::gen::BUILTIN_CLOCK_BOOTTIME);
2516 
2517     bool update_snapshot = false;
2518     uint64_t old_boot_ns = (*snapshot_data)[0].second;
2519     uint64_t new_boot_ns = new_snapshot_data[0].second;
2520     int64_t boot_diff =
2521         static_cast<int64_t>(new_boot_ns) - static_cast<int64_t>(old_boot_ns);
2522 
2523     for (size_t i = 1; i < snapshot_data->size(); i++) {
2524       uint64_t old_ns = (*snapshot_data)[i].second;
2525       uint64_t new_ns = new_snapshot_data[i].second;
2526 
2527       int64_t diff =
2528           static_cast<int64_t>(new_ns) - static_cast<int64_t>(old_ns);
2529 
2530       // Compare the boottime delta against the delta of this clock.
2531       if (std::abs(boot_diff - diff) >= kSignificantDriftNs) {
2532         update_snapshot = true;
2533         break;
2534       }
2535     }
2536     if (!update_snapshot)
2537       return;
2538     snapshot_data->clear();
2539   }
2540 
2541   *snapshot_data = std::move(new_snapshot_data);
2542 }
2543 
EmitClockSnapshot(TracingSession * tracing_session,TracingSession::ClockSnapshotData snapshot_data,bool set_root_timestamp,std::vector<TracePacket> * packets)2544 void TracingServiceImpl::EmitClockSnapshot(
2545     TracingSession* tracing_session,
2546     TracingSession::ClockSnapshotData snapshot_data,
2547     bool set_root_timestamp,
2548     std::vector<TracePacket>* packets) {
2549   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2550   if (set_root_timestamp) {
2551     // First timestamp from the snapshot is the default time domain (BOOTTIME on
2552     // systems that support it, or walltime otherwise).
2553     PERFETTO_DCHECK(snapshot_data[0].first ==
2554                     protos::gen::BUILTIN_CLOCK_BOOTTIME);
2555     packet->set_timestamp(snapshot_data[0].second);
2556   }
2557 
2558   auto* snapshot = packet->set_clock_snapshot();
2559 
2560   protos::gen::BuiltinClock trace_clock =
2561       tracing_session->config.builtin_data_sources().primary_trace_clock();
2562   if (!trace_clock)
2563     trace_clock = protos::gen::BUILTIN_CLOCK_BOOTTIME;
2564   snapshot->set_primary_trace_clock(
2565       static_cast<protos::pbzero::BuiltinClock>(trace_clock));
2566 
2567   for (auto& clock_id_and_ts : snapshot_data) {
2568     auto* c = snapshot->add_clocks();
2569     c->set_clock_id(clock_id_and_ts.first);
2570     c->set_timestamp(clock_id_and_ts.second);
2571   }
2572 
2573   packet->set_trusted_uid(static_cast<int32_t>(uid_));
2574   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2575   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2576 }
2577 
SnapshotStats(TracingSession * tracing_session,std::vector<TracePacket> * packets)2578 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
2579                                        std::vector<TracePacket>* packets) {
2580   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2581   packet->set_trusted_uid(static_cast<int32_t>(uid_));
2582   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2583   GetTraceStats(tracing_session).Serialize(packet->set_trace_stats());
2584   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2585 }
2586 
GetTraceStats(TracingSession * tracing_session)2587 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
2588   TraceStats trace_stats;
2589   trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
2590   trace_stats.set_producers_seen(last_producer_id_);
2591   trace_stats.set_data_sources_registered(
2592       static_cast<uint32_t>(data_sources_.size()));
2593   trace_stats.set_data_sources_seen(last_data_source_instance_id_);
2594   trace_stats.set_tracing_sessions(
2595       static_cast<uint32_t>(tracing_sessions_.size()));
2596   trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
2597   trace_stats.set_chunks_discarded(chunks_discarded_);
2598   trace_stats.set_patches_discarded(patches_discarded_);
2599   trace_stats.set_invalid_packets(tracing_session->invalid_packets);
2600 
2601   for (BufferID buf_id : tracing_session->buffers_index) {
2602     TraceBuffer* buf = GetBufferByID(buf_id);
2603     if (!buf) {
2604       PERFETTO_DFATAL("Buffer not found.");
2605       continue;
2606     }
2607     *trace_stats.add_buffer_stats() = buf->stats();
2608   }  // for (buf in session).
2609   return trace_stats;
2610 }
2611 
MaybeEmitTraceConfig(TracingSession * tracing_session,std::vector<TracePacket> * packets)2612 void TracingServiceImpl::MaybeEmitTraceConfig(
2613     TracingSession* tracing_session,
2614     std::vector<TracePacket>* packets) {
2615   if (tracing_session->did_emit_config)
2616     return;
2617   tracing_session->did_emit_config = true;
2618   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2619   packet->set_trusted_uid(static_cast<int32_t>(uid_));
2620   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2621   tracing_session->config.Serialize(packet->set_trace_config());
2622   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2623 }
2624 
MaybeEmitSystemInfo(TracingSession * tracing_session,std::vector<TracePacket> * packets)2625 void TracingServiceImpl::MaybeEmitSystemInfo(
2626     TracingSession* tracing_session,
2627     std::vector<TracePacket>* packets) {
2628   if (tracing_session->did_emit_system_info)
2629     return;
2630   tracing_session->did_emit_system_info = true;
2631   protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2632   auto* info = packet->set_system_info();
2633   base::ignore_result(info);  // For PERFETTO_OS_WIN.
2634 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) && \
2635     !PERFETTO_BUILDFLAG(PERFETTO_OS_NACL)
2636   struct utsname uname_info;
2637   if (uname(&uname_info) == 0) {
2638     auto* utsname_info = info->set_utsname();
2639     utsname_info->set_sysname(uname_info.sysname);
2640     utsname_info->set_version(uname_info.version);
2641     utsname_info->set_machine(uname_info.machine);
2642     utsname_info->set_release(uname_info.release);
2643   }
2644 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
2645 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2646   char value[PROP_VALUE_MAX];
2647   if (__system_property_get("ro.build.fingerprint", value)) {
2648     info->set_android_build_fingerprint(value);
2649   } else {
2650     PERFETTO_ELOG("Unable to read ro.build.fingerprint");
2651   }
2652   info->set_hz(sysconf(_SC_CLK_TCK));
2653 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
2654   packet->set_trusted_uid(static_cast<int32_t>(uid_));
2655   packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2656   SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2657 }
2658 
MaybeEmitServiceEvents(TracingSession * tracing_session,std::vector<TracePacket> * packets)2659 void TracingServiceImpl::MaybeEmitServiceEvents(
2660     TracingSession* tracing_session,
2661     std::vector<TracePacket>* packets) {
2662   int64_t all_start_ns = tracing_session->time_all_data_source_started.count();
2663   if (!tracing_session->did_emit_all_data_source_started && all_start_ns > 0) {
2664     tracing_session->did_emit_all_data_source_started = true;
2665     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2666     packet->set_timestamp(static_cast<uint64_t>(all_start_ns));
2667     packet->set_trusted_uid(static_cast<int32_t>(uid_));
2668     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2669     packet->set_service_event()->set_all_data_sources_started(true);
2670     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2671   }
2672 }
2673 
MaybeEmitReceivedTriggers(TracingSession * tracing_session,std::vector<TracePacket> * packets)2674 void TracingServiceImpl::MaybeEmitReceivedTriggers(
2675     TracingSession* tracing_session,
2676     std::vector<TracePacket>* packets) {
2677   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
2678                   tracing_session->received_triggers.size());
2679   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
2680        i < tracing_session->received_triggers.size(); ++i) {
2681     const auto& info = tracing_session->received_triggers[i];
2682     protozero::HeapBuffered<protos::pbzero::TracePacket> packet;
2683     auto* trigger = packet->set_trigger();
2684     trigger->set_trigger_name(info.trigger_name);
2685     trigger->set_producer_name(info.producer_name);
2686     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
2687 
2688     packet->set_timestamp(info.boot_time_ns);
2689     packet->set_trusted_uid(static_cast<int32_t>(uid_));
2690     packet->set_trusted_packet_sequence_id(kServicePacketSequenceID);
2691     SerializeAndAppendPacket(packets, packet.SerializeAsArray());
2692     ++tracing_session->num_triggers_emitted_into_trace;
2693   }
2694 }
2695 
2696 ////////////////////////////////////////////////////////////////////////////////
2697 // TracingServiceImpl::ConsumerEndpointImpl implementation
2698 ////////////////////////////////////////////////////////////////////////////////
2699 
ConsumerEndpointImpl(TracingServiceImpl * service,base::TaskRunner * task_runner,Consumer * consumer,uid_t uid)2700 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
2701     TracingServiceImpl* service,
2702     base::TaskRunner* task_runner,
2703     Consumer* consumer,
2704     uid_t uid)
2705     : task_runner_(task_runner),
2706       service_(service),
2707       consumer_(consumer),
2708       uid_(uid),
2709       weak_ptr_factory_(this) {}
2710 
~ConsumerEndpointImpl()2711 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
2712   service_->DisconnectConsumer(this);
2713   consumer_->OnDisconnect();
2714 }
2715 
NotifyOnTracingDisabled()2716 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
2717   PERFETTO_DCHECK_THREAD(thread_checker_);
2718   auto weak_this = GetWeakPtr();
2719   task_runner_->PostTask([weak_this] {
2720     if (weak_this)
2721       weak_this->consumer_->OnTracingDisabled();
2722   });
2723 }
2724 
EnableTracing(const TraceConfig & cfg,base::ScopedFile fd)2725 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
2726     const TraceConfig& cfg,
2727     base::ScopedFile fd) {
2728   PERFETTO_DCHECK_THREAD(thread_checker_);
2729   if (!service_->EnableTracing(this, cfg, std::move(fd)))
2730     NotifyOnTracingDisabled();
2731 }
2732 
ChangeTraceConfig(const TraceConfig & cfg)2733 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
2734     const TraceConfig& cfg) {
2735   if (!tracing_session_id_) {
2736     PERFETTO_LOG(
2737         "Consumer called ChangeTraceConfig() but tracing was "
2738         "not active");
2739     return;
2740   }
2741   service_->ChangeTraceConfig(this, cfg);
2742 }
2743 
StartTracing()2744 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
2745   PERFETTO_DCHECK_THREAD(thread_checker_);
2746   if (!tracing_session_id_) {
2747     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
2748     return;
2749   }
2750   service_->StartTracing(tracing_session_id_);
2751 }
2752 
DisableTracing()2753 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
2754   PERFETTO_DCHECK_THREAD(thread_checker_);
2755   if (!tracing_session_id_) {
2756     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
2757     return;
2758   }
2759   service_->DisableTracing(tracing_session_id_);
2760 }
2761 
ReadBuffers()2762 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
2763   PERFETTO_DCHECK_THREAD(thread_checker_);
2764   if (!tracing_session_id_) {
2765     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
2766     consumer_->OnTraceData({}, /* has_more = */ false);
2767     return;
2768   }
2769   if (!service_->ReadBuffers(tracing_session_id_, this)) {
2770     consumer_->OnTraceData({}, /* has_more = */ false);
2771   }
2772 }
2773 
FreeBuffers()2774 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
2775   PERFETTO_DCHECK_THREAD(thread_checker_);
2776   if (!tracing_session_id_) {
2777     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
2778     return;
2779   }
2780   service_->FreeBuffers(tracing_session_id_);
2781   tracing_session_id_ = 0;
2782 }
2783 
Flush(uint32_t timeout_ms,FlushCallback callback)2784 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
2785                                                      FlushCallback callback) {
2786   PERFETTO_DCHECK_THREAD(thread_checker_);
2787   if (!tracing_session_id_) {
2788     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
2789     return;
2790   }
2791   service_->Flush(tracing_session_id_, timeout_ms, callback);
2792 }
2793 
Detach(const std::string & key)2794 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
2795   PERFETTO_DCHECK_THREAD(thread_checker_);
2796   bool success = service_->DetachConsumer(this, key);
2797   auto weak_this = GetWeakPtr();
2798   task_runner_->PostTask([weak_this, success] {
2799     if (weak_this)
2800       weak_this->consumer_->OnDetach(success);
2801   });
2802 }
2803 
Attach(const std::string & key)2804 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
2805   PERFETTO_DCHECK_THREAD(thread_checker_);
2806   bool success = service_->AttachConsumer(this, key);
2807   auto weak_this = GetWeakPtr();
2808   task_runner_->PostTask([weak_this, success] {
2809     if (!weak_this)
2810       return;
2811     Consumer* consumer = weak_this->consumer_;
2812     TracingSession* session =
2813         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
2814     if (!session) {
2815       consumer->OnAttach(false, TraceConfig());
2816       return;
2817     }
2818     consumer->OnAttach(success, session->config);
2819   });
2820 }
2821 
GetTraceStats()2822 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
2823   PERFETTO_DCHECK_THREAD(thread_checker_);
2824   bool success = false;
2825   TraceStats stats;
2826   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2827   if (session) {
2828     success = true;
2829     stats = service_->GetTraceStats(session);
2830   }
2831   auto weak_this = GetWeakPtr();
2832   task_runner_->PostTask([weak_this, success, stats] {
2833     if (weak_this)
2834       weak_this->consumer_->OnTraceStats(success, stats);
2835   });
2836 }
2837 
ObserveEvents(uint32_t events_mask)2838 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
2839     uint32_t events_mask) {
2840   PERFETTO_DCHECK_THREAD(thread_checker_);
2841   observable_events_mask_ = events_mask;
2842   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
2843   if (!session)
2844     return;
2845 
2846   if (observable_events_mask_ & ObservableEvents::TYPE_DATA_SOURCES_INSTANCES) {
2847     // Issue initial states.
2848     for (const auto& kv : session->data_source_instances) {
2849       ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
2850       PERFETTO_DCHECK(producer);
2851       OnDataSourceInstanceStateChange(*producer, kv.second);
2852     }
2853   }
2854 
2855   // If the ObserveEvents() call happens after data sources have acked already
2856   // notify immediately.
2857   if (observable_events_mask_ &
2858       ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED) {
2859     service_->MaybeNotifyAllDataSourcesStarted(session);
2860   }
2861 }
2862 
OnDataSourceInstanceStateChange(const ProducerEndpointImpl & producer,const DataSourceInstance & instance)2863 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
2864     const ProducerEndpointImpl& producer,
2865     const DataSourceInstance& instance) {
2866   if (!(observable_events_mask_ &
2867         ObservableEvents::TYPE_DATA_SOURCES_INSTANCES)) {
2868     return;
2869   }
2870 
2871   if (instance.state != DataSourceInstance::CONFIGURED &&
2872       instance.state != DataSourceInstance::STARTED &&
2873       instance.state != DataSourceInstance::STOPPED) {
2874     return;
2875   }
2876 
2877   auto* observable_events = AddObservableEvents();
2878   auto* change = observable_events->add_instance_state_changes();
2879   change->set_producer_name(producer.name_);
2880   change->set_data_source_name(instance.data_source_name);
2881   if (instance.state == DataSourceInstance::STARTED) {
2882     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED);
2883   } else {
2884     change->set_state(ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STOPPED);
2885   }
2886 }
2887 
OnAllDataSourcesStarted()2888 void TracingServiceImpl::ConsumerEndpointImpl::OnAllDataSourcesStarted() {
2889   if (!(observable_events_mask_ &
2890         ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED)) {
2891     return;
2892   }
2893   auto* observable_events = AddObservableEvents();
2894   observable_events->set_all_data_sources_started(true);
2895 }
2896 
2897 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
GetWeakPtr()2898 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
2899   PERFETTO_DCHECK_THREAD(thread_checker_);
2900   return weak_ptr_factory_.GetWeakPtr();
2901 }
2902 
2903 ObservableEvents*
AddObservableEvents()2904 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
2905   PERFETTO_DCHECK_THREAD(thread_checker_);
2906   if (!observable_events_) {
2907     observable_events_.reset(new ObservableEvents());
2908     auto weak_this = GetWeakPtr();
2909     task_runner_->PostTask([weak_this] {
2910       if (!weak_this)
2911         return;
2912 
2913       // Move into a temporary to allow reentrancy in OnObservableEvents.
2914       auto observable_events = std::move(weak_this->observable_events_);
2915       weak_this->consumer_->OnObservableEvents(*observable_events);
2916     });
2917   }
2918   return observable_events_.get();
2919 }
2920 
QueryServiceState(QueryServiceStateCallback callback)2921 void TracingServiceImpl::ConsumerEndpointImpl::QueryServiceState(
2922     QueryServiceStateCallback callback) {
2923   PERFETTO_DCHECK_THREAD(thread_checker_);
2924   TracingServiceState svc_state;
2925 
2926   const auto& sessions = service_->tracing_sessions_;
2927   svc_state.set_num_sessions(static_cast<int>(sessions.size()));
2928 
2929   int num_started = 0;
2930   for (const auto& kv : sessions)
2931     num_started += kv.second.state == TracingSession::State::STARTED ? 1 : 0;
2932   svc_state.set_num_sessions_started(static_cast<int>(num_started));
2933 
2934   for (const auto& kv : service_->producers_) {
2935     auto* producer = svc_state.add_producers();
2936     producer->set_id(static_cast<int>(kv.first));
2937     producer->set_name(kv.second->name_);
2938     producer->set_uid(static_cast<int32_t>(producer->uid()));
2939   }
2940 
2941   for (const auto& kv : service_->data_sources_) {
2942     const auto& registered_data_source = kv.second;
2943     auto* data_source = svc_state.add_data_sources();
2944     *data_source->mutable_ds_descriptor() = registered_data_source.descriptor;
2945     data_source->set_producer_id(
2946         static_cast<int>(registered_data_source.producer_id));
2947   }
2948   callback(/*success=*/true, svc_state);
2949 }
2950 
QueryCapabilities(QueryCapabilitiesCallback callback)2951 void TracingServiceImpl::ConsumerEndpointImpl::QueryCapabilities(
2952     QueryCapabilitiesCallback callback) {
2953   PERFETTO_DCHECK_THREAD(thread_checker_);
2954   TracingServiceCapabilities caps;
2955   caps.set_has_query_capabilities(true);
2956   caps.set_has_trace_config_output_path(true);
2957   caps.add_observable_events(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
2958   caps.add_observable_events(ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
2959   static_assert(ObservableEvents::Type_MAX ==
2960                     ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED,
2961                 "");
2962   callback(caps);
2963 }
2964 
////////////////////////////////////////////////////////////////////////////////
// TracingServiceImpl::ProducerEndpointImpl implementation
////////////////////////////////////////////////////////////////////////////////
2968 
ProducerEndpointImpl(ProducerID id,uid_t uid,TracingServiceImpl * service,base::TaskRunner * task_runner,Producer * producer,const std::string & producer_name,bool in_process,bool smb_scraping_enabled)2969 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
2970     ProducerID id,
2971     uid_t uid,
2972     TracingServiceImpl* service,
2973     base::TaskRunner* task_runner,
2974     Producer* producer,
2975     const std::string& producer_name,
2976     bool in_process,
2977     bool smb_scraping_enabled)
2978     : id_(id),
2979       uid_(uid),
2980       service_(service),
2981       task_runner_(task_runner),
2982       producer_(producer),
2983       name_(producer_name),
2984       in_process_(in_process),
2985       smb_scraping_enabled_(smb_scraping_enabled),
2986       weak_ptr_factory_(this) {}
2987 
~ProducerEndpointImpl()2988 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
2989   service_->DisconnectProducer(id_);
2990   producer_->OnDisconnect();
2991 }
2992 
RegisterDataSource(const DataSourceDescriptor & desc)2993 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
2994     const DataSourceDescriptor& desc) {
2995   PERFETTO_DCHECK_THREAD(thread_checker_);
2996   if (desc.name().empty()) {
2997     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
2998     return;
2999   }
3000 
3001   service_->RegisterDataSource(id_, desc);
3002 }
3003 
UnregisterDataSource(const std::string & name)3004 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
3005     const std::string& name) {
3006   PERFETTO_DCHECK_THREAD(thread_checker_);
3007   service_->UnregisterDataSource(id_, name);
3008 }
3009 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)3010 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
3011     uint32_t writer_id,
3012     uint32_t target_buffer) {
3013   PERFETTO_DCHECK_THREAD(thread_checker_);
3014   writers_[static_cast<WriterID>(writer_id)] =
3015       static_cast<BufferID>(target_buffer);
3016 }
3017 
UnregisterTraceWriter(uint32_t writer_id)3018 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
3019     uint32_t writer_id) {
3020   PERFETTO_DCHECK_THREAD(thread_checker_);
3021   writers_.erase(static_cast<WriterID>(writer_id));
3022 }
3023 
CommitData(const CommitDataRequest & req_untrusted,CommitDataCallback callback)3024 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
3025     const CommitDataRequest& req_untrusted,
3026     CommitDataCallback callback) {
3027   PERFETTO_DCHECK_THREAD(thread_checker_);
3028 
3029   if (metatrace::IsEnabled(metatrace::TAG_TRACE_SERVICE)) {
3030     PERFETTO_METATRACE_COUNTER(TAG_TRACE_SERVICE, TRACE_SERVICE_COMMIT_DATA,
3031                                EncodeCommitDataRequest(id_, req_untrusted));
3032   }
3033 
3034   if (!shared_memory_) {
3035     PERFETTO_DLOG(
3036         "Attempted to commit data before the shared memory was allocated.");
3037     return;
3038   }
3039   PERFETTO_DCHECK(shmem_abi_.is_valid());
3040   for (const auto& entry : req_untrusted.chunks_to_move()) {
3041     const uint32_t page_idx = entry.page();
3042     if (page_idx >= shmem_abi_.num_pages())
3043       continue;  // A buggy or malicious producer.
3044 
3045     SharedMemoryABI::Chunk chunk =
3046         shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
3047     if (!chunk.is_valid()) {
3048       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
3049                     entry.page(), entry.chunk());
3050       continue;
3051     }
3052 
3053     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
3054     // the ABI contract expects the producer to not touch the chunk anymore
3055     // (until the service marks that as free). This is why all the reads below
3056     // are just memory_order_relaxed. Also, the code here assumes that all this
3057     // data can be malicious and just gives up if anything is malformed.
3058     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
3059     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
3060     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
3061     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
3062     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
3063     uint16_t num_fragments = packets.count;
3064     uint8_t chunk_flags = packets.flags;
3065 
3066     service_->CopyProducerPageIntoLogBuffer(
3067         id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
3068         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
3069 
3070     // This one has release-store semantics.
3071     shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
3072   }  // for(chunks_to_move)
3073 
3074   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
3075 
3076   if (req_untrusted.flush_request_id()) {
3077     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
3078   }
3079 
3080   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
3081   // callback being invoked within the same callstack and not posted. If this
3082   // changes, the code there needs to be changed accordingly.
3083   if (callback)
3084     callback();
3085 }
3086 
SetupSharedMemory(std::unique_ptr<SharedMemory> shared_memory,size_t page_size_bytes,bool provided_by_producer)3087 void TracingServiceImpl::ProducerEndpointImpl::SetupSharedMemory(
3088     std::unique_ptr<SharedMemory> shared_memory,
3089     size_t page_size_bytes,
3090     bool provided_by_producer) {
3091   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
3092   PERFETTO_DCHECK(page_size_bytes % 1024 == 0);
3093 
3094   shared_memory_ = std::move(shared_memory);
3095   shared_buffer_page_size_kb_ = page_size_bytes / 1024;
3096   is_shmem_provided_by_producer_ = provided_by_producer;
3097 
3098   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
3099                         shared_memory_->size(),
3100                         shared_buffer_page_size_kb() * 1024);
3101   if (in_process_) {
3102     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
3103         shared_memory_->start(), shared_memory_->size(),
3104         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
3105   }
3106 
3107   OnTracingSetup();
3108   service_->UpdateMemoryGuardrail();
3109 }
3110 
shared_memory() const3111 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
3112   PERFETTO_DCHECK_THREAD(thread_checker_);
3113   return shared_memory_.get();
3114 }
3115 
shared_buffer_page_size_kb() const3116 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
3117     const {
3118   return shared_buffer_page_size_kb_;
3119 }
3120 
ActivateTriggers(const std::vector<std::string> & triggers)3121 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
3122     const std::vector<std::string>& triggers) {
3123   service_->ActivateTriggers(id_, triggers);
3124 }
3125 
StopDataSource(DataSourceInstanceID ds_inst_id)3126 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
3127     DataSourceInstanceID ds_inst_id) {
3128   // TODO(primiano): When we'll support tearing down the SMB, at this point we
3129   // should send the Producer a TearDownTracing if all its data sources have
3130   // been disabled (see b/77532839 and aosp/655179 PS1).
3131   PERFETTO_DCHECK_THREAD(thread_checker_);
3132   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3133   task_runner_->PostTask([weak_this, ds_inst_id] {
3134     if (weak_this)
3135       weak_this->producer_->StopDataSource(ds_inst_id);
3136   });
3137 }
3138 
3139 SharedMemoryArbiter*
MaybeSharedMemoryArbiter()3140 TracingServiceImpl::ProducerEndpointImpl::MaybeSharedMemoryArbiter() {
3141   if (!inproc_shmem_arbiter_) {
3142     PERFETTO_FATAL(
3143         "The in-process SharedMemoryArbiter can only be used when "
3144         "CreateProducer has been called with in_process=true and after tracing "
3145         "has started.");
3146   }
3147 
3148   PERFETTO_DCHECK(in_process_);
3149   return inproc_shmem_arbiter_.get();
3150 }
3151 
IsShmemProvidedByProducer() const3152 bool TracingServiceImpl::ProducerEndpointImpl::IsShmemProvidedByProducer()
3153     const {
3154   return is_shmem_provided_by_producer_;
3155 }
3156 
3157 // Can be called on any thread.
3158 std::unique_ptr<TraceWriter>
CreateTraceWriter(BufferID buf_id,BufferExhaustedPolicy buffer_exhausted_policy)3159 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(
3160     BufferID buf_id,
3161     BufferExhaustedPolicy buffer_exhausted_policy) {
3162   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
3163   return MaybeSharedMemoryArbiter()->CreateTraceWriter(buf_id,
3164                                                        buffer_exhausted_policy);
3165 }
3166 
NotifyFlushComplete(FlushRequestID id)3167 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
3168     FlushRequestID id) {
3169   PERFETTO_DCHECK_THREAD(thread_checker_);
3170   PERFETTO_DCHECK(MaybeSharedMemoryArbiter());
3171   return MaybeSharedMemoryArbiter()->NotifyFlushComplete(id);
3172 }
3173 
OnTracingSetup()3174 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
3175   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3176   task_runner_->PostTask([weak_this] {
3177     if (weak_this)
3178       weak_this->producer_->OnTracingSetup();
3179   });
3180 }
3181 
Flush(FlushRequestID flush_request_id,const std::vector<DataSourceInstanceID> & data_sources)3182 void TracingServiceImpl::ProducerEndpointImpl::Flush(
3183     FlushRequestID flush_request_id,
3184     const std::vector<DataSourceInstanceID>& data_sources) {
3185   PERFETTO_DCHECK_THREAD(thread_checker_);
3186   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3187   task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
3188     if (weak_this) {
3189       weak_this->producer_->Flush(flush_request_id, data_sources.data(),
3190                                   data_sources.size());
3191     }
3192   });
3193 }
3194 
SetupDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)3195 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
3196     DataSourceInstanceID ds_id,
3197     const DataSourceConfig& config) {
3198   PERFETTO_DCHECK_THREAD(thread_checker_);
3199   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
3200   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3201   task_runner_->PostTask([weak_this, ds_id, config] {
3202     if (weak_this)
3203       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
3204   });
3205 }
3206 
StartDataSource(DataSourceInstanceID ds_id,const DataSourceConfig & config)3207 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
3208     DataSourceInstanceID ds_id,
3209     const DataSourceConfig& config) {
3210   PERFETTO_DCHECK_THREAD(thread_checker_);
3211   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3212   task_runner_->PostTask([weak_this, ds_id, config] {
3213     if (weak_this)
3214       weak_this->producer_->StartDataSource(ds_id, std::move(config));
3215   });
3216 }
3217 
NotifyDataSourceStarted(DataSourceInstanceID data_source_id)3218 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
3219     DataSourceInstanceID data_source_id) {
3220   PERFETTO_DCHECK_THREAD(thread_checker_);
3221   service_->NotifyDataSourceStarted(id_, data_source_id);
3222 }
3223 
NotifyDataSourceStopped(DataSourceInstanceID data_source_id)3224 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
3225     DataSourceInstanceID data_source_id) {
3226   PERFETTO_DCHECK_THREAD(thread_checker_);
3227   service_->NotifyDataSourceStopped(id_, data_source_id);
3228 }
3229 
OnFreeBuffers(const std::vector<BufferID> & target_buffers)3230 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
3231     const std::vector<BufferID>& target_buffers) {
3232   if (allowed_target_buffers_.empty())
3233     return;
3234   for (BufferID buffer : target_buffers)
3235     allowed_target_buffers_.erase(buffer);
3236 }
3237 
ClearIncrementalState(const std::vector<DataSourceInstanceID> & data_sources)3238 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
3239     const std::vector<DataSourceInstanceID>& data_sources) {
3240   PERFETTO_DCHECK_THREAD(thread_checker_);
3241   auto weak_this = weak_ptr_factory_.GetWeakPtr();
3242   task_runner_->PostTask([weak_this, data_sources] {
3243     if (weak_this) {
3244       weak_this->producer_->ClearIncrementalState(data_sources.data(),
3245                                                   data_sources.size());
3246     }
3247   });
3248 }
3249 
Sync(std::function<void ()> callback)3250 void TracingServiceImpl::ProducerEndpointImpl::Sync(
3251     std::function<void()> callback) {
3252   task_runner_->PostTask(callback);
3253 }
3254 
////////////////////////////////////////////////////////////////////////////////
// TracingServiceImpl::TracingSession implementation
////////////////////////////////////////////////////////////////////////////////
3258 
TracingSession(TracingSessionID session_id,ConsumerEndpointImpl * consumer,const TraceConfig & new_config)3259 TracingServiceImpl::TracingSession::TracingSession(
3260     TracingSessionID session_id,
3261     ConsumerEndpointImpl* consumer,
3262     const TraceConfig& new_config)
3263     : id(session_id),
3264       consumer_maybe_null(consumer),
3265       consumer_uid(consumer->uid_),
3266       config(new_config) {}
3267 
3268 }  // namespace perfetto
3269