/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <algorithm>
#include <limits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

namespace {
static_assert(sizeof(BufferID) == sizeof(uint16_t),
              "The MaybeUnboundBufferID logic requires BufferID not to grow "
              "above uint16_t.");

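// Illustrative example of the encoding used below: a real (bound) BufferID
// always fits in the lower 16 bits (upper bits zero), while a startup-buffer
// reservation is stored in the upper 16 bits. Values for illustration only:
//   MakeTargetBufferIdForReservation(1) == 0x00010000  -> reservation #1
//   MaybeUnboundBufferID(42)            == 0x0000002A  -> bound BufferID 42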
MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
  // Reservation IDs are stored in the upper bits.
  PERFETTO_CHECK(reservation_id > 0);
  return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
}

bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
  return (buffer_id >> 16) > 0;
}
}  // namespace

// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
constexpr BufferID SharedMemoryArbiterImpl::kInvalidBufferId;

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(
      new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
                                  page_size, producer_endpoint, task_runner));
}

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
    SharedMemory* shared_memory,
    size_t page_size) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
      shared_memory->start(), shared_memory->size(), page_size,
      /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
}

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : initially_bound_(task_runner && producer_endpoint),
      producer_endpoint_(producer_endpoint),
      task_runner_(task_runner),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
      active_writer_ids_(kMaxWriterID),
      fully_bound_(initially_bound_),
      weak_ptr_factory_(this) {}

Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    BufferExhaustedPolicy buffer_exhausted_policy,
    size_t size_hint) {
  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
  // If initially unbound, we do not support stalling. In theory, we could
  // support stalling for TraceWriters created after the arbiter and startup
  // buffer reservations were bound, but to avoid raciness between the creation
  // of startup writers and binding, we categorically forbid kStall mode.
  PERFETTO_DCHECK(initially_bound_ ||
                  buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);

  int stall_count = 0;
  unsigned stall_interval_us = 0;
  bool task_runner_runs_on_current_thread = false;
  static const unsigned kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;
  static const int kFlushCommitsAfterEveryNStalls = 2;
  static const int kAssertAtNStalls = 100;
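  // Note on the stall backoff below: each failed pass over the SMB sleeps for
  // |stall_interval_us| and then grows it as (stall_interval_us + 1) * 8,
  // i.e. roughly 0, 8, 72, 584, 4680, 37448 us and then the 100 ms cap
  // (kMaxStallIntervalUs) on every further iteration.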

  for (;;) {
    // TODO(primiano): Probably this lock is not really required and this code
    // could be rewritten leveraging only the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::unique_lock<std::mutex> scoped_lock(lock_);

      task_runner_runs_on_current_thread =
          task_runner_ && task_runner_->RunsTasksOnCurrentThread();

      // If more than half of the SMB.size() is filled with completed chunks
      // for which we haven't notified the service yet (i.e. they are still
      // enqueued in |commit_data_req_|), force a synchronous
      // CommitDataRequest() even if we acquire a chunk, to reduce the
      // likelihood of stalling the writer.
      //
      // We can only do this if we're writing on the same thread that we access
      // the producer endpoint on, since we cannot notify the producer endpoint
      // to commit synchronously on a different thread. Attempting to flush
      // synchronously on another thread will lead to subtle bugs caused by
      // out-of-order commit requests (crbug.com/919187#c28).
      bool should_commit_synchronously =
          task_runner_runs_on_current_thread &&
          buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
          commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;

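      // Scan all SMB pages round-robin, starting from the page where the
      // previous call left off (|page_idx_|) and wrapping around, looking for
      // a free chunk to acquire.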
      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          // TODO(primiano): Use the |size_hint| here to decide the layout.
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
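        // |free_chunks| is a bitmap of the chunks in this page: bit N is set
        // if chunk N is currently free. A freshly partitioned page has all of
        // its chunks free.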
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }

          if (should_commit_synchronously) {
            // We can't flush while holding the lock.
            scoped_lock.unlock();
            FlushPendingCommitDataRequests();
            return chunk;
          } else {
            return chunk;
          }
        }
      }
    }  // scoped_lock

    if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
      PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
      return Chunk();
    }

    PERFETTO_DCHECK(initially_bound_);

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service).
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_LOG("Shared memory buffer overrun! Stalling");
    }

    if (stall_count == kAssertAtNStalls) {
      PERFETTO_FATAL(
          "Shared memory buffer max stall count exceeded; possible deadlock");
    }

    // If the IPC thread itself is stalled because the current process has
    // filled up the SMB, we need to make sure that the service can process and
    // purge the chunks written by our process, by flushing any pending commit
    // requests. Because other threads in our process can continue to
    // concurrently grab, fill and commit any chunks purged by the service, it
    // is possible that the SMB remains full and the IPC thread remains
    // stalled, needing to flush the concurrently queued up commits again. This
    // is particularly likely with the in-process perfetto service, where the
    // IPC thread is the service thread. To avoid remaining stalled forever in
    // such a situation, we attempt to flush periodically after every N stalls.
    if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
        task_runner_runs_on_current_thread) {
      // TODO(primiano): sending the IPC synchronously is a temporary
      // workaround until the backpressure logic in probes_producer is sorted
      // out. Until then the risk is that we stall the message loop waiting for
      // the tracing service to consume the shared memory buffer (SMB) and, for
      // this reason, never run the task that tells the service to purge the
      // SMB. This must happen only on the IPC thread: skipping the synchronous
      // flush there can cause deadlocks, while flushing synchronously on any
      // other thread causes out-of-order data commits (crbug.com/919187#c28).
      FlushPendingCommitDataRequests();
    } else {
      base::SleepMicroseconds(stall_interval_us);
      stall_interval_us =
          std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
    }
  }
}

void SharedMemoryArbiterImpl::ReturnCompletedChunk(
    Chunk chunk,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  PERFETTO_DCHECK(chunk.is_valid());
  const WriterID writer_id = chunk.writer_id();
  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
                          patch_list);
}

void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
                                          MaybeUnboundBufferID target_buffer,
                                          PatchList* patch_list) {
  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}

void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
    Chunk chunk,
    WriterID writer_id,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  // Note: chunk will be invalid if the call came from SendPatches().
  base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
  // The delay with which the flush will be posted.
  uint32_t flush_delay_ms = 0;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If
      // we aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_ && !delayed_flush_scheduled_) {
        weak_this = weak_ptr_factory_.GetWeakPtr();
        task_runner_to_post_delayed_callback_on = task_runner_;
        flush_delay_ms = batch_commits_duration_ms_;
        delayed_flush_scheduled_ = true;
      }
    }

    // If a valid chunk is specified, return it and attach it to the request.
    if (chunk.is_valid()) {
      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
      uint8_t chunk_idx = chunk.chunk_idx();
      bytes_pending_commit_ += chunk.size();
      size_t page_idx;
      // If the chunk needs patching, it should not be marked as complete yet,
      // because this would indicate to the service that the producer will not
      // be writing to it anymore, while the producer might still apply patches
      // to the chunk later on. In particular, when re-reading (e.g. because of
      // periodic scraping) a completed chunk, the service expects the flags of
      // that chunk not to be removed between reads. So, let's say the producer
      // marked the chunk as complete here and the service then read it for the
      // first time. If the producer then fully patched the chunk, thus
      // removing the kChunkNeedsPatching flag, and the service re-read the
      // chunk after the patching, the service would be thrown off by the
      // removed flag.
      if (direct_patching_enabled_ &&
          (chunk.GetPacketCountAndFlags().second &
           SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
        page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
      } else {
        // If the chunk doesn't need patching, we can mark it as complete
        // immediately. This allows the service to read it in full while
        // scraping, which would not be the case if the chunk was left in a
        // kChunkBeingWritten state.
        page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
      }

      // DO NOT access |chunk| after this point, it has been std::move()-d
      // above.
      CommitDataRequest::ChunksToMove* ctm =
          commit_data_req_->add_chunks_to_move();
      ctm->set_page(static_cast<uint32_t>(page_idx));
      ctm->set_chunk(chunk_idx);
      ctm->set_target_buffer(target_buffer);
    }

    // Process the completed patches for previous chunks from the |patch_list|.
    CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      Patch curr_patch = patch_list->front();
      patch_list->pop_front();
      // Patches for the same chunk are contiguous in the |patch_list|. So, to
      // determine if there are any other patches that apply to the chunk that
      // is being patched, check if the next patch in the |patch_list| applies
      // to the same chunk.
      bool chunk_needs_more_patching =
          !patch_list->empty() &&
          patch_list->front().chunk_id == curr_patch.chunk_id;

      if (direct_patching_enabled_ &&
          TryDirectPatchLocked(writer_id, curr_patch,
                               chunk_needs_more_patching)) {
        continue;
      }

      // The chunk that this patch applies to has already been released to the
      // service, so it cannot be patched here. Add the patch to the commit
      // data request, so that it can be sent to the service and applied there.
      if (!last_patch_req ||
          last_patch_req->chunk_id() != curr_patch.chunk_id) {
        last_patch_req = commit_data_req_->add_chunks_to_patch();
        last_patch_req->set_writer_id(writer_id);
        last_patch_req->set_chunk_id(curr_patch.chunk_id);
        last_patch_req->set_target_buffer(target_buffer);
      }
      auto* patch = last_patch_req->add_patches();
      patch->set_offset(curr_patch.offset);
      patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
    }

    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case when the current
    // patch list is incomplete is if there is an unpatched entry at the head
    // of the |patch_list| that belongs to the same ChunkID as the last one we
    // are about to send to the service.
    if (last_patch_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_patch_req->chunk_id()) {
      last_patch_req->set_has_more_patches(true);
    }

    // If the buffer is filling up or if we are given a patch for a chunk
    // that was already sent to the service, we don't want to wait for the next
    // delayed flush to happen and we flush immediately. Otherwise, if we
    // accumulate the patch and a crash occurs before the patch is sent, the
    // service will not know of the patch and won't be able to reconstruct the
    // trace.
    if (fully_bound_ &&
        (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
      weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner_to_post_delayed_callback_on = task_runner_;
      flush_delay_ms = 0;
    }
  }  // scoped_lock(lock_)

  // We shouldn't post tasks while locked.
  // |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
  // because |task_runner_| is never reset.
  if (task_runner_to_post_delayed_callback_on) {
    task_runner_to_post_delayed_callback_on->PostDelayedTask(
        [weak_this] {
          if (!weak_this)
            return;
          {
            std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
            // Clear |delayed_flush_scheduled_|, allowing the next call to
            // UpdateCommitDataRequest to start another batching period.
            weak_this->delayed_flush_scheduled_ = false;
          }
          weak_this->FlushPendingCommitDataRequests();
        },
        flush_delay_ms);
  }
}

bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
    WriterID writer_id,
    const Patch& patch,
    bool chunk_needs_more_patching) {
  // Search the chunks that are being batched in |commit_data_req_| for a chunk
  // that needs patching and that matches the provided |writer_id| and
  // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
  // |commit_data_req_| is appended to at the end with newly-returned chunks,
  // and patches are more likely to apply to chunks that have been returned
  // recently.
  SharedMemoryABI::Chunk chunk;
  bool chunk_found = false;
  auto& chunks_to_move = commit_data_req_->chunks_to_move();
  for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
       ++ctm_it) {
    uint32_t layout = shmem_abi_.GetPageLayout(ctm_it->page());
    auto chunk_state =
        shmem_abi_.GetChunkStateFromLayout(layout, ctm_it->chunk());
    // Note: the subset of |commit_data_req_| chunks that still need patching
    // is also the subset of chunks that are still being written to. The rest
    // of the chunks in |commit_data_req_| do not need patching and have
    // already been marked as complete.
    if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
      continue;

    chunk =
        shmem_abi_.GetChunkUnchecked(ctm_it->page(), layout, ctm_it->chunk());
    if (chunk.writer_id() == writer_id &&
        chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
            patch.chunk_id) {
      chunk_found = true;
      break;
    }
  }

  if (!chunk_found) {
    // The chunk has already been committed to the service and the patch cannot
    // be applied in the producer.
    return false;
  }

  // Apply the patch.
  size_t page_idx;
  uint8_t chunk_idx;
  std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
  PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
                  SharedMemoryABI::ChunkState::kChunkBeingWritten);
  auto chunk_begin = chunk.payload_begin();
  uint8_t* ptr = chunk_begin + patch.offset;
  PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
  // DCHECK that we are writing into a zero-filled size field and not into
  // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
  // zero-fill reservations in debug builds.
  const char zero[SharedMemoryABI::kPacketHeaderSize]{};
  PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);

  memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);

  if (!chunk_needs_more_patching) {
    // Mark that the chunk doesn't need more patching and mark it as complete,
    // as the producer will not write to it anymore. This allows the service to
    // read the chunk in full while scraping, which would not be the case if
    // the chunk was left in a kChunkBeingWritten state.
    chunk.ClearNeedsPatchingFlag();
    shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
  }

  return true;
}

void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
    uint32_t batch_commits_duration_ms) {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  batch_commits_duration_ms_ = batch_commits_duration_ms;
}

bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  if (!direct_patching_supported_by_service_) {
    return false;
  }

  return direct_patching_enabled_ = true;
}

void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  direct_patching_supported_by_service_ = true;
}

// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
//    thread (or, for in-process cases, on the same thread where
//    TracingServiceImpl lives), the CommitData() call must be synchronous and
//    not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
//    accidentally make commits out of order. See commit 4e4fe8f56ef and
//    crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  std::unique_ptr<CommitDataRequest> req;
  {
    std::unique_lock<std::mutex> scoped_lock(lock_);

    // Flushing is only supported while |fully_bound_|, and there may still be
    // unbound startup trace writers. If so, skip the commit for now - it'll be
    // done when |fully_bound_| is updated.
    if (!fully_bound_) {
      if (callback)
        pending_flush_callbacks_.push_back(callback);
      return;
    }

    // May be called by TraceWriterImpl on any thread.
    base::TaskRunner* task_runner = task_runner_;
    if (!task_runner->RunsTasksOnCurrentThread()) {
      // We shouldn't post a task while holding a lock. |task_runner| remains
      // valid after unlocking, because |task_runner_| is never reset.
      scoped_lock.unlock();

      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner->PostTask([weak_this, callback] {
        if (weak_this)
          weak_this->FlushPendingCommitDataRequests(std::move(callback));
      });
      return;
    }

    // |commit_data_req_| could have become a nullptr, for example when a
    // forced sync flush happens in GetNewChunk().
    if (commit_data_req_) {
      // Make sure any placeholder buffer IDs from StartupWriters are replaced
      // before sending the request.
      bool all_placeholders_replaced =
          ReplaceCommitPlaceholderBufferIdsLocked();
      // We're |fully_bound_|, thus all writers are bound and all placeholders
      // should have been replaced.
      PERFETTO_DCHECK(all_placeholders_replaced);

      // In order to allow patching in the producer we delay the kChunkComplete
      // transition and keep batched chunks in the kChunkBeingWritten state.
      // Since we are about to notify the service of all batched chunks, it
      // will not be possible to apply any more patches to them and we need to
      // move them to kChunkComplete - otherwise the service won't look at
      // them.
      for (auto& ctm : commit_data_req_->chunks_to_move()) {
        uint32_t layout = shmem_abi_.GetPageLayout(ctm.page());
        auto chunk_state =
            shmem_abi_.GetChunkStateFromLayout(layout, ctm.chunk());
        // Note: the subset of |commit_data_req_| chunks that still need
        // patching is also the subset of chunks that are still being written
        // to. The rest of the chunks in |commit_data_req_| do not need
        // patching and have already been marked as complete.
        if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
          continue;

        SharedMemoryABI::Chunk chunk =
            shmem_abi_.GetChunkUnchecked(ctm.page(), layout, ctm.chunk());
        shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
      }

      req = std::move(commit_data_req_);
      bytes_pending_commit_ = 0;
    }
  }  // scoped_lock

  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |req| was nullptr, it means that an enqueued deferred commit was
    // executed just before this. At this point send an empty commit request
    // to the service, just to linearize with it and give the guarantee to the
    // caller that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
  }
}

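// Marks the arbiter as shut down and reports whether it is safe to destroy it.
// Once |did_shutdown_| is set, CreateTraceWriterInternal() only hands out
// NullTraceWriters, so no new real writers can appear after this returns true.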
bool SharedMemoryArbiterImpl::TryShutdown() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  did_shutdown_ = true;
  // Shutdown is safe if there are no active trace writers for this arbiter.
  return active_writer_ids_.IsEmpty();
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_CHECK(target_buffer > 0);
  return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
}

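// Rough usage sketch for startup tracing (illustrative, not exhaustive): an
// unbound arbiter is created via CreateUnboundInstance(); startup writers are
// obtained through CreateStartupTraceWriter(reservation_id) and write into the
// SMB against placeholder target buffers; once the producer connects, the
// arbiter is bound with BindToProducerEndpoint() and each reservation is then
// resolved with BindStartupTargetBuffer() (or discarded via
// AbortStartupTracingForReservation()), after which pending commits are
// flushed to the service.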
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
    uint16_t target_buffer_reservation_id) {
  PERFETTO_CHECK(!initially_bound_);
  return CreateTraceWriterInternal(
      MakeTargetBufferIdForReservation(target_buffer_reservation_id),
      BufferExhaustedPolicy::kDrop);
}

void SharedMemoryArbiterImpl::BindToProducerEndpoint(
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  PERFETTO_DCHECK(producer_endpoint && task_runner);
  PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());
  PERFETTO_CHECK(!initially_bound_);

  bool should_flush = false;
  std::function<void()> flush_callback;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    PERFETTO_CHECK(!fully_bound_);
    PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);

    producer_endpoint_ = producer_endpoint;
    task_runner_ = task_runner;

    // Now that we're bound to a task runner, also reset the WeakPtrFactory to
    // it. Because this code runs on the task runner, the factory's weak
    // pointers will be valid on it.
    weak_ptr_factory_.Reset(this);

    // All writers registered so far should be startup trace writers, since
    // the producer cannot feasibly know the target buffer for any future
    // session yet.
    for (const auto& entry : pending_writers_) {
      PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
    }

    // If all buffer reservations are bound, we can flush pending commits.
    if (UpdateFullyBoundLocked()) {
      should_flush = true;
      flush_callback = TakePendingFlushCallbacksLocked();
    }
  }  // scoped_lock

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we
  // become |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  PERFETTO_DCHECK(target_buffer_id > 0);
  PERFETTO_CHECK(!initially_bound_);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // We should already be bound to an endpoint, but not fully bound.
  PERFETTO_CHECK(!fully_bound_);
  PERFETTO_CHECK(producer_endpoint_);
  PERFETTO_CHECK(task_runner_);
  PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());

  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id, target_buffer_id);
}

void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
    uint16_t target_buffer_reservation_id) {
  PERFETTO_CHECK(!initially_bound_);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // If we are already bound to an endpoint, we may need to flush after
  // aborting the session, and thus should be running on the endpoint's task
  // runner.
  if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
    // We shouldn't post tasks while locked.
    auto* task_runner = task_runner_;
    scoped_lock.unlock();

    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
      if (!weak_this)
        return;
      weak_this->AbortStartupTracingForReservation(
          target_buffer_reservation_id);
    });
    return;
  }

  PERFETTO_CHECK(!fully_bound_);

  // Bind the target buffer reservation to an invalid buffer (ID 0), so that
  // existing commits, as well as future commits (of currently acquired
  // chunks), will be released as free by the service but otherwise ignored
  // (i.e. not copied into any valid target buffer).
  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id,
                              /*target_buffer_id=*/kInvalidBufferId);
}

void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
    std::unique_lock<std::mutex> scoped_lock,
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  // We should already be bound to an endpoint if the target buffer is valid.
  PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
                  target_buffer_id == kInvalidBufferId);

  MaybeUnboundBufferID reserved_id =
      MakeTargetBufferIdForReservation(target_buffer_reservation_id);

  bool should_flush = false;
  std::function<void()> flush_callback;
  std::vector<std::pair<WriterID, BufferID>> writers_to_register;

  TargetBufferReservation& reservation =
      target_buffer_reservations_[reserved_id];
  PERFETTO_CHECK(!reservation.resolved);
  reservation.resolved = true;
  reservation.target_buffer = target_buffer_id;

  // Collect trace writers associated with the reservation.
  for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
    if (it->second == reserved_id) {
      // No need to register writers that have an invalid target buffer.
      if (target_buffer_id != kInvalidBufferId) {
        writers_to_register.push_back(
            std::make_pair(it->first, target_buffer_id));
      }
      it = pending_writers_.erase(it);
    } else {
      it++;
    }
  }

  // If all buffer reservations are bound, we can flush pending commits.
  if (UpdateFullyBoundLocked()) {
    should_flush = true;
    flush_callback = TakePendingFlushCallbacksLocked();
  }

  scoped_lock.unlock();

  // Register any newly bound trace writers with the service.
  for (const auto& writer_and_target_buffer : writers_to_register) {
    producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
                                            writer_and_target_buffer.second);
  }

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we
  // become |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

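// Bundles all |pending_flush_callbacks_| into a single callback (or returns an
// empty std::function if there are none), clearing the pending list. Must be
// called while holding |lock_|.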
std::function<void()>
SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
  if (pending_flush_callbacks_.empty())
    return std::function<void()>();

  std::vector<std::function<void()>> pending_flush_callbacks;
  pending_flush_callbacks.swap(pending_flush_callbacks_);
  // Capture the callback list into the lambda by copy.
  return [pending_flush_callbacks]() {
    for (auto& callback : pending_flush_callbacks)
      callback();
  };
}

void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  base::TaskRunner* task_runner_to_commit_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If a commit_data_req_ exists it means that somebody else already posted
    // a FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If
      // we aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_)
        task_runner_to_commit_on = task_runner_;
    } else {
      // If another request is already queued and it is also a reply to a flush
      // request, reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_commit_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_commit_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_commit_on->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
    MaybeUnboundBufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  WriterID id;
  base::TaskRunner* task_runner_to_register_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    if (did_shutdown_)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    id = active_writer_ids_.Allocate();
    if (!id)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    PERFETTO_DCHECK(!pending_writers_.count(id));

    if (IsReservationTargetBufferId(target_buffer)) {
      // If the reservation is new, mark it as unbound in
      // |target_buffer_reservations_|. Otherwise, if the reservation was
      // already bound, choose the bound buffer ID now.
      auto it_and_inserted = target_buffer_reservations_.insert(
          {target_buffer, TargetBufferReservation()});
      if (it_and_inserted.first->second.resolved)
        target_buffer = it_and_inserted.first->second.target_buffer;
    }

    if (IsReservationTargetBufferId(target_buffer)) {
      // The arbiter and/or startup buffer reservations are not bound yet, so
      // buffer the registration of the writer until after we're bound.
      pending_writers_[id] = target_buffer;

      // Mark the arbiter as not fully bound, since we now have at least one
      // unbound trace writer / target buffer reservation.
      fully_bound_ = false;
    } else if (target_buffer != kInvalidBufferId) {
      // Trace writer is bound, so arbiter should be bound to an endpoint, too.
      PERFETTO_CHECK(producer_endpoint_ && task_runner_);
      task_runner_to_register_on = task_runner_;
    }
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_register_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_register_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
      if (weak_this)
        weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
    });
  }

  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  base::TaskRunner* task_runner = nullptr;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    active_writer_ids_.Free(id);

    auto it = pending_writers_.find(id);
    if (it != pending_writers_.end()) {
      // Writer hasn't been bound yet and thus also not yet registered with the
      // service.
      pending_writers_.erase(it);
      return;
    }

    // A trace writer from an aborted session may be destroyed before the
    // arbiter is bound to a task runner. In that case, it was never registered
    // with the service.
    if (!task_runner_)
      return;

    task_runner = task_runner_;
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner| remains valid after
  // unlocking, because |task_runner_| is never reset.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner->PostTask([weak_this, id] {
    if (weak_this)
      weak_this->producer_endpoint_->UnregisterTraceWriter(id);
  });
}

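// Rewrites any reservation placeholders in the pending |commit_data_req_| with
// the BufferIDs they have since been resolved to. Returns false if at least
// one placeholder still refers to an unresolved reservation. Must be called
// while holding |lock_|.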
bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
  if (!commit_data_req_)
    return true;

  bool all_placeholders_replaced = true;
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  return all_placeholders_replaced;
}

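// Recomputes |fully_bound_|: true iff the arbiter is bound to a producer
// endpoint and every target buffer reservation has been resolved to a concrete
// BufferID. Returns the new value. Must be called while holding |lock_|.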
bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
  if (!producer_endpoint_) {
    PERFETTO_DCHECK(!fully_bound_);
    return false;
  }
  // We're fully bound if all target buffer reservations have a valid
  // associated BufferID.
  fully_bound_ = std::none_of(
      target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
      [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
        return !entry.second.resolved;
      });
  return fully_bound_;
}

}  // namespace perfetto