1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/shared_memory_arbiter_impl.h"
18
19 #include <algorithm>
20 #include <limits>
21 #include <utility>
22
23 #include "perfetto/base/logging.h"
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/base/time.h"
26 #include "perfetto/ext/tracing/core/commit_data_request.h"
27 #include "perfetto/ext/tracing/core/shared_memory.h"
28 #include "src/tracing/core/null_trace_writer.h"
29 #include "src/tracing/core/trace_writer_impl.h"
30
31 namespace perfetto {
32
33 using Chunk = SharedMemoryABI::Chunk;
34
35 namespace {
36 static_assert(sizeof(BufferID) == sizeof(uint16_t),
37 "The MaybeUnboundBufferID logic requires BufferID not to grow "
38 "above uint16_t.");
39
MakeTargetBufferIdForReservation(uint16_t reservation_id)40 MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
41 // Reservation IDs are stored in the upper bits.
42 PERFETTO_CHECK(reservation_id > 0);
43 return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
44 }
45
IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id)46 bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
47 return (buffer_id >> 16) > 0;
48 }
49 } // namespace
50
// static
// Page layout that GetNewChunk() passes to SharedMemoryABI::TryPartitionPage()
// whenever it partitions a free page. Initialized to kPageDiv1.
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
// Out-of-class definition of the constexpr static member. Before C++17 this is
// required for the member to be ODR-usable (e.g. when bound to a reference).
constexpr BufferID SharedMemoryArbiterImpl::kInvalidBufferId;
57
58 // static
CreateInstance(SharedMemory * shared_memory,size_t page_size,TracingService::ProducerEndpoint * producer_endpoint,base::TaskRunner * task_runner)59 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
60 SharedMemory* shared_memory,
61 size_t page_size,
62 TracingService::ProducerEndpoint* producer_endpoint,
63 base::TaskRunner* task_runner) {
64 return std::unique_ptr<SharedMemoryArbiterImpl>(
65 new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
66 page_size, producer_endpoint, task_runner));
67 }
68
69 // static
CreateUnboundInstance(SharedMemory * shared_memory,size_t page_size)70 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
71 SharedMemory* shared_memory,
72 size_t page_size) {
73 return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
74 shared_memory->start(), shared_memory->size(), page_size,
75 /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
76 }
77
SharedMemoryArbiterImpl(void * start,size_t size,size_t page_size,TracingService::ProducerEndpoint * producer_endpoint,base::TaskRunner * task_runner)78 SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
79 void* start,
80 size_t size,
81 size_t page_size,
82 TracingService::ProducerEndpoint* producer_endpoint,
83 base::TaskRunner* task_runner)
84 : initially_bound_(task_runner && producer_endpoint),
85 producer_endpoint_(producer_endpoint),
86 task_runner_(task_runner),
87 shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
88 active_writer_ids_(kMaxWriterID),
89 fully_bound_(initially_bound_),
90 weak_ptr_factory_(this) {}
91
GetNewChunk(const SharedMemoryABI::ChunkHeader & header,BufferExhaustedPolicy buffer_exhausted_policy,size_t size_hint)92 Chunk SharedMemoryArbiterImpl::GetNewChunk(
93 const SharedMemoryABI::ChunkHeader& header,
94 BufferExhaustedPolicy buffer_exhausted_policy,
95 size_t size_hint) {
96 PERFETTO_DCHECK(size_hint == 0); // Not implemented yet.
97 // If initially unbound, we do not support stalling. In theory, we could
98 // support stalling for TraceWriters created after the arbiter and startup
99 // buffer reservations were bound, but to avoid raciness between the creation
100 // of startup writers and binding, we categorically forbid kStall mode.
101 PERFETTO_DCHECK(initially_bound_ ||
102 buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);
103
104 int stall_count = 0;
105 unsigned stall_interval_us = 0;
106 bool task_runner_runs_on_current_thread = false;
107 static const unsigned kMaxStallIntervalUs = 100000;
108 static const int kLogAfterNStalls = 3;
109 static const int kFlushCommitsAfterEveryNStalls = 2;
110 static const int kAssertAtNStalls = 100;
111
112 for (;;) {
113 // TODO(primiano): Probably this lock is not really required and this code
114 // could be rewritten leveraging only the Try* atomic operations in
115 // SharedMemoryABI. But let's not be too adventurous for the moment.
116 {
117 std::unique_lock<std::mutex> scoped_lock(lock_);
118
119 task_runner_runs_on_current_thread =
120 task_runner_ && task_runner_->RunsTasksOnCurrentThread();
121
122 // If more than half of the SMB.size() is filled with completed chunks for
123 // which we haven't notified the service yet (i.e. they are still enqueued
124 // in |commit_data_req_|), force a synchronous CommitDataRequest() even if
125 // we acquire a chunk, to reduce the likeliness of stalling the writer.
126 //
127 // We can only do this if we're writing on the same thread that we access
128 // the producer endpoint on, since we cannot notify the producer endpoint
129 // to commit synchronously on a different thread. Attempting to flush
130 // synchronously on another thread will lead to subtle bugs caused by
131 // out-of-order commit requests (crbug.com/919187#c28).
132 bool should_commit_synchronously =
133 task_runner_runs_on_current_thread &&
134 buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
135 commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;
136
137 const size_t initial_page_idx = page_idx_;
138 for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
139 page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
140 bool is_new_page = false;
141
142 // TODO(primiano): make the page layout dynamic.
143 auto layout = SharedMemoryArbiterImpl::default_page_layout;
144
145 if (shmem_abi_.is_page_free(page_idx_)) {
146 // TODO(primiano): Use the |size_hint| here to decide the layout.
147 is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
148 }
149 uint32_t free_chunks;
150 if (is_new_page) {
151 free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
152 } else {
153 free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
154 }
155
156 for (uint32_t chunk_idx = 0; free_chunks;
157 chunk_idx++, free_chunks >>= 1) {
158 if (!(free_chunks & 1))
159 continue;
160 // We found a free chunk.
161 Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
162 page_idx_, chunk_idx, &header);
163 if (!chunk.is_valid())
164 continue;
165 if (stall_count > kLogAfterNStalls) {
166 PERFETTO_LOG("Recovered from stall after %d iterations",
167 stall_count);
168 }
169
170 if (should_commit_synchronously) {
171 // We can't flush while holding the lock.
172 scoped_lock.unlock();
173 FlushPendingCommitDataRequests();
174 return chunk;
175 } else {
176 return chunk;
177 }
178 }
179 }
180 } // scoped_lock
181
182 if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
183 PERFETTO_DLOG("Shared memory buffer exhaused, returning invalid Chunk!");
184 return Chunk();
185 }
186
187 PERFETTO_DCHECK(initially_bound_);
188
189 // All chunks are taken (either kBeingWritten by us or kBeingRead by the
190 // Service).
191 if (stall_count++ == kLogAfterNStalls) {
192 PERFETTO_LOG("Shared memory buffer overrun! Stalling");
193 }
194
195 if (stall_count == kAssertAtNStalls) {
196 PERFETTO_FATAL(
197 "Shared memory buffer max stall count exceeded; possible deadlock");
198 }
199
200 // If the IPC thread itself is stalled because the current process has
201 // filled up the SMB, we need to make sure that the service can process and
202 // purge the chunks written by our process, by flushing any pending commit
203 // requests. Because other threads in our process can continue to
204 // concurrently grab, fill and commit any chunks purged by the service, it
205 // is possible that the SMB remains full and the IPC thread remains stalled,
206 // needing to flush the concurrently queued up commits again. This is
207 // particularly likely with in-process perfetto service where the IPC thread
208 // is the service thread. To avoid remaining stalled forever in such a
209 // situation, we attempt to flush periodically after every N stalls.
210 if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
211 task_runner_runs_on_current_thread) {
212 // TODO(primiano): sending the IPC synchronously is a temporary workaround
213 // until the backpressure logic in probes_producer is sorted out. Until
214 // then the risk is that we stall the message loop waiting for the tracing
215 // service to consume the shared memory buffer (SMB) and, for this reason,
216 // never run the task that tells the service to purge the SMB. This must
217 // happen iff we are on the IPC thread, not doing this will cause
218 // deadlocks, doing this on the wrong thread causes out-of-order data
219 // commits (crbug.com/919187#c28).
220 FlushPendingCommitDataRequests();
221 } else {
222 base::SleepMicroseconds(stall_interval_us);
223 stall_interval_us =
224 std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
225 }
226 }
227 }
228
ReturnCompletedChunk(Chunk chunk,MaybeUnboundBufferID target_buffer,PatchList * patch_list)229 void SharedMemoryArbiterImpl::ReturnCompletedChunk(
230 Chunk chunk,
231 MaybeUnboundBufferID target_buffer,
232 PatchList* patch_list) {
233 PERFETTO_DCHECK(chunk.is_valid());
234 const WriterID writer_id = chunk.writer_id();
235 UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
236 patch_list);
237 }
238
SendPatches(WriterID writer_id,MaybeUnboundBufferID target_buffer,PatchList * patch_list)239 void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
240 MaybeUnboundBufferID target_buffer,
241 PatchList* patch_list) {
242 PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
243 UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
244 }
245
UpdateCommitDataRequest(Chunk chunk,WriterID writer_id,MaybeUnboundBufferID target_buffer,PatchList * patch_list)246 void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
247 Chunk chunk,
248 WriterID writer_id,
249 MaybeUnboundBufferID target_buffer,
250 PatchList* patch_list) {
251 // Note: chunk will be invalid if the call came from SendPatches().
252 base::TaskRunner* task_runner_to_post_callback_on = nullptr;
253 base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
254 {
255 std::lock_guard<std::mutex> scoped_lock(lock_);
256
257 if (!commit_data_req_) {
258 commit_data_req_.reset(new CommitDataRequest());
259
260 // Flushing the commit is only supported while we're |fully_bound_|. If we
261 // aren't, we'll flush when |fully_bound_| is updated.
262 if (fully_bound_) {
263 weak_this = weak_ptr_factory_.GetWeakPtr();
264 task_runner_to_post_callback_on = task_runner_;
265 }
266 }
267
268 // If a valid chunk is specified, return it and attach it to the request.
269 if (chunk.is_valid()) {
270 PERFETTO_DCHECK(chunk.writer_id() == writer_id);
271 uint8_t chunk_idx = chunk.chunk_idx();
272 bytes_pending_commit_ += chunk.size();
273 size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
274
275 // DO NOT access |chunk| after this point, has been std::move()-d above.
276
277 CommitDataRequest::ChunksToMove* ctm =
278 commit_data_req_->add_chunks_to_move();
279 ctm->set_page(static_cast<uint32_t>(page_idx));
280 ctm->set_chunk(chunk_idx);
281 ctm->set_target_buffer(target_buffer);
282 }
283
284 // Get the completed patches for previous chunks from the |patch_list|
285 // and attach them.
286 ChunkID last_chunk_id = 0; // 0 is irrelevant but keeps the compiler happy.
287 CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
288 while (!patch_list->empty() && patch_list->front().is_patched()) {
289 if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
290 last_chunk_req = commit_data_req_->add_chunks_to_patch();
291 last_chunk_req->set_writer_id(writer_id);
292 last_chunk_id = patch_list->front().chunk_id;
293 last_chunk_req->set_chunk_id(last_chunk_id);
294 last_chunk_req->set_target_buffer(target_buffer);
295 }
296 auto* patch_req = last_chunk_req->add_patches();
297 patch_req->set_offset(patch_list->front().offset);
298 patch_req->set_data(&patch_list->front().size_field[0],
299 patch_list->front().size_field.size());
300 patch_list->pop_front();
301 }
302 // Patches are enqueued in the |patch_list| in order and are notified to
303 // the service when the chunk is returned. The only case when the current
304 // patch list is incomplete is if there is an unpatched entry at the head of
305 // the |patch_list| that belongs to the same ChunkID as the last one we are
306 // about to send to the service.
307 if (last_chunk_req && !patch_list->empty() &&
308 patch_list->front().chunk_id == last_chunk_id) {
309 last_chunk_req->set_has_more_patches(true);
310 }
311 } // scoped_lock(lock_)
312
313 // We shouldn't post tasks while locked. |task_runner_to_post_callback_on|
314 // remains valid after unlocking, because |task_runner_| is never reset.
315 if (task_runner_to_post_callback_on) {
316 task_runner_to_post_callback_on->PostTask([weak_this] {
317 if (weak_this)
318 weak_this->FlushPendingCommitDataRequests();
319 });
320 }
321 }
322
323 // This function is quite subtle. When making changes keep in mind these two
324 // challenges:
325 // 1) If the producer stalls and we happen to be on the |task_runner_| IPC
326 // thread (or, for in-process cases, on the same thread where
327 // TracingServiceImpl lives), the CommitData() call must be synchronous and
328 // not posted, to avoid deadlocks.
329 // 2) When different threads hit this function, we must guarantee that we don't
330 // accidentally make commits out of order. See commit 4e4fe8f56ef and
331 // crbug.com/919187 for more context.
FlushPendingCommitDataRequests(std::function<void ()> callback)332 void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
333 std::function<void()> callback) {
334 std::unique_ptr<CommitDataRequest> req;
335 {
336 std::unique_lock<std::mutex> scoped_lock(lock_);
337
338 // Flushing is only supported while |fully_bound_|, and there may still be
339 // unbound startup trace writers. If so, skip the commit for now - it'll be
340 // done when |fully_bound_| is updated.
341 if (!fully_bound_) {
342 if (callback)
343 pending_flush_callbacks_.push_back(callback);
344 return;
345 }
346
347 // May be called by TraceWriterImpl on any thread.
348 base::TaskRunner* task_runner = task_runner_;
349 if (!task_runner->RunsTasksOnCurrentThread()) {
350 // We shouldn't post a task while holding a lock. |task_runner| remains
351 // valid after unlocking, because |task_runner_| is never reset.
352 scoped_lock.unlock();
353
354 auto weak_this = weak_ptr_factory_.GetWeakPtr();
355 task_runner->PostTask([weak_this, callback] {
356 if (weak_this)
357 weak_this->FlushPendingCommitDataRequests(std::move(callback));
358 });
359 return;
360 }
361
362 // |commit_data_req_| could have become a nullptr, for example when a forced
363 // sync flush happens in GetNewChunk().
364 if (commit_data_req_) {
365 // Make sure any placeholder buffer IDs from StartupWriters are replaced
366 // before sending the request.
367 bool all_placeholders_replaced =
368 ReplaceCommitPlaceholderBufferIdsLocked();
369 // We're |fully_bound_|, thus all writers are bound and all placeholders
370 // should have been replaced.
371 PERFETTO_DCHECK(all_placeholders_replaced);
372
373 req = std::move(commit_data_req_);
374 bytes_pending_commit_ = 0;
375 }
376 } // scoped_lock
377
378 if (req) {
379 producer_endpoint_->CommitData(*req, callback);
380 } else if (callback) {
381 // If |req| was nullptr, it means that an enqueued deferred commit was
382 // executed just before this. At this point send an empty commit request
383 // to the service, just to linearize with it and give the guarantee to the
384 // caller that the data has been flushed into the service.
385 producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
386 }
387 }
388
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)389 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
390 BufferID target_buffer,
391 BufferExhaustedPolicy buffer_exhausted_policy) {
392 PERFETTO_CHECK(target_buffer > 0);
393 return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
394 }
395
CreateStartupTraceWriter(uint16_t target_buffer_reservation_id)396 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
397 uint16_t target_buffer_reservation_id) {
398 PERFETTO_CHECK(!initially_bound_);
399 return CreateTraceWriterInternal(
400 MakeTargetBufferIdForReservation(target_buffer_reservation_id),
401 BufferExhaustedPolicy::kDrop);
402 }
403
BindToProducerEndpoint(TracingService::ProducerEndpoint * producer_endpoint,base::TaskRunner * task_runner)404 void SharedMemoryArbiterImpl::BindToProducerEndpoint(
405 TracingService::ProducerEndpoint* producer_endpoint,
406 base::TaskRunner* task_runner) {
407 PERFETTO_DCHECK(producer_endpoint && task_runner);
408 PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());
409 PERFETTO_CHECK(!initially_bound_);
410
411 bool should_flush = false;
412 std::function<void()> flush_callback;
413 {
414 std::lock_guard<std::mutex> scoped_lock(lock_);
415 PERFETTO_CHECK(!fully_bound_);
416 PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);
417
418 producer_endpoint_ = producer_endpoint;
419 task_runner_ = task_runner;
420
421 // Now that we're bound to a task runner, also reset the WeakPtrFactory to
422 // it. Because this code runs on the task runner, the factory's weak
423 // pointers will be valid on it.
424 weak_ptr_factory_.Reset(this);
425
426 // All writers registered so far should be startup trace writers, since
427 // the producer cannot feasibly know the target buffer for any future
428 // session yet.
429 for (const auto& entry : pending_writers_) {
430 PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
431 }
432
433 // If all buffer reservations are bound, we can flush pending commits.
434 if (UpdateFullyBoundLocked()) {
435 should_flush = true;
436 flush_callback = TakePendingFlushCallbacksLocked();
437 }
438 } // scoped_lock
439
440 // Attempt to flush any pending commits (and run pending flush callbacks). If
441 // there are none, this will have no effect. If we ended up in a race that
442 // changed |fully_bound_| back to false, the commit will happen once we become
443 // |fully_bound_| again.
444 if (should_flush)
445 FlushPendingCommitDataRequests(flush_callback);
446 }
447
BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,BufferID target_buffer_id)448 void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
449 uint16_t target_buffer_reservation_id,
450 BufferID target_buffer_id) {
451 PERFETTO_DCHECK(target_buffer_id > 0);
452 PERFETTO_CHECK(!initially_bound_);
453
454 std::unique_lock<std::mutex> scoped_lock(lock_);
455
456 // We should already be bound to an endpoint, but not fully bound.
457 PERFETTO_CHECK(!fully_bound_);
458 PERFETTO_CHECK(producer_endpoint_);
459 PERFETTO_CHECK(task_runner_);
460 PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());
461
462 BindStartupTargetBufferImpl(std::move(scoped_lock),
463 target_buffer_reservation_id, target_buffer_id);
464 }
465
AbortStartupTracingForReservation(uint16_t target_buffer_reservation_id)466 void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
467 uint16_t target_buffer_reservation_id) {
468 PERFETTO_CHECK(!initially_bound_);
469
470 std::unique_lock<std::mutex> scoped_lock(lock_);
471
472 // If we are already bound to an arbiter, we may need to flush after aborting
473 // the session, and thus should be running on the arbiter's task runner.
474 if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
475 // We shouldn't post tasks while locked.
476 auto* task_runner = task_runner_;
477 scoped_lock.unlock();
478
479 auto weak_this = weak_ptr_factory_.GetWeakPtr();
480 task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
481 if (!weak_this)
482 return;
483 weak_this->AbortStartupTracingForReservation(
484 target_buffer_reservation_id);
485 });
486 return;
487 }
488
489 PERFETTO_CHECK(!fully_bound_);
490
491 // Bind the target buffer reservation to an invalid buffer (ID 0), so that
492 // existing commits, as well as future commits (of currently acquired chunks),
493 // will be released as free free by the service but otherwise ignored (i.e.
494 // not copied into any valid target buffer).
495 BindStartupTargetBufferImpl(std::move(scoped_lock),
496 target_buffer_reservation_id,
497 /*target_buffer_id=*/kInvalidBufferId);
498 }
499
BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,uint16_t target_buffer_reservation_id,BufferID target_buffer_id)500 void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
501 std::unique_lock<std::mutex> scoped_lock,
502 uint16_t target_buffer_reservation_id,
503 BufferID target_buffer_id) {
504 // We should already be bound to an endpoint if the target buffer is valid.
505 PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
506 target_buffer_id == kInvalidBufferId);
507
508 MaybeUnboundBufferID reserved_id =
509 MakeTargetBufferIdForReservation(target_buffer_reservation_id);
510
511 bool should_flush = false;
512 std::function<void()> flush_callback;
513 std::vector<std::pair<WriterID, BufferID>> writers_to_register;
514
515 TargetBufferReservation& reservation =
516 target_buffer_reservations_[reserved_id];
517 PERFETTO_CHECK(!reservation.resolved);
518 reservation.resolved = true;
519 reservation.target_buffer = target_buffer_id;
520
521 // Collect trace writers associated with the reservation.
522 for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
523 if (it->second == reserved_id) {
524 // No need to register writers that have an invalid target buffer.
525 if (target_buffer_id != kInvalidBufferId) {
526 writers_to_register.push_back(
527 std::make_pair(it->first, target_buffer_id));
528 }
529 it = pending_writers_.erase(it);
530 } else {
531 it++;
532 }
533 }
534
535 // If all buffer reservations are bound, we can flush pending commits.
536 if (UpdateFullyBoundLocked()) {
537 should_flush = true;
538 flush_callback = TakePendingFlushCallbacksLocked();
539 }
540
541 scoped_lock.unlock();
542
543 // Register any newly bound trace writers with the service.
544 for (const auto& writer_and_target_buffer : writers_to_register) {
545 producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
546 writer_and_target_buffer.second);
547 }
548
549 // Attempt to flush any pending commits (and run pending flush callbacks). If
550 // there are none, this will have no effect. If we ended up in a race that
551 // changed |fully_bound_| back to false, the commit will happen once we become
552 // |fully_bound_| again.
553 if (should_flush)
554 FlushPendingCommitDataRequests(flush_callback);
555 }
556
557 std::function<void()>
TakePendingFlushCallbacksLocked()558 SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
559 if (pending_flush_callbacks_.empty())
560 return std::function<void()>();
561
562 std::vector<std::function<void()>> pending_flush_callbacks;
563 pending_flush_callbacks.swap(pending_flush_callbacks_);
564 // Capture the callback list into the lambda by copy.
565 return [pending_flush_callbacks]() {
566 for (auto& callback : pending_flush_callbacks)
567 callback();
568 };
569 }
570
NotifyFlushComplete(FlushRequestID req_id)571 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
572 base::TaskRunner* task_runner_to_commit_on = nullptr;
573
574 {
575 std::lock_guard<std::mutex> scoped_lock(lock_);
576 // If a commit_data_req_ exists it means that somebody else already posted a
577 // FlushPendingCommitDataRequests() task.
578 if (!commit_data_req_) {
579 commit_data_req_.reset(new CommitDataRequest());
580
581 // Flushing the commit is only supported while we're |fully_bound_|. If we
582 // aren't, we'll flush when |fully_bound_| is updated.
583 if (fully_bound_)
584 task_runner_to_commit_on = task_runner_;
585 } else {
586 // If there is another request queued and that also contains is a reply
587 // to a flush request, reply with the highest id.
588 req_id = std::max(req_id, commit_data_req_->flush_request_id());
589 }
590 commit_data_req_->set_flush_request_id(req_id);
591 } // scoped_lock
592
593 // We shouldn't post tasks while locked. |task_runner_to_commit_on|
594 // remains valid after unlocking, because |task_runner_| is never reset.
595 if (task_runner_to_commit_on) {
596 auto weak_this = weak_ptr_factory_.GetWeakPtr();
597 task_runner_to_commit_on->PostTask([weak_this] {
598 if (weak_this)
599 weak_this->FlushPendingCommitDataRequests();
600 });
601 }
602 }
603
CreateTraceWriterInternal(MaybeUnboundBufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)604 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
605 MaybeUnboundBufferID target_buffer,
606 BufferExhaustedPolicy buffer_exhausted_policy) {
607 WriterID id;
608 base::TaskRunner* task_runner_to_register_on = nullptr;
609
610 {
611 std::lock_guard<std::mutex> scoped_lock(lock_);
612 id = active_writer_ids_.Allocate();
613
614 if (!id)
615 return std::unique_ptr<TraceWriter>(new NullTraceWriter());
616
617 PERFETTO_DCHECK(!pending_writers_.count(id));
618
619 if (IsReservationTargetBufferId(target_buffer)) {
620 // If the reservation is new, mark it as unbound in
621 // |target_buffer_reservations_|. Otherwise, if the reservation was
622 // already bound, choose the bound buffer ID now.
623 auto it_and_inserted = target_buffer_reservations_.insert(
624 {target_buffer, TargetBufferReservation()});
625 if (it_and_inserted.first->second.resolved)
626 target_buffer = it_and_inserted.first->second.target_buffer;
627 }
628
629 if (IsReservationTargetBufferId(target_buffer)) {
630 // The arbiter and/or startup buffer reservations are not bound yet, so
631 // buffer the registration of the writer until after we're bound.
632 pending_writers_[id] = target_buffer;
633
634 // Mark the arbiter as not fully bound, since we now have at least one
635 // unbound trace writer / target buffer reservation.
636 fully_bound_ = false;
637 } else if (target_buffer != kInvalidBufferId) {
638 // Trace writer is bound, so arbiter should be bound to an endpoint, too.
639 PERFETTO_CHECK(producer_endpoint_ && task_runner_);
640 task_runner_to_register_on = task_runner_;
641 }
642 } // scoped_lock
643
644 // We shouldn't post tasks while locked. |task_runner_to_register_on|
645 // remains valid after unlocking, because |task_runner_| is never reset.
646 if (task_runner_to_register_on) {
647 auto weak_this = weak_ptr_factory_.GetWeakPtr();
648 task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
649 if (weak_this)
650 weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
651 });
652 }
653
654 return std::unique_ptr<TraceWriter>(
655 new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
656 }
657
ReleaseWriterID(WriterID id)658 void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
659 base::TaskRunner* task_runner = nullptr;
660 {
661 std::lock_guard<std::mutex> scoped_lock(lock_);
662 active_writer_ids_.Free(id);
663
664 auto it = pending_writers_.find(id);
665 if (it != pending_writers_.end()) {
666 // Writer hasn't been bound yet and thus also not yet registered with the
667 // service.
668 pending_writers_.erase(it);
669 return;
670 }
671
672 // A trace writer from an aborted session may be destroyed before the
673 // arbiter is bound to a task runner. In that case, it was never registered
674 // with the service.
675 if (!task_runner_)
676 return;
677
678 task_runner = task_runner_;
679 } // scoped_lock
680
681 // We shouldn't post tasks while locked. |task_runner| remains valid after
682 // unlocking, because |task_runner_| is never reset.
683 auto weak_this = weak_ptr_factory_.GetWeakPtr();
684 task_runner->PostTask([weak_this, id] {
685 if (weak_this)
686 weak_this->producer_endpoint_->UnregisterTraceWriter(id);
687 });
688 }
689
ReplaceCommitPlaceholderBufferIdsLocked()690 bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
691 if (!commit_data_req_)
692 return true;
693
694 bool all_placeholders_replaced = true;
695 for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
696 if (!IsReservationTargetBufferId(chunk.target_buffer()))
697 continue;
698 const auto it = target_buffer_reservations_.find(chunk.target_buffer());
699 PERFETTO_DCHECK(it != target_buffer_reservations_.end());
700 if (!it->second.resolved) {
701 all_placeholders_replaced = false;
702 continue;
703 }
704 chunk.set_target_buffer(it->second.target_buffer);
705 }
706 for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
707 if (!IsReservationTargetBufferId(chunk.target_buffer()))
708 continue;
709 const auto it = target_buffer_reservations_.find(chunk.target_buffer());
710 PERFETTO_DCHECK(it != target_buffer_reservations_.end());
711 if (!it->second.resolved) {
712 all_placeholders_replaced = false;
713 continue;
714 }
715 chunk.set_target_buffer(it->second.target_buffer);
716 }
717 return all_placeholders_replaced;
718 }
719
UpdateFullyBoundLocked()720 bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
721 if (!producer_endpoint_) {
722 PERFETTO_DCHECK(!fully_bound_);
723 return false;
724 }
725 // We're fully bound if all target buffer reservations have a valid associated
726 // BufferID.
727 fully_bound_ = std::none_of(
728 target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
729 [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
730 return !entry.second.resolved;
731 });
732 return fully_bound_;
733 }
734
735 } // namespace perfetto
736