/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <algorithm>
#include <limits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

namespace {
static_assert(sizeof(BufferID) == sizeof(uint16_t),
              "The MaybeUnboundBufferID logic requires BufferID not to grow "
              "above uint16_t.");

MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
  // Reservation IDs are stored in the upper bits.
  PERFETTO_CHECK(reservation_id > 0);
  return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
}

bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
  return (buffer_id >> 16) > 0;
}
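
// Example (values follow directly from the two helpers above): a reservation
// with id 1 produces the placeholder target buffer id 0x00010000. Because
// real BufferIDs fit into 16 bits (see the static_assert above), any value
// with a non-zero upper half is classified as an unbound reservation by
// IsReservationTargetBufferId().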
}  // namespace

// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
constexpr BufferID SharedMemoryArbiterImpl::kInvalidBufferId;

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(
      new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
                                  page_size, producer_endpoint, task_runner));
}

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
    SharedMemory* shared_memory,
    size_t page_size) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
      shared_memory->start(), shared_memory->size(), page_size,
      /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
}
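
// Illustrative usage sketch (|shmem|, |endpoint|, |task_runner| and
// |target_buffer| are hypothetical caller-owned objects; 4096 is just an
// example page size):
//   auto arbiter = SharedMemoryArbiter::CreateInstance(
//       shmem.get(), /*page_size=*/4096, endpoint, task_runner);
//   auto writer = arbiter->CreateTraceWriter(target_buffer,
//                                            BufferExhaustedPolicy::kDrop);
// Producers that start tracing before connecting to the service use
// CreateUnboundInstance() instead and later call BindToProducerEndpoint().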

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : initially_bound_(task_runner && producer_endpoint),
      producer_endpoint_(producer_endpoint),
      task_runner_(task_runner),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
      active_writer_ids_(kMaxWriterID),
      fully_bound_(initially_bound_),
      weak_ptr_factory_(this) {}

Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    BufferExhaustedPolicy buffer_exhausted_policy,
    size_t size_hint) {
  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
  // If initially unbound, we do not support stalling. In theory, we could
  // support stalling for TraceWriters created after the arbiter and startup
  // buffer reservations were bound, but to avoid raciness between the creation
  // of startup writers and binding, we categorically forbid kStall mode.
  PERFETTO_DCHECK(initially_bound_ ||
                  buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);

  int stall_count = 0;
  unsigned stall_interval_us = 0;
  bool task_runner_runs_on_current_thread = false;
  static const unsigned kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;
  static const int kFlushCommitsAfterEveryNStalls = 2;
  static const int kAssertAtNStalls = 100;

  for (;;) {
    // TODO(primiano): Probably this lock is not really required and this code
    // could be rewritten leveraging only the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::unique_lock<std::mutex> scoped_lock(lock_);

      task_runner_runs_on_current_thread =
          task_runner_ && task_runner_->RunsTasksOnCurrentThread();

      // If more than half of the SMB.size() is filled with completed chunks for
      // which we haven't notified the service yet (i.e. they are still enqueued
      // in |commit_data_req_|), force a synchronous CommitDataRequest() even if
      // we acquire a chunk, to reduce the likelihood of stalling the writer.
      //
      // We can only do this if we're writing on the same thread that we access
      // the producer endpoint on, since we cannot notify the producer endpoint
      // to commit synchronously on a different thread. Attempting to flush
      // synchronously on another thread will lead to subtle bugs caused by
      // out-of-order commit requests (crbug.com/919187#c28).
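      //
      // Illustrative example (hypothetical SMB size): with a 256 KB shared
      // memory buffer, a synchronous commit would be forced once at least
      // 128 KB worth of completed-but-unnotified chunks are pending.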
      bool should_commit_synchronously =
          task_runner_runs_on_current_thread &&
          buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
          commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;

      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          // TODO(primiano): Use the |size_hint| here to decide the layout.
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }

          if (should_commit_synchronously) {
            // We can't flush while holding the lock.
            scoped_lock.unlock();
            FlushPendingCommitDataRequests();
            return chunk;
          } else {
            return chunk;
          }
        }
      }
    }  // scoped_lock

    if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
      PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
      return Chunk();
    }

    PERFETTO_DCHECK(initially_bound_);

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service).
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_LOG("Shared memory buffer overrun! Stalling");
    }

    if (stall_count == kAssertAtNStalls) {
      PERFETTO_FATAL(
          "Shared memory buffer max stall count exceeded; possible deadlock");
    }

    // If the IPC thread itself is stalled because the current process has
    // filled up the SMB, we need to make sure that the service can process and
    // purge the chunks written by our process, by flushing any pending commit
    // requests. Because other threads in our process can continue to
    // concurrently grab, fill and commit any chunks purged by the service, it
    // is possible that the SMB remains full and the IPC thread remains stalled,
    // needing to flush the concurrently queued up commits again. This is
    // particularly likely with the in-process perfetto service, where the IPC
    // thread is the service thread. To avoid remaining stalled forever in such
    // a situation, we attempt to flush periodically after every N stalls.
    if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
        task_runner_runs_on_current_thread) {
      // TODO(primiano): sending the IPC synchronously is a temporary workaround
      // until the backpressure logic in probes_producer is sorted out. Until
      // then the risk is that we stall the message loop waiting for the tracing
      // service to consume the shared memory buffer (SMB) and, for this reason,
      // never run the task that tells the service to purge the SMB. This must
      // happen only on the IPC thread: not doing it there causes deadlocks,
      // while doing it on any other thread causes out-of-order data commits
      // (crbug.com/919187#c28).
      FlushPendingCommitDataRequests();
    } else {
      base::SleepMicroseconds(stall_interval_us);
      stall_interval_us =
          std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
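      // Backoff sketch derived from the formula above: successive sleeps are
      // 0, 8, 72, 584, 4680 and 37448 us, after which they stay capped at
      // kMaxStallIntervalUs (100 ms).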
    }
  }
}

void SharedMemoryArbiterImpl::ReturnCompletedChunk(
    Chunk chunk,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  PERFETTO_DCHECK(chunk.is_valid());
  const WriterID writer_id = chunk.writer_id();
  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
                          patch_list);
}

void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
                                          MaybeUnboundBufferID target_buffer,
                                          PatchList* patch_list) {
  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}

void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
    Chunk chunk,
    WriterID writer_id,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  // Note: chunk will be invalid if the call came from SendPatches().
  base::TaskRunner* task_runner_to_post_callback_on = nullptr;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If we
      // aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_) {
        weak_this = weak_ptr_factory_.GetWeakPtr();
        task_runner_to_post_callback_on = task_runner_;
      }
    }

    // If a valid chunk is specified, return it and attach it to the request.
    if (chunk.is_valid()) {
      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
      uint8_t chunk_idx = chunk.chunk_idx();
      bytes_pending_commit_ += chunk.size();
      size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));

      // DO NOT access |chunk| after this point; it has been std::move()-d above.

      CommitDataRequest::ChunksToMove* ctm =
          commit_data_req_->add_chunks_to_move();
      ctm->set_page(static_cast<uint32_t>(page_idx));
      ctm->set_chunk(chunk_idx);
      ctm->set_target_buffer(target_buffer);
    }

    // Get the completed patches for previous chunks from the |patch_list|
    // and attach them.
    ChunkID last_chunk_id = 0;  // 0 is irrelevant but keeps the compiler happy.
    CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
        last_chunk_req = commit_data_req_->add_chunks_to_patch();
        last_chunk_req->set_writer_id(writer_id);
        last_chunk_id = patch_list->front().chunk_id;
        last_chunk_req->set_chunk_id(last_chunk_id);
        last_chunk_req->set_target_buffer(target_buffer);
      }
      auto* patch_req = last_chunk_req->add_patches();
      patch_req->set_offset(patch_list->front().offset);
      patch_req->set_data(&patch_list->front().size_field[0],
                          patch_list->front().size_field.size());
      patch_list->pop_front();
    }
    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case in which the
    // current patch list is incomplete is when an unpatched entry at the head
    // of the |patch_list| belongs to the same ChunkID as the last one we are
    // about to send to the service.
    if (last_chunk_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_chunk_id) {
      last_chunk_req->set_has_more_patches(true);
    }
  }  // scoped_lock(lock_)

  // We shouldn't post tasks while locked. |task_runner_to_post_callback_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_post_callback_on) {
    task_runner_to_post_callback_on->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
//    thread (or, for in-process cases, on the same thread where
//    TracingServiceImpl lives), the CommitData() call must be synchronous and
//    not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
//    accidentally make commits out of order. See commit 4e4fe8f56ef and
//    crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  std::unique_ptr<CommitDataRequest> req;
  {
    std::unique_lock<std::mutex> scoped_lock(lock_);

    // Flushing is only supported while |fully_bound_|, and there may still be
    // unbound startup trace writers. If so, skip the commit for now - it'll be
    // done when |fully_bound_| is updated.
    if (!fully_bound_) {
      if (callback)
        pending_flush_callbacks_.push_back(callback);
      return;
    }

    // May be called by TraceWriterImpl on any thread.
    base::TaskRunner* task_runner = task_runner_;
    if (!task_runner->RunsTasksOnCurrentThread()) {
      // We shouldn't post a task while holding a lock. |task_runner| remains
      // valid after unlocking, because |task_runner_| is never reset.
      scoped_lock.unlock();

      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner->PostTask([weak_this, callback] {
        if (weak_this)
          weak_this->FlushPendingCommitDataRequests(std::move(callback));
      });
      return;
    }

    // |commit_data_req_| could have become a nullptr, for example when a forced
    // sync flush happens in GetNewChunk().
    if (commit_data_req_) {
      // Make sure any placeholder buffer IDs from StartupWriters are replaced
      // before sending the request.
      bool all_placeholders_replaced =
          ReplaceCommitPlaceholderBufferIdsLocked();
      // We're |fully_bound_|, thus all writers are bound and all placeholders
      // should have been replaced.
      PERFETTO_DCHECK(all_placeholders_replaced);

      req = std::move(commit_data_req_);
      bytes_pending_commit_ = 0;
    }
  }  // scoped_lock

  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |req| was nullptr, it means that an enqueued deferred commit was
    // executed just before this. At this point send an empty commit request
    // to the service, just to linearize with it and give the guarantee to the
    // caller that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_CHECK(target_buffer > 0);
  return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
    uint16_t target_buffer_reservation_id) {
  PERFETTO_CHECK(!initially_bound_);
  return CreateTraceWriterInternal(
      MakeTargetBufferIdForReservation(target_buffer_reservation_id),
      BufferExhaustedPolicy::kDrop);
}

void SharedMemoryArbiterImpl::BindToProducerEndpoint(
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  PERFETTO_DCHECK(producer_endpoint && task_runner);
  PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());
  PERFETTO_CHECK(!initially_bound_);

  bool should_flush = false;
  std::function<void()> flush_callback;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    PERFETTO_CHECK(!fully_bound_);
    PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);

    producer_endpoint_ = producer_endpoint;
    task_runner_ = task_runner;

    // Now that we're bound to a task runner, also reset the WeakPtrFactory to
    // it. Because this code runs on the task runner, the factory's weak
    // pointers will be valid on it.
    weak_ptr_factory_.Reset(this);

    // All writers registered so far should be startup trace writers, since
    // the producer cannot feasibly know the target buffer for any future
    // session yet.
    for (const auto& entry : pending_writers_) {
      PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
    }

    // If all buffer reservations are bound, we can flush pending commits.
    if (UpdateFullyBoundLocked()) {
      should_flush = true;
      flush_callback = TakePendingFlushCallbacksLocked();
    }
  }  // scoped_lock

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we become
  // |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  PERFETTO_DCHECK(target_buffer_id > 0);
  PERFETTO_CHECK(!initially_bound_);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // We should already be bound to an endpoint, but not fully bound.
  PERFETTO_CHECK(!fully_bound_);
  PERFETTO_CHECK(producer_endpoint_);
  PERFETTO_CHECK(task_runner_);
  PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());

  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id, target_buffer_id);
}
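
// Startup-tracing lifecycle sketch (|shmem|, |endpoint|, |task_runner| and
// |buffer_id| are hypothetical caller-owned objects):
//   1. auto arbiter = SharedMemoryArbiter::CreateUnboundInstance(shmem, 4096);
//   2. auto writer =
//          arbiter->CreateStartupTraceWriter(/*target_buffer_reservation_id=*/1);
//   3. arbiter->BindToProducerEndpoint(endpoint, task_runner);
//   4. arbiter->BindStartupTargetBuffer(/*target_buffer_reservation_id=*/1,
//                                       buffer_id);
// Until step 4, commits from |writer| carry the placeholder id produced by
// MakeTargetBufferIdForReservation(1) and are held back until the arbiter
// becomes |fully_bound_|.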

void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
    uint16_t target_buffer_reservation_id) {
  PERFETTO_CHECK(!initially_bound_);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // If the arbiter is already bound to a task runner, we may need to flush
  // after aborting the session, and thus should be running on that task runner.
  if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
    // We shouldn't post tasks while locked.
    auto* task_runner = task_runner_;
    scoped_lock.unlock();

    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
      if (!weak_this)
        return;
      weak_this->AbortStartupTracingForReservation(
          target_buffer_reservation_id);
    });
    return;
  }

  PERFETTO_CHECK(!fully_bound_);

  // Bind the target buffer reservation to an invalid buffer (ID 0), so that
  // existing commits, as well as future commits (of currently acquired chunks),
  // will be released as free by the service but otherwise ignored (i.e.
  // not copied into any valid target buffer).
  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id,
                              /*target_buffer_id=*/kInvalidBufferId);
}

void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
    std::unique_lock<std::mutex> scoped_lock,
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  // We should already be bound to an endpoint if the target buffer is valid.
  PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
                  target_buffer_id == kInvalidBufferId);

  MaybeUnboundBufferID reserved_id =
      MakeTargetBufferIdForReservation(target_buffer_reservation_id);

  bool should_flush = false;
  std::function<void()> flush_callback;
  std::vector<std::pair<WriterID, BufferID>> writers_to_register;

  TargetBufferReservation& reservation =
      target_buffer_reservations_[reserved_id];
  PERFETTO_CHECK(!reservation.resolved);
  reservation.resolved = true;
  reservation.target_buffer = target_buffer_id;

  // Collect trace writers associated with the reservation.
  for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
    if (it->second == reserved_id) {
      // No need to register writers that have an invalid target buffer.
      if (target_buffer_id != kInvalidBufferId) {
        writers_to_register.push_back(
            std::make_pair(it->first, target_buffer_id));
      }
      it = pending_writers_.erase(it);
    } else {
      it++;
    }
  }

  // If all buffer reservations are bound, we can flush pending commits.
  if (UpdateFullyBoundLocked()) {
    should_flush = true;
    flush_callback = TakePendingFlushCallbacksLocked();
  }

  scoped_lock.unlock();

  // Register any newly bound trace writers with the service.
  for (const auto& writer_and_target_buffer : writers_to_register) {
    producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
                                            writer_and_target_buffer.second);
  }

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we become
  // |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

std::function<void()>
SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
  if (pending_flush_callbacks_.empty())
    return std::function<void()>();

  std::vector<std::function<void()>> pending_flush_callbacks;
  pending_flush_callbacks.swap(pending_flush_callbacks_);
  // Capture the callback list into the lambda by copy.
  return [pending_flush_callbacks]() {
    for (auto& callback : pending_flush_callbacks)
      callback();
  };
}

void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  base::TaskRunner* task_runner_to_commit_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If a commit_data_req_ exists it means that somebody else already posted a
    // FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If we
      // aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_)
        task_runner_to_commit_on = task_runner_;
    } else {
      // If another request is already queued and it is also a reply to a
      // flush request, reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_commit_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_commit_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_commit_on->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
    MaybeUnboundBufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  WriterID id;
  base::TaskRunner* task_runner_to_register_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    id = active_writer_ids_.Allocate();

    if (!id)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    PERFETTO_DCHECK(!pending_writers_.count(id));

    if (IsReservationTargetBufferId(target_buffer)) {
      // If the reservation is new, mark it as unbound in
      // |target_buffer_reservations_|. Otherwise, if the reservation was
      // already bound, choose the bound buffer ID now.
      auto it_and_inserted = target_buffer_reservations_.insert(
          {target_buffer, TargetBufferReservation()});
      if (it_and_inserted.first->second.resolved)
        target_buffer = it_and_inserted.first->second.target_buffer;
    }

    if (IsReservationTargetBufferId(target_buffer)) {
      // The arbiter and/or startup buffer reservations are not bound yet, so
      // buffer the registration of the writer until after we're bound.
      pending_writers_[id] = target_buffer;

      // Mark the arbiter as not fully bound, since we now have at least one
      // unbound trace writer / target buffer reservation.
      fully_bound_ = false;
    } else if (target_buffer != kInvalidBufferId) {
      // Trace writer is bound, so arbiter should be bound to an endpoint, too.
      PERFETTO_CHECK(producer_endpoint_ && task_runner_);
      task_runner_to_register_on = task_runner_;
    }
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_register_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_register_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
      if (weak_this)
        weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
    });
  }

  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  base::TaskRunner* task_runner = nullptr;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    active_writer_ids_.Free(id);

    auto it = pending_writers_.find(id);
    if (it != pending_writers_.end()) {
      // Writer hasn't been bound yet and thus also not yet registered with the
      // service.
      pending_writers_.erase(it);
      return;
    }

    // A trace writer from an aborted session may be destroyed before the
    // arbiter is bound to a task runner. In that case, it was never registered
    // with the service.
    if (!task_runner_)
      return;

    task_runner = task_runner_;
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner| remains valid after
  // unlocking, because |task_runner_| is never reset.
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner->PostTask([weak_this, id] {
    if (weak_this)
      weak_this->producer_endpoint_->UnregisterTraceWriter(id);
  });
}

bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
  if (!commit_data_req_)
    return true;

  bool all_placeholders_replaced = true;
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  return all_placeholders_replaced;
}

bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
  if (!producer_endpoint_) {
    PERFETTO_DCHECK(!fully_bound_);
    return false;
  }
  // We're fully bound if all target buffer reservations have a valid associated
  // BufferID.
  fully_bound_ = std::none_of(
      target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
      [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
        return !entry.second.resolved;
      });
  return fully_bound_;
}

}  // namespace perfetto