/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <algorithm>
#include <limits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

namespace {
static_assert(sizeof(BufferID) == sizeof(uint16_t),
              "The MaybeUnboundBufferID logic requires BufferID not to grow "
              "above uint16_t.");

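// A MaybeUnboundBufferID is either a real BufferID (which, per the
// static_assert above, fits in the lower 16 bits) or a placeholder for a
// still-unbound startup buffer reservation, stored shifted into the upper 16
// bits. For example, reservation id 3 maps to the placeholder value
// 3 << 16 = 0x00030000, which IsReservationTargetBufferId() below recognizes
// by its non-zero upper half.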
MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
  // Reservation IDs are stored in the upper bits.
  PERFETTO_CHECK(reservation_id > 0);
  return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
}

bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
  return (buffer_id >> 16) > 0;
}
}  // namespace

// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(
      new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
                                  page_size, producer_endpoint, task_runner));
}

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
    SharedMemory* shared_memory,
    size_t page_size) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
      shared_memory->start(), shared_memory->size(), page_size,
      /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
}
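
// The unbound variant above backs startup tracing: writers can be created
// against a buffer *reservation* before the producer has connected to the
// service. A rough usage sketch of that flow, using only APIs defined in this
// file (variable names are illustrative, error handling omitted):
//
//   auto arbiter = SharedMemoryArbiter::CreateUnboundInstance(shmem, 4096);
//   auto writer = arbiter->CreateStartupTraceWriter(/*reservation_id=*/1);
//   // ... write packets; commits stay queued until fully bound ...
//   arbiter->BindToProducerEndpoint(endpoint, task_runner);
//   arbiter->BindStartupTargetBuffer(/*reservation_id=*/1, real_buffer_id);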

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : producer_endpoint_(producer_endpoint),
      task_runner_(task_runner),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
      active_writer_ids_(kMaxWriterID),
      fully_bound_(task_runner && producer_endpoint),
      was_always_bound_(fully_bound_),
      weak_ptr_factory_(this) {}

Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    BufferExhaustedPolicy buffer_exhausted_policy,
    size_t size_hint) {
  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.

  int stall_count = 0;
  unsigned stall_interval_us = 0;
  bool task_runner_runs_on_current_thread = false;
  static const unsigned kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;
  static const int kFlushCommitsAfterEveryNStalls = 2;
  static const int kAssertAtNStalls = 200;

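  // Each full, unsuccessful pass over the SMB below counts as one "stall".
  // With kStall policy the loop then either flushes pending commits (when on
  // the IPC thread, every kFlushCommitsAfterEveryNStalls-th stall) or sleeps
  // with a roughly 8x exponential backoff (0, 8, 72, 584, ... us), capped at
  // kMaxStallIntervalUs, before retrying; after kAssertAtNStalls stalls it
  // gives up and aborts.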
  for (;;) {
    // TODO(primiano): Probably this lock is not really required and this code
    // could be rewritten leveraging only the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::unique_lock<std::mutex> scoped_lock(lock_);

      // If ever unbound, we do not support stalling. In theory, we could
      // support stalling for TraceWriters created after the arbiter and startup
      // buffer reservations were bound, but to avoid raciness between the
      // creation of startup writers and binding, we categorically forbid kStall
      // mode.
      PERFETTO_DCHECK(was_always_bound_ ||
                      buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);

      task_runner_runs_on_current_thread =
          task_runner_ && task_runner_->RunsTasksOnCurrentThread();

      // If more than half of the SMB is filled with completed chunks for
      // which we haven't notified the service yet (i.e. they are still enqueued
      // in |commit_data_req_|), force a synchronous CommitDataRequest() even if
      // we acquire a chunk, to reduce the likelihood of stalling the writer.
      //
      // We can only do this if we're writing on the same thread that we access
      // the producer endpoint on, since we cannot notify the producer endpoint
      // to commit synchronously on a different thread. Attempting to flush
      // synchronously on another thread will lead to subtle bugs caused by
      // out-of-order commit requests (crbug.com/919187#c28).
      bool should_commit_synchronously =
          task_runner_runs_on_current_thread &&
          buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
          commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;

      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          // TODO(primiano): Use the |size_hint| here to decide the layout.
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }

          if (should_commit_synchronously) {
            // We can't flush while holding the lock.
            scoped_lock.unlock();
            FlushPendingCommitDataRequests();
            return chunk;
          } else {
            return chunk;
          }
        }
      }
    }  // scoped_lock

    if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
      PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
      return Chunk();
    }

    // Stalling is not supported if we were ever unbound (see earlier comment).
    PERFETTO_CHECK(was_always_bound_);

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service).
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_LOG("Shared memory buffer overrun! Stalling");
    }

    if (stall_count == kAssertAtNStalls) {
      PERFETTO_FATAL(
          "Shared memory buffer max stall count exceeded; possible deadlock");
    }

    // If the IPC thread itself is stalled because the current process has
    // filled up the SMB, we need to make sure that the service can process and
    // purge the chunks written by our process, by flushing any pending commit
    // requests. Because other threads in our process can continue to
    // concurrently grab, fill and commit any chunks purged by the service, it
    // is possible that the SMB remains full and the IPC thread remains stalled,
    // needing to flush the concurrently queued up commits again. This is
    // particularly likely with the in-process perfetto service, where the IPC
    // thread is the service thread. To avoid remaining stalled forever in such
    // a situation, we attempt to flush periodically after every N stalls.
    if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
        task_runner_runs_on_current_thread) {
      // TODO(primiano): sending the IPC synchronously is a temporary workaround
      // until the backpressure logic in probes_producer is sorted out. Until
      // then the risk is that we stall the message loop waiting for the tracing
      // service to consume the shared memory buffer (SMB) and, for this reason,
      // never run the task that tells the service to purge the SMB. This must
      // happen iff we are on the IPC thread: not doing this will cause
      // deadlocks, while doing it on the wrong thread causes out-of-order data
      // commits (crbug.com/919187#c28).
      FlushPendingCommitDataRequests();
    } else {
      base::SleepMicroseconds(stall_interval_us);
      stall_interval_us =
          std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
    }
  }
}

void SharedMemoryArbiterImpl::ReturnCompletedChunk(
    Chunk chunk,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  PERFETTO_DCHECK(chunk.is_valid());
  const WriterID writer_id = chunk.writer_id();
  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
                          patch_list);
}

void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
                                          MaybeUnboundBufferID target_buffer,
                                          PatchList* patch_list) {
  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}

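// Batching behaviour in a nutshell: the first commit after the previous flush
// lazily creates |commit_data_req_| and, if the arbiter is fully bound,
// schedules a delayed flush after |batch_commits_duration_ms_|. Later commits
// within that window only append to the pending request. The flush is pulled
// forward (posted with zero delay) when a patch targets a chunk already handed
// to the service or when pending chunks exceed half of the SMB.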
void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
    Chunk chunk,
    WriterID writer_id,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  // Note: chunk will be invalid if the call came from SendPatches().
  base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
  // The delay with which the flush will be posted.
  uint32_t flush_delay_ms = 0;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If we
      // aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_ && !delayed_flush_scheduled_) {
        weak_this = weak_ptr_factory_.GetWeakPtr();
        task_runner_to_post_delayed_callback_on = task_runner_;
        flush_delay_ms = batch_commits_duration_ms_;
        delayed_flush_scheduled_ = true;
      }
    }

    // If a valid chunk is specified, return it and attach it to the request.
    if (chunk.is_valid()) {
      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
      uint8_t chunk_idx = chunk.chunk_idx();
      bytes_pending_commit_ += chunk.size();
      size_t page_idx;
      // If the chunk needs patching, it should not be marked as complete yet,
      // because this would indicate to the service that the producer will not
      // be writing to it anymore, while the producer might still apply patches
      // to the chunk later on. In particular, when re-reading (e.g. because of
      // periodic scraping) a completed chunk, the service expects the flags of
      // that chunk not to be removed between reads. So, let's say the producer
      // marked the chunk as complete here and the service then read it for the
      // first time. If the producer then fully patched the chunk, thus removing
      // the kChunkNeedsPatching flag, and the service re-read the chunk after
      // the patching, the service would be thrown off by the removed flag.
      if (direct_patching_enabled_ &&
          (chunk.GetPacketCountAndFlags().second &
           SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
        page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
      } else {
        // If the chunk doesn't need patching, we can mark it as complete
        // immediately. This allows the service to read it in full while
        // scraping, which would not be the case if the chunk was left in a
        // kChunkBeingWritten state.
        page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
      }

      // DO NOT access |chunk| after this point, it has been std::move()-d
      // above.
      CommitDataRequest::ChunksToMove* ctm =
          commit_data_req_->add_chunks_to_move();
      ctm->set_page(static_cast<uint32_t>(page_idx));
      ctm->set_chunk(chunk_idx);
      ctm->set_target_buffer(target_buffer);
    }

    // Process the completed patches for previous chunks from the |patch_list|.
    CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      Patch curr_patch = patch_list->front();
      patch_list->pop_front();
      // Patches for the same chunk are contiguous in the |patch_list|. So, to
      // determine if there are any other patches that apply to the chunk that
      // is being patched, check if the next patch in the |patch_list| applies
      // to the same chunk.
      bool chunk_needs_more_patching =
          !patch_list->empty() &&
          patch_list->front().chunk_id == curr_patch.chunk_id;

      if (direct_patching_enabled_ &&
          TryDirectPatchLocked(writer_id, curr_patch,
                               chunk_needs_more_patching)) {
        continue;
      }

      // The chunk that this patch applies to has already been released to the
      // service, so it cannot be patched here. Add the patch to the commit data
      // request, so that it can be sent to the service and applied there.
      if (!last_patch_req ||
          last_patch_req->chunk_id() != curr_patch.chunk_id) {
        last_patch_req = commit_data_req_->add_chunks_to_patch();
        last_patch_req->set_writer_id(writer_id);
        last_patch_req->set_chunk_id(curr_patch.chunk_id);
        last_patch_req->set_target_buffer(target_buffer);
      }
      auto* patch = last_patch_req->add_patches();
      patch->set_offset(curr_patch.offset);
      patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
    }

    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case when the current
    // patch list is incomplete is if there is an unpatched entry at the head of
    // the |patch_list| that belongs to the same ChunkID as the last one we are
    // about to send to the service.
    if (last_patch_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_patch_req->chunk_id()) {
      last_patch_req->set_has_more_patches(true);
    }

    // If the buffer is filling up or if we are given a patch for a chunk
    // that was already sent to the service, we don't want to wait for the next
    // delayed flush to happen and we flush immediately. Otherwise, if we
    // accumulate the patch and a crash occurs before the patch is sent, the
    // service will not know of the patch and won't be able to reconstruct the
    // trace.
    if (fully_bound_ &&
        (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
      weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner_to_post_delayed_callback_on = task_runner_;
      flush_delay_ms = 0;
    }
  }  // scoped_lock(lock_)

  // We shouldn't post tasks while locked.
  // |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
  // because |task_runner_| is never reset.
  if (task_runner_to_post_delayed_callback_on) {
    task_runner_to_post_delayed_callback_on->PostDelayedTask(
        [weak_this] {
          if (!weak_this)
            return;
          {
            std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
            // Clear |delayed_flush_scheduled_|, allowing the next call to
            // UpdateCommitDataRequest to start another batching period.
            weak_this->delayed_flush_scheduled_ = false;
          }
          weak_this->FlushPendingCommitDataRequests();
        },
        flush_delay_ms);
  }
}

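// A Patch rewrites SharedMemoryABI::kPacketHeaderSize bytes (a packet's
// zero-filled size field, see the DCHECK below) at a given offset within a
// chunk's payload. Direct patching applies the fix in the producer, which is
// only possible while the chunk is still batched in |commit_data_req_| and in
// the kChunkBeingWritten state; otherwise the patch must travel to the service
// via the chunks_to_patch list of the commit request.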
bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
    WriterID writer_id,
    const Patch& patch,
    bool chunk_needs_more_patching) {
  // Search the chunks that are being batched in |commit_data_req_| for a chunk
  // that needs patching and that matches the provided |writer_id| and
  // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
  // |commit_data_req_| is appended to at the end with newly-returned chunks,
  // and patches are more likely to apply to chunks that have been returned
  // recently.
  SharedMemoryABI::Chunk chunk;
  bool chunk_found = false;
  auto& chunks_to_move = commit_data_req_->chunks_to_move();
  for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
       ++ctm_it) {
    uint32_t layout = shmem_abi_.GetPageLayout(ctm_it->page());
    auto chunk_state =
        shmem_abi_.GetChunkStateFromLayout(layout, ctm_it->chunk());
    // Note: the subset of |commit_data_req_| chunks that still need patching is
    // also the subset of chunks that are still being written to. The rest of
    // the chunks in |commit_data_req_| do not need patching and have already
    // been marked as complete.
    if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
      continue;

    chunk =
        shmem_abi_.GetChunkUnchecked(ctm_it->page(), layout, ctm_it->chunk());
    if (chunk.writer_id() == writer_id &&
        chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
            patch.chunk_id) {
      chunk_found = true;
      break;
    }
  }

  if (!chunk_found) {
    // The chunk has already been committed to the service and the patch cannot
    // be applied in the producer.
    return false;
  }

  // Apply the patch.
  size_t page_idx;
  uint8_t chunk_idx;
  std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
  PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
                  SharedMemoryABI::ChunkState::kChunkBeingWritten);
  auto chunk_begin = chunk.payload_begin();
  uint8_t* ptr = chunk_begin + patch.offset;
  PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
  // DCHECK that we are writing into a zero-filled size field and not into
  // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
  // zero-fill reservations in debug builds.
  const char zero[SharedMemoryABI::kPacketHeaderSize]{};
  PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);

  memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);

  if (!chunk_needs_more_patching) {
    // Mark that the chunk doesn't need more patching and mark it as complete,
    // as the producer will not write to it anymore. This allows the service to
    // read the chunk in full while scraping, which would not be the case if the
    // chunk was left in a kChunkBeingWritten state.
    chunk.ClearNeedsPatchingFlag();
    shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
  }

  return true;
}

void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
    uint32_t batch_commits_duration_ms) {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  batch_commits_duration_ms_ = batch_commits_duration_ms;
}

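// Direct SMB patching is opt-in from both sides: the service must first
// declare support (see SetDirectSMBPatchingSupportedByService() below) and the
// producer must then call EnableDirectSMBPatching(). If either step is
// missing, EnableDirectSMBPatching() returns false and patches keep flowing
// through the commit request instead.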
bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  if (!direct_patching_supported_by_service_) {
    return false;
  }

  return direct_patching_enabled_ = true;
}

void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  direct_patching_supported_by_service_ = true;
}

// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
//    thread (or, for in-process cases, on the same thread where
//    TracingServiceImpl lives), the CommitData() call must be synchronous and
//    not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
//    accidentally make commits out of order. See commit 4e4fe8f56ef and
//    crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  std::unique_ptr<CommitDataRequest> req;
  {
    std::unique_lock<std::mutex> scoped_lock(lock_);

    // Flushing is only supported while |fully_bound_|, and there may still be
    // unbound startup trace writers. If so, skip the commit for now - it'll be
    // done when |fully_bound_| is updated.
    if (!fully_bound_) {
      if (callback)
        pending_flush_callbacks_.push_back(callback);
      return;
    }

    // May be called by TraceWriterImpl on any thread.
    base::TaskRunner* task_runner = task_runner_;
    if (!task_runner->RunsTasksOnCurrentThread()) {
      // We shouldn't post a task while holding a lock. |task_runner| remains
      // valid after unlocking, because |task_runner_| is never reset.
      scoped_lock.unlock();

      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner->PostTask([weak_this, callback] {
        if (weak_this)
          weak_this->FlushPendingCommitDataRequests(std::move(callback));
      });
      return;
    }

    // |commit_data_req_| could have become a nullptr, for example when a forced
    // sync flush happens in GetNewChunk().
    if (commit_data_req_) {
      // Make sure any placeholder buffer IDs from StartupWriters are replaced
      // before sending the request.
      bool all_placeholders_replaced =
          ReplaceCommitPlaceholderBufferIdsLocked();
      // We're |fully_bound_|, thus all writers are bound and all placeholders
      // should have been replaced.
      PERFETTO_DCHECK(all_placeholders_replaced);

      // In order to allow patching in the producer we delay the kChunkComplete
      // transition and keep batched chunks in the kChunkBeingWritten state.
      // Since we are about to notify the service of all batched chunks, it will
      // not be possible to apply any more patches to them and we need to move
      // them to kChunkComplete - otherwise the service won't look at them.
      for (auto& ctm : commit_data_req_->chunks_to_move()) {
        uint32_t layout = shmem_abi_.GetPageLayout(ctm.page());
        auto chunk_state =
            shmem_abi_.GetChunkStateFromLayout(layout, ctm.chunk());
        // Note: the subset of |commit_data_req_| chunks that still need
        // patching is also the subset of chunks that are still being written
        // to. The rest of the chunks in |commit_data_req_| do not need patching
        // and have already been marked as complete.
        if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
          continue;

        SharedMemoryABI::Chunk chunk =
            shmem_abi_.GetChunkUnchecked(ctm.page(), layout, ctm.chunk());
        shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
      }

      req = std::move(commit_data_req_);
      bytes_pending_commit_ = 0;
    }
  }  // scoped_lock

  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |req| was nullptr, it means that an enqueued deferred commit was
    // executed just before this. At this point send an empty commit request
    // to the service, just to linearize with it and give the guarantee to the
    // caller that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
  }
}

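// Once TryShutdown() has been called, CreateTraceWriterInternal() below only
// hands out NullTraceWriter instances, so no further chunks can be acquired
// from this arbiter. Shutdown only succeeds if no previously created
// TraceWriters are still alive.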
bool SharedMemoryArbiterImpl::TryShutdown() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  did_shutdown_ = true;
  // Shutdown is safe if there are no active trace writers for this arbiter.
  return active_writer_ids_.IsEmpty();
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_CHECK(target_buffer > 0);
  return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
    uint16_t target_buffer_reservation_id) {
  return CreateTraceWriterInternal(
      MakeTargetBufferIdForReservation(target_buffer_reservation_id),
      BufferExhaustedPolicy::kDrop);
}

void SharedMemoryArbiterImpl::BindToProducerEndpoint(
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  PERFETTO_DCHECK(producer_endpoint && task_runner);
  PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());

  bool should_flush = false;
  std::function<void()> flush_callback;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    PERFETTO_CHECK(!fully_bound_);
    PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);

    producer_endpoint_ = producer_endpoint;
    task_runner_ = task_runner;

    // Now that we're bound to a task runner, also reset the WeakPtrFactory to
    // it. Because this code runs on the task runner, the factory's weak
    // pointers will be valid on it.
    weak_ptr_factory_.Reset(this);

    // All writers registered so far should be startup trace writers, since
    // the producer cannot feasibly know the target buffer for any future
    // session yet.
    for (const auto& entry : pending_writers_) {
      PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
    }

    // If all buffer reservations are bound, we can flush pending commits.
    if (UpdateFullyBoundLocked()) {
      should_flush = true;
      flush_callback = TakePendingFlushCallbacksLocked();
    }
  }  // scoped_lock

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we become
  // |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  PERFETTO_DCHECK(target_buffer_id > 0);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // We should already be bound to an endpoint.
  PERFETTO_CHECK(producer_endpoint_);
  PERFETTO_CHECK(task_runner_);
  PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());

  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id, target_buffer_id);
}

void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
    uint16_t target_buffer_reservation_id) {
  std::unique_lock<std::mutex> scoped_lock(lock_);

  // If we are already bound to a task runner, we may need to flush after
  // aborting the session, and thus should be running on that task runner.
  if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
    // We shouldn't post tasks while locked.
    auto* task_runner = task_runner_;
    scoped_lock.unlock();

    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
      if (!weak_this)
        return;
      weak_this->AbortStartupTracingForReservation(
          target_buffer_reservation_id);
    });
    return;
  }

  // Bind the target buffer reservation to an invalid buffer (ID 0), so that
  // existing commits, as well as future commits (of currently acquired chunks),
  // will be released as free by the service but otherwise ignored (i.e. not
  // copied into any valid target buffer).
  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id,
                              /*target_buffer_id=*/kInvalidBufferId);
}

void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
    std::unique_lock<std::mutex> scoped_lock,
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  // We should already be bound to an endpoint if the target buffer is valid.
  PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
                  target_buffer_id == kInvalidBufferId);

  PERFETTO_DLOG("Binding startup target buffer reservation %" PRIu16
                " to buffer %" PRIu16,
                target_buffer_reservation_id, target_buffer_id);

  MaybeUnboundBufferID reserved_id =
      MakeTargetBufferIdForReservation(target_buffer_reservation_id);

  bool should_flush = false;
  std::function<void()> flush_callback;
  std::vector<std::pair<WriterID, BufferID>> writers_to_register;

  TargetBufferReservation& reservation =
      target_buffer_reservations_[reserved_id];
  PERFETTO_CHECK(!reservation.resolved);
  reservation.resolved = true;
  reservation.target_buffer = target_buffer_id;

  // Collect trace writers associated with the reservation.
  for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
    if (it->second == reserved_id) {
      // No need to register writers that have an invalid target buffer.
      if (target_buffer_id != kInvalidBufferId) {
        writers_to_register.push_back(
            std::make_pair(it->first, target_buffer_id));
      }
      it = pending_writers_.erase(it);
    } else {
      it++;
    }
  }

  // If all buffer reservations are bound, we can flush pending commits.
  if (UpdateFullyBoundLocked()) {
    should_flush = true;
    flush_callback = TakePendingFlushCallbacksLocked();
  }

  scoped_lock.unlock();

  // Register any newly bound trace writers with the service.
  for (const auto& writer_and_target_buffer : writers_to_register) {
    producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
                                            writer_and_target_buffer.second);
  }

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we become
  // |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

std::function<void()>
SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
  if (pending_flush_callbacks_.empty())
    return std::function<void()>();

  std::vector<std::function<void()>> pending_flush_callbacks;
  pending_flush_callbacks.swap(pending_flush_callbacks_);
  // Capture the callback list into the lambda by copy.
  return [pending_flush_callbacks]() {
    for (auto& callback : pending_flush_callbacks)
      callback();
  };
}

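// Flush acknowledgements piggy-back on the commit path: NotifyFlushComplete()
// records the flush request id on the pending CommitDataRequest, which reaches
// the service with the next commit. If a commit is already queued with an
// earlier flush id, only the highest id is kept.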
void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  base::TaskRunner* task_runner_to_commit_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If a commit_data_req_ exists it means that somebody else already posted a
    // FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If we
      // aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_)
        task_runner_to_commit_on = task_runner_;
    } else {
      // If there is another request queued and it also contains a reply to a
      // flush request, reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_commit_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_commit_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_commit_on->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

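// Shared helper for CreateTraceWriter() and CreateStartupTraceWriter(). Three
// cases are handled below: (1) |target_buffer| is an unresolved reservation
// placeholder, so the writer is parked in |pending_writers_| and the arbiter
// drops out of the fully-bound state; (2) it is a real, valid BufferID, so the
// writer is registered with the service via the task runner; (3) it is
// kInvalidBufferId (an aborted startup session), in which case no registration
// happens at all.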
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
    MaybeUnboundBufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  WriterID id;
  base::TaskRunner* task_runner_to_register_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    if (did_shutdown_)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    id = active_writer_ids_.Allocate();
    if (!id)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    PERFETTO_DCHECK(!pending_writers_.count(id));

    if (IsReservationTargetBufferId(target_buffer)) {
      // If the reservation is new, mark it as unbound in
      // |target_buffer_reservations_|. Otherwise, if the reservation was
      // already bound, choose the bound buffer ID now.
      auto it_and_inserted = target_buffer_reservations_.insert(
          {target_buffer, TargetBufferReservation()});
      if (it_and_inserted.first->second.resolved)
        target_buffer = it_and_inserted.first->second.target_buffer;
    }

    if (IsReservationTargetBufferId(target_buffer)) {
      // The arbiter and/or startup buffer reservations are not bound yet, so
      // buffer the registration of the writer until after we're bound.
      pending_writers_[id] = target_buffer;

      // Mark the arbiter as not fully bound, since we now have at least one
      // unbound trace writer / target buffer reservation.
      fully_bound_ = false;
      was_always_bound_ = false;
    } else if (target_buffer != kInvalidBufferId) {
      // Trace writer is bound, so arbiter should be bound to an endpoint, too.
      PERFETTO_CHECK(producer_endpoint_ && task_runner_);
      task_runner_to_register_on = task_runner_;
    }

    // All trace writers must use kDrop policy if the arbiter ever becomes
    // unbound.
    bool uses_drop_policy =
        buffer_exhausted_policy == BufferExhaustedPolicy::kDrop;
    all_writers_have_drop_policy_ &= uses_drop_policy;
    PERFETTO_DCHECK(fully_bound_ || uses_drop_policy);
    PERFETTO_CHECK(fully_bound_ || all_writers_have_drop_policy_);
    PERFETTO_CHECK(was_always_bound_ || uses_drop_policy);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_register_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_register_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
      if (weak_this)
        weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
    });
  }

  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  base::TaskRunner* task_runner = nullptr;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    active_writer_ids_.Free(id);

    auto it = pending_writers_.find(id);
    if (it != pending_writers_.end()) {
      // Writer hasn't been bound yet and thus also not yet registered with the
      // service.
      pending_writers_.erase(it);
      return;
    }

    // A trace writer from an aborted session may be destroyed before the
    // arbiter is bound to a task runner. In that case, it was never registered
    // with the service.
    if (!task_runner_)
      return;

    task_runner = task_runner_;
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner| remains valid after
  // unlocking, because |task_runner_| is never reset.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner->PostTask([weak_this, id] {
    if (weak_this)
      weak_this->producer_endpoint_->UnregisterTraceWriter(id);
  });
}

bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
  if (!commit_data_req_)
    return true;

  bool all_placeholders_replaced = true;
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  return all_placeholders_replaced;
}

bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
  if (!producer_endpoint_) {
    PERFETTO_DCHECK(!fully_bound_);
    return false;
  }
  // We're fully bound if all target buffer reservations have a valid associated
  // BufferID.
  fully_bound_ = std::none_of(
      target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
      [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
        return !entry.second.resolved;
      });
  if (!fully_bound_)
    was_always_bound_ = false;
  return fully_bound_;
}

}  // namespace perfetto