1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/shared_memory_arbiter_impl.h"
18 
19 #include <algorithm>
20 #include <limits>
21 #include <utility>
22 
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/base/time.h"
26 #include "perfetto/ext/tracing/core/commit_data_request.h"
27 #include "perfetto/ext/tracing/core/shared_memory.h"
28 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
29 #include "src/tracing/core/null_trace_writer.h"
30 #include "src/tracing/core/trace_writer_impl.h"
31 
32 namespace perfetto {
33 
34 using Chunk = SharedMemoryABI::Chunk;
35 
36 namespace {
37 static_assert(sizeof(BufferID) == sizeof(uint16_t),
38               "The MaybeUnboundBufferID logic requires BufferID not to grow "
39               "above uint16_t.");
40 
41 MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
42   // Reservation IDs are stored in the upper bits.
43   PERFETTO_CHECK(reservation_id > 0);
44   return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
45 }
46 
47 bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
48   return (buffer_id >> 16) > 0;
49 }
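// For example, reservation ID 3 yields the placeholder target buffer
// 0x00030000: IsReservationTargetBufferId() returns true for it, while any
// real BufferID fits entirely in the lower 16 bits and never looks like a
// reservation.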
50 }  // namespace
51 
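// kPageDiv1 partitions each SMB page into a single chunk spanning the whole
// page payload.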
52 // static
53 SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
54     SharedMemoryABI::PageLayout::kPageDiv1;
55 
56 // static
57 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
58     SharedMemory* shared_memory,
59     size_t page_size,
60     ShmemMode mode,
61     TracingService::ProducerEndpoint* producer_endpoint,
62     base::TaskRunner* task_runner) {
63   return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
64       shared_memory->start(), shared_memory->size(), mode, page_size,
65       producer_endpoint, task_runner));
66 }
67 
68 // static
69 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
70     SharedMemory* shared_memory,
71     size_t page_size,
72     ShmemMode mode) {
73   return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
74       shared_memory->start(), shared_memory->size(), mode, page_size,
75       /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
76 }
77 
78 SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
79     void* start,
80     size_t size,
81     ShmemMode mode,
82     size_t page_size,
83     TracingService::ProducerEndpoint* producer_endpoint,
84     base::TaskRunner* task_runner)
85     : producer_endpoint_(producer_endpoint),
86       use_shmem_emulation_(mode == ShmemMode::kShmemEmulation),
87       task_runner_(task_runner),
88       shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size, mode),
89       active_writer_ids_(kMaxWriterID),
90       fully_bound_(task_runner && producer_endpoint),
91       was_always_bound_(fully_bound_),
92       weak_ptr_factory_(this) {}
93 
94 Chunk SharedMemoryArbiterImpl::GetNewChunk(
95     const SharedMemoryABI::ChunkHeader& header,
96     BufferExhaustedPolicy buffer_exhausted_policy) {
97   int stall_count = 0;
98   unsigned stall_interval_us = 0;
99   bool task_runner_runs_on_current_thread = false;
100   static const unsigned kMaxStallIntervalUs = 100000;
101   static const int kLogAfterNStalls = 3;
102   static const int kFlushCommitsAfterEveryNStalls = 2;
103   static const int kAssertAtNStalls = 200;
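  // Taken together, these constants implement an exponential backoff:
  // successive sleeps grow as 0us, 8us, 72us, 584us, ... capped at
  // kMaxStallIntervalUs (100ms); a warning is logged once the stall count
  // reaches kLogAfterNStalls, pending commits are flushed every
  // kFlushCommitsAfterEveryNStalls stalls when running on the IPC thread,
  // and hitting kAssertAtNStalls aborts with a suspected-deadlock error.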
104 
105   for (;;) {
106     // TODO(primiano): Probably this lock is not really required and this code
107     // could be rewritten leveraging only the Try* atomic operations in
108     // SharedMemoryABI. But let's not be too adventurous for the moment.
109     {
110       std::unique_lock<std::mutex> scoped_lock(lock_);
111 
112       // If ever unbound, we do not support stalling. In theory, we could
113       // support stalling for TraceWriters created after the arbiter and startup
114       // buffer reservations were bound, but to avoid raciness between the
115       // creation of startup writers and binding, we categorically forbid kStall
116       // mode.
117       PERFETTO_DCHECK(was_always_bound_ ||
118                       buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);
119 
120       task_runner_runs_on_current_thread =
121           task_runner_ && task_runner_->RunsTasksOnCurrentThread();
122 
123       // If more than half of the SMB is filled with completed chunks for
124       // which we haven't notified the service yet (i.e. they are still enqueued
125       // in |commit_data_req_|), force a synchronous CommitDataRequest() even if
126       // we acquire a chunk, to reduce the likelihood of stalling the writer.
127       //
128       // We can only do this if we're writing on the same thread that we access
129       // the producer endpoint on, since we cannot notify the producer endpoint
130       // to commit synchronously on a different thread. Attempting to flush
131       // synchronously on another thread will lead to subtle bugs caused by
132       // out-of-order commit requests (crbug.com/919187#c28).
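      // E.g. with a hypothetical 256 KiB SMB, a writer using kStall on the
      // task runner's thread forces a synchronous commit here once >= 128 KiB
      // of completed-but-unnotified chunks are pending.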
133       bool should_commit_synchronously =
134           task_runner_runs_on_current_thread &&
135           buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
136           commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;
137 
138       const size_t initial_page_idx = page_idx_;
139       for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
140         page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
141         bool is_new_page = false;
142 
143         // TODO(primiano): make the page layout dynamic.
144         auto layout = SharedMemoryArbiterImpl::default_page_layout;
145 
146         if (shmem_abi_.is_page_free(page_idx_)) {
147           is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
148         }
149         uint32_t free_chunks;
150         if (is_new_page) {
151           free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
152         } else {
153           free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
154         }
155 
156         for (uint32_t chunk_idx = 0; free_chunks;
157              chunk_idx++, free_chunks >>= 1) {
158           if (!(free_chunks & 1))
159             continue;
160           // We found a free chunk.
161           Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
162               page_idx_, chunk_idx, &header);
163           if (!chunk.is_valid())
164             continue;
165           if (stall_count > kLogAfterNStalls) {
166             PERFETTO_LOG("Recovered from stall after %d iterations",
167                          stall_count);
168           }
169 
170           if (should_commit_synchronously) {
171             // We can't flush while holding the lock.
172             scoped_lock.unlock();
173             FlushPendingCommitDataRequests();
174             return chunk;
175           } else {
176             return chunk;
177           }
178         }
179       }
180     }  // scoped_lock
181 
182     if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
183       PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
184       return Chunk();
185     }
186 
187     // Stalling is not supported if we were ever unbound (see earlier comment).
188     PERFETTO_CHECK(was_always_bound_);
189 
190     // All chunks are taken (either kBeingWritten by us or kBeingRead by the
191     // Service).
192     if (stall_count++ == kLogAfterNStalls) {
193       PERFETTO_LOG("Shared memory buffer overrun! Stalling");
194     }
195 
196     if (stall_count == kAssertAtNStalls) {
197       Stats stats = GetStats();
198       PERFETTO_FATAL(
199           "Shared memory buffer max stall count exceeded; possible deadlock "
200           "free=%zu bw=%zu br=%zu comp=%zu pages_free=%zu pages_err=%zu",
201           stats.chunks_free, stats.chunks_being_written,
202           stats.chunks_being_read, stats.chunks_complete, stats.pages_free,
203           stats.pages_unexpected);
204     }
205 
206     // If the IPC thread itself is stalled because the current process has
207     // filled up the SMB, we need to make sure that the service can process and
208     // purge the chunks written by our process, by flushing any pending commit
209     // requests. Because other threads in our process can continue to
210     // concurrently grab, fill and commit any chunks purged by the service, it
211     // is possible that the SMB remains full and the IPC thread remains stalled,
212     // needing to flush the concurrently queued up commits again. This is
213     // particularly likely with in-process perfetto service where the IPC thread
214     // is the service thread. To avoid remaining stalled forever in such a
215     // situation, we attempt to flush periodically after every N stalls.
216     if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
217         task_runner_runs_on_current_thread) {
218       // TODO(primiano): sending the IPC synchronously is a temporary workaround
219       // until the backpressure logic in probes_producer is sorted out. Until
220       // then the risk is that we stall the message loop waiting for the tracing
221       // service to consume the shared memory buffer (SMB) and, for this reason,
222       // never run the task that tells the service to purge the SMB. This must
223       // happen iff we are on the IPC thread: not doing this will cause
224       // deadlocks, while doing it on the wrong thread causes out-of-order
225       // data commits (crbug.com/919187#c28).
226       FlushPendingCommitDataRequests();
227     } else {
228       base::SleepMicroseconds(stall_interval_us);
229       stall_interval_us =
230           std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
231     }
232   }
233 }
234 
235 void SharedMemoryArbiterImpl::ReturnCompletedChunk(
236     Chunk chunk,
237     MaybeUnboundBufferID target_buffer,
238     PatchList* patch_list) {
239   PERFETTO_DCHECK(chunk.is_valid());
240   const WriterID writer_id = chunk.writer_id();
241   UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
242                           patch_list);
243 }
244 
245 void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
246                                           MaybeUnboundBufferID target_buffer,
247                                           PatchList* patch_list) {
248   PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
249   UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
250 }
251 
252 void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
253     Chunk chunk,
254     WriterID writer_id,
255     MaybeUnboundBufferID target_buffer,
256     PatchList* patch_list) {
257   // Note: chunk will be invalid if the call came from SendPatches().
258   base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
259   // The delay with which the flush will be posted.
260   uint32_t flush_delay_ms = 0;
261   base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
262   {
263     std::unique_lock<std::mutex> scoped_lock(lock_);
264 
265     if (!commit_data_req_) {
266       commit_data_req_.reset(new CommitDataRequest());
267 
268       // Flushing the commit is only supported while we're |fully_bound_|. If we
269       // aren't, we'll flush when |fully_bound_| is updated.
270       if (fully_bound_ && !delayed_flush_scheduled_) {
271         weak_this = weak_ptr_factory_.GetWeakPtr();
272         task_runner_to_post_delayed_callback_on = task_runner_;
273         flush_delay_ms = batch_commits_duration_ms_;
274         delayed_flush_scheduled_ = true;
275       }
276     }
277 
278     CommitDataRequest::ChunksToMove* ctm = nullptr;  // Set if chunk is valid.
279     // If a valid chunk is specified, return it and attach it to the request.
280     if (chunk.is_valid()) {
281       PERFETTO_DCHECK(chunk.writer_id() == writer_id);
282       uint8_t chunk_idx = chunk.chunk_idx();
283       bytes_pending_commit_ += chunk.size();
284       size_t page_idx;
285 
286       ctm = commit_data_req_->add_chunks_to_move();
287       // If the chunk needs patching, it should not be marked as complete yet,
288       // because this would indicate to the service that the producer will not
289       // be writing to it anymore, while the producer might still apply patches
290       // to the chunk later on. In particular, when re-reading (e.g. because of
291       // periodic scraping) a completed chunk, the service expects the flags of
292       // that chunk not to be removed between reads. So, let's say the producer
293       // marked the chunk as complete here and the service then read it for the
294       // first time. If the producer then fully patched the chunk, thus removing
295       // the kChunkNeedsPatching flag, and the service re-read the chunk after
296       // the patching, the service would be thrown off by the removed flag.
297       if (direct_patching_enabled_ &&
298           (chunk.GetPacketCountAndFlags().second &
299            SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
300         page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
301       } else {
302         // If the chunk doesn't need patching, we can mark it as complete
303         // immediately. This allows the service to read it in full while
304         // scraping, which would not be the case if the chunk was left in a
305         // kChunkBeingWritten state.
306         page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
307       }
308 
309       // DO NOT access |chunk| after this point, it has been std::move()-d
310       // above.
311       ctm->set_page(static_cast<uint32_t>(page_idx));
312       ctm->set_chunk(chunk_idx);
313       ctm->set_target_buffer(target_buffer);
314     }
315 
316     // Process the completed patches for previous chunks from the |patch_list|.
317     CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
318     while (!patch_list->empty() && patch_list->front().is_patched()) {
319       Patch curr_patch = patch_list->front();
320       patch_list->pop_front();
321       // Patches for the same chunk are contiguous in the |patch_list|. So, to
322       // determine if there are any other patches that apply to the chunk that
323       // is being patched, check if the next patch in the |patch_list| applies
324       // to the same chunk.
325       bool chunk_needs_more_patching =
326           !patch_list->empty() &&
327           patch_list->front().chunk_id == curr_patch.chunk_id;
328 
329       if (direct_patching_enabled_ &&
330           TryDirectPatchLocked(writer_id, curr_patch,
331                                chunk_needs_more_patching)) {
332         continue;
333       }
334 
335       // The chunk that this patch applies to has already been released to the
336       // service, so it cannot be patched here. Add the patch to the commit data
337       // request, so that it can be sent to the service and applied there.
338       if (!last_patch_req ||
339           last_patch_req->chunk_id() != curr_patch.chunk_id) {
340         last_patch_req = commit_data_req_->add_chunks_to_patch();
341         last_patch_req->set_writer_id(writer_id);
342         last_patch_req->set_chunk_id(curr_patch.chunk_id);
343         last_patch_req->set_target_buffer(target_buffer);
344       }
345       auto* patch = last_patch_req->add_patches();
346       patch->set_offset(curr_patch.offset);
347       patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
348     }
349 
350     // Patches are enqueued in the |patch_list| in order and are notified to
351     // the service when the chunk is returned. The only case when the current
352     // patch list is incomplete is if there is an unpatched entry at the head of
353     // the |patch_list| that belongs to the same ChunkID as the last one we are
354     // about to send to the service.
355     if (last_patch_req && !patch_list->empty() &&
356         patch_list->front().chunk_id == last_patch_req->chunk_id()) {
357       last_patch_req->set_has_more_patches(true);
358     }
359 
360     // If the buffer is filling up or if we are given a patch for a chunk
361     // that was already sent to the service, we don't want to wait for the next
362     // delayed flush to happen and we flush immediately. Otherwise, if we
363     // accumulate the patch and a crash occurs before the patch is sent, the
364     // service will not know of the patch and won't be able to reconstruct the
365     // trace.
366     if (fully_bound_ &&
367         (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
368       weak_this = weak_ptr_factory_.GetWeakPtr();
369       task_runner_to_post_delayed_callback_on = task_runner_;
370       flush_delay_ms = 0;
371     }
372 
373     // When using shmem emulation, we commit the completed chunks immediately
374     // to prevent |bytes_pending_commit_| from growing larger than the size
375     // of the IPC buffer, since the chunk's data must be passed in the commit
376     // data request proto through the network socket. Not doing so could
377     // result in an "IPC Frame too large" issue on the host traced side.
378     if (fully_bound_ && use_shmem_emulation_) {
379       if (task_runner_->RunsTasksOnCurrentThread()) {
380         task_runner_to_post_delayed_callback_on = nullptr;
381         // Allow next call to UpdateCommitDataRequest to start
382         // another batching period.
383         delayed_flush_scheduled_ = false;
384         // We can't flush while holding the lock
385         scoped_lock.unlock();
386         FlushPendingCommitDataRequests();
387       } else {
388         // Since we aren't on the |task_runner_| thread, post a task instead,
389         // so that commit data request flushes don't overlap.
390         weak_this = weak_ptr_factory_.GetWeakPtr();
391         task_runner_to_post_delayed_callback_on = task_runner_;
392         flush_delay_ms = 0;
393       }
394     }
395   }  // scoped_lock(lock_)
396 
397   // We shouldn't post tasks while locked.
398   // |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
399   // because |task_runner_| is never reset.
400   if (task_runner_to_post_delayed_callback_on) {
401     task_runner_to_post_delayed_callback_on->PostDelayedTask(
402         [weak_this] {
403           if (!weak_this)
404             return;
405           {
406             std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
407             // Clear |delayed_flush_scheduled_|, allowing the next call to
408             // UpdateCommitDataRequest to start another batching period.
409             weak_this->delayed_flush_scheduled_ = false;
410           }
411           weak_this->FlushPendingCommitDataRequests();
412         },
413         flush_delay_ms);
414   }
415 }
416 
417 bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
418     WriterID writer_id,
419     const Patch& patch,
420     bool chunk_needs_more_patching) {
421   // Search the chunks that are being batched in |commit_data_req_| for a chunk
422   // that needs patching and that matches the provided |writer_id| and
423   // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
424   // |commit_data_req_| is appended to at the end with newly-returned chunks,
425   // and patches are more likely to apply to chunks that have been returned
426   // recently.
427   SharedMemoryABI::Chunk chunk;
428   bool chunk_found = false;
429   auto& chunks_to_move = commit_data_req_->chunks_to_move();
430   for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
431        ++ctm_it) {
432     uint32_t header_bitmap = shmem_abi_.GetPageHeaderBitmap(ctm_it->page());
433     auto chunk_state = shmem_abi_.GetChunkStateFromHeaderBitmap(
434         header_bitmap, ctm_it->chunk());
435     // Note: the subset of |commit_data_req_| chunks that still need patching is
436     // also the subset of chunks that are still being written to. The rest of
437     // the chunks in |commit_data_req_| do not need patching and have already
438     // been marked as complete.
439     if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
440       continue;
441 
442     chunk = shmem_abi_.GetChunkUnchecked(ctm_it->page(), header_bitmap,
443                                          ctm_it->chunk());
444     if (chunk.writer_id() == writer_id &&
445         chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
446             patch.chunk_id) {
447       chunk_found = true;
448       break;
449     }
450   }
451 
452   if (!chunk_found) {
453     // The chunk has already been committed to the service and the patch cannot
454     // be applied in the producer.
455     return false;
456   }
457 
458   // Apply the patch.
459   size_t page_idx;
460   uint8_t chunk_idx;
461   std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
462   PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
463                   SharedMemoryABI::ChunkState::kChunkBeingWritten);
464   auto chunk_begin = chunk.payload_begin();
465   uint8_t* ptr = chunk_begin + patch.offset;
466   PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
467   // DCHECK that we are writing into a zero-filled size field and not into
468   // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
469   // zero-fill reservations in debug builds.
470   const char zero[SharedMemoryABI::kPacketHeaderSize]{};
471   PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);
472 
473   memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);
474 
475   if (!chunk_needs_more_patching) {
476     // Mark that the chunk doesn't need more patching and mark it as complete,
477     // as the producer will not write to it anymore. This allows the service to
478     // read the chunk in full while scraping, which would not be the case if the
479     // chunk was left in a kChunkBeingWritten state.
480     chunk.ClearNeedsPatchingFlag();
481     shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
482   }
483 
484   return true;
485 }
486 
487 void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
488     uint32_t batch_commits_duration_ms) {
489   std::lock_guard<std::mutex> scoped_lock(lock_);
490   batch_commits_duration_ms_ = batch_commits_duration_ms;
491 }
492 
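// Direct SMB patching can only be enabled after the service has advertised
// support for it via SetDirectSMBPatchingSupportedByService(); until then
// this returns false and all patches keep flowing through CommitDataRequest.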
493 bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
494   std::lock_guard<std::mutex> scoped_lock(lock_);
495   if (!direct_patching_supported_by_service_) {
496     return false;
497   }
498 
499   return direct_patching_enabled_ = true;
500 }
501 
502 void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
503   std::lock_guard<std::mutex> scoped_lock(lock_);
504   direct_patching_supported_by_service_ = true;
505 }
506 
507 // This function is quite subtle. When making changes keep in mind these two
508 // challenges:
509 // 1) If the producer stalls and we happen to be on the |task_runner_| IPC
510 //    thread (or, for in-process cases, on the same thread where
511 //    TracingServiceImpl lives), the CommitData() call must be synchronous and
512 //    not posted, to avoid deadlocks.
513 // 2) When different threads hit this function, we must guarantee that we don't
514 //    accidentally make commits out of order. See commit 4e4fe8f56ef and
515 //    crbug.com/919187 for more context.
516 void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
517     std::function<void()> callback) {
518   std::unique_ptr<CommitDataRequest> req;
519   {
520     std::unique_lock<std::mutex> scoped_lock(lock_);
521 
522     // Flushing is only supported while |fully_bound_|, and there may still be
523     // unbound startup trace writers. If so, skip the commit for now - it'll be
524     // done when |fully_bound_| is updated.
525     if (!fully_bound_) {
526       if (callback)
527         pending_flush_callbacks_.push_back(callback);
528       return;
529     }
530 
531     // May be called by TraceWriterImpl on any thread.
532     base::TaskRunner* task_runner = task_runner_;
533     if (!task_runner->RunsTasksOnCurrentThread()) {
534       // We shouldn't post a task while holding a lock. |task_runner| remains
535       // valid after unlocking, because |task_runner_| is never reset.
536       scoped_lock.unlock();
537 
538       auto weak_this = weak_ptr_factory_.GetWeakPtr();
539       task_runner->PostTask([weak_this, callback] {
540         if (weak_this)
541           weak_this->FlushPendingCommitDataRequests(std::move(callback));
542       });
543       return;
544     }
545 
546     // |commit_data_req_| could have become a nullptr, for example when a forced
547     // sync flush happens in GetNewChunk().
548     if (commit_data_req_) {
549       // Make sure any placeholder buffer IDs from StartupWriters are replaced
550       // before sending the request.
551       bool all_placeholders_replaced =
552           ReplaceCommitPlaceholderBufferIdsLocked();
553       // We're |fully_bound_|, thus all writers are bound and all placeholders
554       // should have been replaced.
555       PERFETTO_DCHECK(all_placeholders_replaced);
556 
557       // In order to allow patching in the producer we delay the kChunkComplete
558       // transition and keep batched chunks in the kChunkBeingWritten state.
559       // Since we are about to notify the service of all batched chunks, it will
560       // not be possible to apply any more patches to them and we need to move
561       // them to kChunkComplete - otherwise the service won't look at them.
562       for (auto& ctm : *commit_data_req_->mutable_chunks_to_move()) {
563         uint32_t header_bitmap = shmem_abi_.GetPageHeaderBitmap(ctm.page());
564         auto chunk_state = shmem_abi_.GetChunkStateFromHeaderBitmap(
565             header_bitmap, ctm.chunk());
566         // Note: the subset of |commit_data_req_| chunks that still need
567         // patching is also the subset of chunks that are still being written
568         // to. The rest of the chunks in |commit_data_req_| do not need patching
569         // and have already been marked as complete.
570         if (chunk_state == SharedMemoryABI::kChunkBeingWritten) {
571           auto chunk = shmem_abi_.GetChunkUnchecked(ctm.page(), header_bitmap,
572                                                     ctm.chunk());
573           shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
574         }
575 
576         if (use_shmem_emulation_) {
577           // When running in shmem emulation mode:
578           // 1. serialize the chunk data to |ctm| as we won't modify the chunk
579           // anymore.
580           // 2. free the chunk as the service won't be able to do this.
581           auto chunk = shmem_abi_.GetChunkUnchecked(ctm.page(), header_bitmap,
582                                                     ctm.chunk());
583           PERFETTO_CHECK(chunk.is_valid());
584           ctm.set_data(chunk.begin(), chunk.size());
585           shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
586         }
587       }
588 
589       req = std::move(commit_data_req_);
590       bytes_pending_commit_ = 0;
591     }
592   }  // scoped_lock
593 
594   if (req) {
595     producer_endpoint_->CommitData(*req, callback);
596   } else if (callback) {
597     // If |req| was nullptr, it means that an enqueued deferred commit was
598     // executed just before this. At this point send an empty commit request
599     // to the service, just to linearize with it and give the guarantee to the
600     // caller that the data has been flushed into the service.
601     producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
602   }
603 }
604 
605 bool SharedMemoryArbiterImpl::TryShutdown() {
606   std::lock_guard<std::mutex> scoped_lock(lock_);
607   did_shutdown_ = true;
608   // Shutdown is safe if there are no active trace writers for this arbiter.
609   return active_writer_ids_.IsEmpty();
610 }
611 
612 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
613     BufferID target_buffer,
614     BufferExhaustedPolicy buffer_exhausted_policy) {
615   PERFETTO_CHECK(target_buffer > 0);
616   return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
617 }
618 
619 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
620     uint16_t target_buffer_reservation_id) {
621   return CreateTraceWriterInternal(
622       MakeTargetBufferIdForReservation(target_buffer_reservation_id),
623       BufferExhaustedPolicy::kDrop);
624 }
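// Illustrative startup-tracing flow (a sketch; |shm|, |mode|, |endpoint|,
// |task_runner| and |real_buffer_id| are assumed to exist in the caller):
//
//   auto arbiter = SharedMemoryArbiter::CreateUnboundInstance(
//       shm.get(), /*page_size=*/4096, mode);
//   auto writer =
//       arbiter->CreateStartupTraceWriter(/*target_buffer_reservation_id=*/1);
//   // ... packets are buffered in the SMB against the reservation ...
//   arbiter->BindToProducerEndpoint(endpoint, task_runner);
//   arbiter->BindStartupTargetBuffer(/*target_buffer_reservation_id=*/1,
//                                    real_buffer_id);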
625 
626 void SharedMemoryArbiterImpl::BindToProducerEndpoint(
627     TracingService::ProducerEndpoint* producer_endpoint,
628     base::TaskRunner* task_runner) {
629   PERFETTO_DCHECK(producer_endpoint && task_runner);
630   PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());
631 
632   bool should_flush = false;
633   std::function<void()> flush_callback;
634   {
635     std::lock_guard<std::mutex> scoped_lock(lock_);
636     PERFETTO_CHECK(!fully_bound_);
637     PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);
638 
639     producer_endpoint_ = producer_endpoint;
640     task_runner_ = task_runner;
641 
642     // Now that we're bound to a task runner, also reset the WeakPtrFactory to
643     // it. Because this code runs on the task runner, the factory's weak
644     // pointers will be valid on it.
645     weak_ptr_factory_.Reset(this);
646 
647     // All writers registered so far should be startup trace writers, since
648     // the producer cannot feasibly know the target buffer for any future
649     // session yet.
650     for (const auto& entry : pending_writers_) {
651       PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
652     }
653 
654     // If all buffer reservations are bound, we can flush pending commits.
655     if (UpdateFullyBoundLocked()) {
656       should_flush = true;
657       flush_callback = TakePendingFlushCallbacksLocked();
658     }
659   }  // scoped_lock
660 
661   // Attempt to flush any pending commits (and run pending flush callbacks). If
662   // there are none, this will have no effect. If we ended up in a race that
663   // changed |fully_bound_| back to false, the commit will happen once we become
664   // |fully_bound_| again.
665   if (should_flush)
666     FlushPendingCommitDataRequests(flush_callback);
667 }
668 
669 void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
670     uint16_t target_buffer_reservation_id,
671     BufferID target_buffer_id) {
672   PERFETTO_DCHECK(target_buffer_id > 0);
673 
674   std::unique_lock<std::mutex> scoped_lock(lock_);
675 
676   // We should already be bound to an endpoint.
677   PERFETTO_CHECK(producer_endpoint_);
678   PERFETTO_CHECK(task_runner_);
679   PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());
680 
681   BindStartupTargetBufferImpl(std::move(scoped_lock),
682                               target_buffer_reservation_id, target_buffer_id);
683 }
684 
685 void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
686     uint16_t target_buffer_reservation_id) {
687   std::unique_lock<std::mutex> scoped_lock(lock_);
688 
689   // If we are already bound to an arbiter, we may need to flush after aborting
690   // the session, and thus should be running on the arbiter's task runner.
691   if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
692     // We shouldn't post tasks while locked.
693     auto* task_runner = task_runner_;
694     scoped_lock.unlock();
695 
696     auto weak_this = weak_ptr_factory_.GetWeakPtr();
697     task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
698       if (!weak_this)
699         return;
700       weak_this->AbortStartupTracingForReservation(
701           target_buffer_reservation_id);
702     });
703     return;
704   }
705 
706   // Bind the target buffer reservation to an invalid buffer (ID 0), so that
707   // existing commits, as well as future commits (of currently acquired chunks),
708   // will be released as free by the service but otherwise ignored (i.e.
709   // not copied into any valid target buffer).
710   BindStartupTargetBufferImpl(std::move(scoped_lock),
711                               target_buffer_reservation_id,
712                               /*target_buffer_id=*/kInvalidBufferId);
713 }
714 
715 void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
716     std::unique_lock<std::mutex> scoped_lock,
717     uint16_t target_buffer_reservation_id,
718     BufferID target_buffer_id) {
719   // We should already be bound to an endpoint if the target buffer is valid.
720   PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
721                   target_buffer_id == kInvalidBufferId);
722 
723   PERFETTO_DLOG("Binding startup target buffer reservation %" PRIu16
724                 " to buffer %" PRIu16,
725                 target_buffer_reservation_id, target_buffer_id);
726 
727   MaybeUnboundBufferID reserved_id =
728       MakeTargetBufferIdForReservation(target_buffer_reservation_id);
729 
730   bool should_flush = false;
731   std::function<void()> flush_callback;
732   std::vector<std::pair<WriterID, BufferID>> writers_to_register;
733 
734   TargetBufferReservation& reservation =
735       target_buffer_reservations_[reserved_id];
736   PERFETTO_CHECK(!reservation.resolved);
737   reservation.resolved = true;
738   reservation.target_buffer = target_buffer_id;
739 
740   // Collect trace writers associated with the reservation.
741   for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
742     if (it->second == reserved_id) {
743       // No need to register writers that have an invalid target buffer.
744       if (target_buffer_id != kInvalidBufferId) {
745         writers_to_register.push_back(
746             std::make_pair(it->first, target_buffer_id));
747       }
748       it = pending_writers_.erase(it);
749     } else {
750       it++;
751     }
752   }
753 
754   // If all buffer reservations are bound, we can flush pending commits.
755   if (UpdateFullyBoundLocked()) {
756     should_flush = true;
757     flush_callback = TakePendingFlushCallbacksLocked();
758   }
759 
760   scoped_lock.unlock();
761 
762   // Register any newly bound trace writers with the service.
763   for (const auto& writer_and_target_buffer : writers_to_register) {
764     producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
765                                             writer_and_target_buffer.second);
766   }
767 
768   // Attempt to flush any pending commits (and run pending flush callbacks). If
769   // there are none, this will have no effect. If we ended up in a race that
770   // changed |fully_bound_| back to false, the commit will happen once we become
771   // |fully_bound_| again.
772   if (should_flush)
773     FlushPendingCommitDataRequests(flush_callback);
774 }
775 
776 SharedMemoryArbiterImpl::Stats SharedMemoryArbiterImpl::GetStats() {
777   std::lock_guard<std::mutex> scoped_lock(lock_);
778   Stats res;
779 
780   for (size_t page_idx = 0; page_idx < shmem_abi_.num_pages(); page_idx++) {
781     uint32_t bitmap = shmem_abi_.page_header(page_idx)->header_bitmap.load(
782         std::memory_order_relaxed);
783     SharedMemoryABI::PageLayout layout =
784         SharedMemoryABI::GetLayoutFromHeaderBitmap(bitmap);
785     if (layout == SharedMemoryABI::kPageNotPartitioned) {
786       res.pages_free++;
787     } else if (layout == SharedMemoryABI::kPageDivReserved1 ||
788                layout == SharedMemoryABI::kPageDivReserved2) {
789       res.pages_unexpected++;
790     }
791     // Free and unexpected pages have zero chunks.
792     const uint32_t num_chunks =
793         SharedMemoryABI::GetNumChunksFromHeaderBitmap(bitmap);
794     for (uint32_t i = 0; i < num_chunks; i++) {
795       switch (SharedMemoryABI::GetChunkStateFromHeaderBitmap(bitmap, i)) {
796         case SharedMemoryABI::kChunkFree:
797           res.chunks_free++;
798           break;
799         case SharedMemoryABI::kChunkBeingWritten:
800           res.chunks_being_written++;
801           break;
802         case SharedMemoryABI::kChunkBeingRead:
803           res.chunks_being_read++;
804           break;
805         case SharedMemoryABI::kChunkComplete:
806           res.chunks_complete++;
807           break;
808       }
809     }
810   }
811 
812   return res;
813 }
814 
815 std::function<void()>
816 SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
817   if (pending_flush_callbacks_.empty())
818     return std::function<void()>();
819 
820   std::vector<std::function<void()>> pending_flush_callbacks;
821   pending_flush_callbacks.swap(pending_flush_callbacks_);
822   // Capture the callback list into the lambda by copy.
823   return [pending_flush_callbacks]() {
824     for (auto& callback : pending_flush_callbacks)
825       callback();
826   };
827 }
828 
829 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
830   base::TaskRunner* task_runner_to_commit_on = nullptr;
831 
832   {
833     std::lock_guard<std::mutex> scoped_lock(lock_);
834     // If a commit_data_req_ exists it means that somebody else already posted a
835     // FlushPendingCommitDataRequests() task.
836     if (!commit_data_req_) {
837       commit_data_req_.reset(new CommitDataRequest());
838 
839       // Flushing the commit is only supported while we're |fully_bound_|. If we
840       // aren't, we'll flush when |fully_bound_| is updated.
841       if (fully_bound_)
842         task_runner_to_commit_on = task_runner_;
843     } else {
844       // If another request is already queued and it is also a reply to a
845       // flush request, reply with the highest id.
846       req_id = std::max(req_id, commit_data_req_->flush_request_id());
847     }
848     commit_data_req_->set_flush_request_id(req_id);
849   }  // scoped_lock
850 
851   // We shouldn't post tasks while locked. |task_runner_to_commit_on|
852   // remains valid after unlocking, because |task_runner_| is never reset.
853   if (task_runner_to_commit_on) {
854     auto weak_this = weak_ptr_factory_.GetWeakPtr();
855     task_runner_to_commit_on->PostTask([weak_this] {
856       if (weak_this)
857         weak_this->FlushPendingCommitDataRequests();
858     });
859   }
860 }
861 
862 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
863     MaybeUnboundBufferID target_buffer,
864     BufferExhaustedPolicy buffer_exhausted_policy) {
865   WriterID id;
866   base::TaskRunner* task_runner_to_register_on = nullptr;
867 
868   {
869     std::lock_guard<std::mutex> scoped_lock(lock_);
870     if (did_shutdown_)
871       return std::unique_ptr<TraceWriter>(new NullTraceWriter());
872 
873     id = active_writer_ids_.Allocate();
874     if (!id)
875       return std::unique_ptr<TraceWriter>(new NullTraceWriter());
876 
877     PERFETTO_DCHECK(!pending_writers_.count(id));
878 
879     if (IsReservationTargetBufferId(target_buffer)) {
880       // If the reservation is new, mark it as unbound in
881       // |target_buffer_reservations_|. Otherwise, if the reservation was
882       // already bound, choose the bound buffer ID now.
883       auto it_and_inserted = target_buffer_reservations_.insert(
884           {target_buffer, TargetBufferReservation()});
885       if (it_and_inserted.first->second.resolved)
886         target_buffer = it_and_inserted.first->second.target_buffer;
887     }
888 
889     if (IsReservationTargetBufferId(target_buffer)) {
890       // The arbiter and/or startup buffer reservations are not bound yet, so
891       // buffer the registration of the writer until after we're bound.
892       pending_writers_[id] = target_buffer;
893 
894       // Mark the arbiter as not fully bound, since we now have at least one
895       // unbound trace writer / target buffer reservation.
896       fully_bound_ = false;
897       was_always_bound_ = false;
898     } else if (target_buffer != kInvalidBufferId) {
899       // Trace writer is bound, so arbiter should be bound to an endpoint, too.
900       PERFETTO_CHECK(producer_endpoint_ && task_runner_);
901       task_runner_to_register_on = task_runner_;
902     }
903 
904     // All trace writers must use kDrop policy if the arbiter ever becomes
905     // unbound.
906     bool uses_drop_policy =
907         buffer_exhausted_policy == BufferExhaustedPolicy::kDrop;
908     all_writers_have_drop_policy_ &= uses_drop_policy;
909     PERFETTO_DCHECK(fully_bound_ || uses_drop_policy);
910     PERFETTO_CHECK(fully_bound_ || all_writers_have_drop_policy_);
911     PERFETTO_CHECK(was_always_bound_ || uses_drop_policy);
912   }  // scoped_lock
913 
914   // We shouldn't post tasks while locked. |task_runner_to_register_on|
915   // remains valid after unlocking, because |task_runner_| is never reset.
916   if (task_runner_to_register_on) {
917     auto weak_this = weak_ptr_factory_.GetWeakPtr();
918     task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
919       if (weak_this)
920         weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
921     });
922   }
923 
924   return std::unique_ptr<TraceWriter>(
925       new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
926 }
927 
928 void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
929   base::TaskRunner* task_runner = nullptr;
930   base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
931   {
932     std::lock_guard<std::mutex> scoped_lock(lock_);
933     active_writer_ids_.Free(id);
934 
935     auto it = pending_writers_.find(id);
936     if (it != pending_writers_.end()) {
937       // Writer hasn't been bound yet and thus also not yet registered with the
938       // service.
939       pending_writers_.erase(it);
940       return;
941     }
942 
943     // A trace writer from an aborted session may be destroyed before the
944     // arbiter is bound to a task runner. In that case, it was never registered
945     // with the service.
946     if (!task_runner_)
947       return;
948 
949     // If `active_writer_ids_` is empty, `TryShutdown()` can return true
950     // and `*this` can be deleted. Let's grab everything we need from `*this`
951     // before releasing the lock.
952     weak_this = weak_ptr_factory_.GetWeakPtr();
953     task_runner = task_runner_;
954   }  // scoped_lock
955 
956   // We shouldn't post tasks while locked. |task_runner| remains valid after
957   // unlocking, because |task_runner_| is never reset.
958   task_runner->PostTask([weak_this, id] {
959     if (weak_this)
960       weak_this->producer_endpoint_->UnregisterTraceWriter(id);
961   });
962 }
963 
964 bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
965   if (!commit_data_req_)
966     return true;
967 
968   bool all_placeholders_replaced = true;
969   for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
970     if (!IsReservationTargetBufferId(chunk.target_buffer()))
971       continue;
972     const auto it = target_buffer_reservations_.find(chunk.target_buffer());
973     PERFETTO_DCHECK(it != target_buffer_reservations_.end());
974     if (!it->second.resolved) {
975       all_placeholders_replaced = false;
976       continue;
977     }
978     chunk.set_target_buffer(it->second.target_buffer);
979   }
980   for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
981     if (!IsReservationTargetBufferId(chunk.target_buffer()))
982       continue;
983     const auto it = target_buffer_reservations_.find(chunk.target_buffer());
984     PERFETTO_DCHECK(it != target_buffer_reservations_.end());
985     if (!it->second.resolved) {
986       all_placeholders_replaced = false;
987       continue;
988     }
989     chunk.set_target_buffer(it->second.target_buffer);
990   }
991   return all_placeholders_replaced;
992 }
993 
994 bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
995   if (!producer_endpoint_) {
996     PERFETTO_DCHECK(!fully_bound_);
997     return false;
998   }
999   // We're fully bound if all target buffer reservations have a valid associated
1000   // BufferID.
1001   fully_bound_ = std::none_of(
1002       target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
1003       [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
1004         return !entry.second.resolved;
1005       });
1006   if (!fully_bound_)
1007     was_always_bound_ = false;
1008   return fully_bound_;
1009 }
1010 
1011 }  // namespace perfetto
1012