• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/shared_memory_arbiter_impl.h"
18 
19 #include "perfetto/base/logging.h"
20 #include "perfetto/base/task_runner.h"
21 #include "perfetto/base/time.h"
22 #include "perfetto/tracing/core/commit_data_request.h"
23 #include "perfetto/tracing/core/shared_memory.h"
24 #include "perfetto/tracing/core/startup_trace_writer_registry.h"
25 #include "src/tracing/core/null_trace_writer.h"
26 #include "src/tracing/core/trace_writer_impl.h"
27 
28 #include <limits>
29 #include <utility>
30 
31 namespace perfetto {
32 
33 using Chunk = SharedMemoryABI::Chunk;
34 
35 // static
36 SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
37     SharedMemoryABI::PageLayout::kPageDiv1;
38 
39 // static
CreateInstance(SharedMemory * shared_memory,size_t page_size,TracingService::ProducerEndpoint * producer_endpoint,base::TaskRunner * task_runner)40 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
41     SharedMemory* shared_memory,
42     size_t page_size,
43     TracingService::ProducerEndpoint* producer_endpoint,
44     base::TaskRunner* task_runner) {
45   return std::unique_ptr<SharedMemoryArbiterImpl>(
46       new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
47                                   page_size, producer_endpoint, task_runner));
48 }
49 
SharedMemoryArbiterImpl(void * start,size_t size,size_t page_size,TracingService::ProducerEndpoint * producer_endpoint,base::TaskRunner * task_runner)50 SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
51     void* start,
52     size_t size,
53     size_t page_size,
54     TracingService::ProducerEndpoint* producer_endpoint,
55     base::TaskRunner* task_runner)
56     : task_runner_(task_runner),
57       producer_endpoint_(producer_endpoint),
58       shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
59       active_writer_ids_(kMaxWriterID),
60       weak_ptr_factory_(this) {}
61 
GetNewChunk(const SharedMemoryABI::ChunkHeader & header,size_t size_hint)62 Chunk SharedMemoryArbiterImpl::GetNewChunk(
63     const SharedMemoryABI::ChunkHeader& header,
64     size_t size_hint) {
65   PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
66   int stall_count = 0;
67   unsigned stall_interval_us = 0;
68   static const unsigned kMaxStallIntervalUs = 100000;
69   static const int kLogAfterNStalls = 3;
70   static const int kFlushCommitsAfterEveryNStalls = 2;
71 
72   for (;;) {
73     // TODO(primiano): Probably this lock is not really required and this code
74     // could be rewritten leveraging only the Try* atomic operations in
75     // SharedMemoryABI. But let's not be too adventurous for the moment.
76     {
77       std::lock_guard<std::mutex> scoped_lock(lock_);
78       const size_t initial_page_idx = page_idx_;
79       for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
80         page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
81         bool is_new_page = false;
82 
83         // TODO(primiano): make the page layout dynamic.
84         auto layout = SharedMemoryArbiterImpl::default_page_layout;
85 
86         if (shmem_abi_.is_page_free(page_idx_)) {
87           // TODO(primiano): Use the |size_hint| here to decide the layout.
88           is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
89         }
90         uint32_t free_chunks;
91         if (is_new_page) {
92           free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
93         } else {
94           free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
95         }
96 
97         for (uint32_t chunk_idx = 0; free_chunks;
98              chunk_idx++, free_chunks >>= 1) {
99           if (!(free_chunks & 1))
100             continue;
101           // We found a free chunk.
102           Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
103               page_idx_, chunk_idx, &header);
104           if (!chunk.is_valid())
105             continue;
106           if (stall_count > kLogAfterNStalls) {
107             PERFETTO_LOG("Recovered from stall after %d iterations",
108                          stall_count);
109           }
110           return chunk;
111         }
112       }
113     }  // std::lock_guard<std::mutex>
114 
115     // All chunks are taken (either kBeingWritten by us or kBeingRead by the
116     // Service). TODO: at this point we should return a bankrupcy chunk, not
117     // crash the process.
118     if (stall_count++ == kLogAfterNStalls) {
119       PERFETTO_ELOG("Shared memory buffer overrun! Stalling");
120     }
121 
122     // If the IPC thread itself is stalled because the current process has
123     // filled up the SMB, we need to make sure that the service can process and
124     // purge the chunks written by our process, by flushing any pending commit
125     // requests. Because other threads in our process can continue to
126     // concurrently grab, fill and commit any chunks purged by the service, it
127     // is possible that the SMB remains full and the IPC thread remains stalled,
128     // needing to flush the concurrently queued up commits again. This is
129     // particularly likely with in-process perfetto service where the IPC thread
130     // is the service thread. To avoid remaining stalled forever in such a
131     // situation, we attempt to flush periodically after every N stalls.
132     if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
133         task_runner_->RunsTasksOnCurrentThread()) {
134       // TODO(primiano): sending the IPC synchronously is a temporary workaround
135       // until the backpressure logic in probes_producer is sorted out. Until
136       // then the risk is that we stall the message loop waiting for the tracing
137       // service to consume the shared memory buffer (SMB) and, for this reason,
138       // never run the task that tells the service to purge the SMB. This must
139       // happen iff we are on the IPC thread, not doing this will cause
140       // deadlocks, doing this on the wrong thread causes out-of-order data
141       // commits (crbug.com/919187#c28).
142       FlushPendingCommitDataRequests();
143     } else {
144       base::SleepMicroseconds(stall_interval_us);
145       stall_interval_us =
146           std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
147     }
148   }
149 }
150 
ReturnCompletedChunk(Chunk chunk,BufferID target_buffer,PatchList * patch_list)151 void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk,
152                                                    BufferID target_buffer,
153                                                    PatchList* patch_list) {
154   PERFETTO_DCHECK(chunk.is_valid());
155   const WriterID writer_id = chunk.writer_id();
156   UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
157                           patch_list);
158 }
159 
SendPatches(WriterID writer_id,BufferID target_buffer,PatchList * patch_list)160 void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
161                                           BufferID target_buffer,
162                                           PatchList* patch_list) {
163   PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
164   UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
165 }
166 
UpdateCommitDataRequest(Chunk chunk,WriterID writer_id,BufferID target_buffer,PatchList * patch_list)167 void SharedMemoryArbiterImpl::UpdateCommitDataRequest(Chunk chunk,
168                                                       WriterID writer_id,
169                                                       BufferID target_buffer,
170                                                       PatchList* patch_list) {
171   // Note: chunk will be invalid if the call came from SendPatches().
172   bool should_post_callback = false;
173   bool should_commit_synchronously = false;
174   base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
175   {
176     std::lock_guard<std::mutex> scoped_lock(lock_);
177 
178     if (!commit_data_req_) {
179       commit_data_req_.reset(new CommitDataRequest());
180       weak_this = weak_ptr_factory_.GetWeakPtr();
181       should_post_callback = true;
182     }
183 
184     // If a valid chunk is specified, return it and attach it to the request.
185     if (chunk.is_valid()) {
186       PERFETTO_DCHECK(chunk.writer_id() == writer_id);
187       uint8_t chunk_idx = chunk.chunk_idx();
188       bytes_pending_commit_ += chunk.size();
189       size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
190 
191       // DO NOT access |chunk| after this point, has been std::move()-d above.
192 
193       CommitDataRequest::ChunksToMove* ctm =
194           commit_data_req_->add_chunks_to_move();
195       ctm->set_page(static_cast<uint32_t>(page_idx));
196       ctm->set_chunk(chunk_idx);
197       ctm->set_target_buffer(target_buffer);
198 
199       // If more than half of the SMB.size() is filled with completed chunks for
200       // which we haven't notified the service yet (i.e. they are still enqueued
201       // in |commit_data_req_|), force a synchronous CommitDataRequest(), to
202       // reduce the likeliness of stalling the writer.
203       //
204       // We can only do this if we're writing on the same thread that we access
205       // the producer endpoint on, since we cannot notify the producer endpoint
206       // to commit synchronously on a different thread. Attempting to flush
207       // synchronously on another thread will lead to subtle bugs caused by
208       // out-of-order commit requests (crbug.com/919187#c28).
209       if (task_runner_->RunsTasksOnCurrentThread() &&
210           bytes_pending_commit_ >= shmem_abi_.size() / 2) {
211         should_commit_synchronously = true;
212         should_post_callback = false;
213       }
214     }
215 
216     // Get the completed patches for previous chunks from the |patch_list|
217     // and attach them.
218     ChunkID last_chunk_id = 0;  // 0 is irrelevant but keeps the compiler happy.
219     CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
220     while (!patch_list->empty() && patch_list->front().is_patched()) {
221       if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
222         last_chunk_req = commit_data_req_->add_chunks_to_patch();
223         last_chunk_req->set_writer_id(writer_id);
224         last_chunk_id = patch_list->front().chunk_id;
225         last_chunk_req->set_chunk_id(last_chunk_id);
226         last_chunk_req->set_target_buffer(target_buffer);
227       }
228       auto* patch_req = last_chunk_req->add_patches();
229       patch_req->set_offset(patch_list->front().offset);
230       patch_req->set_data(&patch_list->front().size_field[0],
231                           patch_list->front().size_field.size());
232       patch_list->pop_front();
233     }
234     // Patches are enqueued in the |patch_list| in order and are notified to
235     // the service when the chunk is returned. The only case when the current
236     // patch list is incomplete is if there is an unpatched entry at the head of
237     // the |patch_list| that belongs to the same ChunkID as the last one we are
238     // about to send to the service.
239     if (last_chunk_req && !patch_list->empty() &&
240         patch_list->front().chunk_id == last_chunk_id) {
241       last_chunk_req->set_has_more_patches(true);
242     }
243   }  // scoped_lock(lock_)
244 
245   if (should_post_callback) {
246     task_runner_->PostTask([weak_this] {
247       if (weak_this)
248         weak_this->FlushPendingCommitDataRequests();
249     });
250   }
251 
252   if (should_commit_synchronously)
253     FlushPendingCommitDataRequests();
254 }
255 
256 // This function is quite subtle. When making changes keep in mind these two
257 // challenges:
258 // 1) If the producer stalls and we happen to be on the |task_runner_| IPC
259 //    thread (or, for in-process cases, on the same thread where
260 //    TracingServiceImpl lives), the CommitData() call must be synchronous and
261 //    not posted, to avoid deadlocks.
262 // 2) When different threads hit this function, we must guarantee that we don't
263 //    accidentally make commits out of order. See commit 4e4fe8f56ef and
264 //    crbug.com/919187 for more context.
FlushPendingCommitDataRequests(std::function<void ()> callback)265 void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
266     std::function<void()> callback) {
267   // May be called by TraceWriterImpl on any thread.
268   if (!task_runner_->RunsTasksOnCurrentThread()) {
269     auto weak_this = weak_ptr_factory_.GetWeakPtr();
270     task_runner_->PostTask([weak_this, callback] {
271       if (weak_this)
272         weak_this->FlushPendingCommitDataRequests(std::move(callback));
273     });
274     return;
275   }
276 
277   std::shared_ptr<CommitDataRequest> req;
278   {
279     std::lock_guard<std::mutex> scoped_lock(lock_);
280     req = std::move(commit_data_req_);
281     bytes_pending_commit_ = 0;
282   }
283 
284   // |req| could be a nullptr if |commit_data_req_| became a nullptr. For
285   // example when a forced sync flush happens in GetNewChunk().
286   if (req) {
287     producer_endpoint_->CommitData(*req, callback);
288   } else if (callback) {
289     // If |req| was nullptr, it means that an enqueued deferred commit was
290     // executed just before this. At this point send an empty commit request
291     // to the service, just to linearize with it and give the guarantee to the
292     // caller that the data has been flushed into the service.
293     producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
294   }
295 }
296 
CreateTraceWriter(BufferID target_buffer)297 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
298     BufferID target_buffer) {
299   WriterID id;
300   {
301     std::lock_guard<std::mutex> scoped_lock(lock_);
302     id = active_writer_ids_.Allocate();
303   }
304   if (!id)
305     return std::unique_ptr<TraceWriter>(new NullTraceWriter());
306   auto weak_this = weak_ptr_factory_.GetWeakPtr();
307   task_runner_->PostTask([weak_this, id, target_buffer] {
308     if (weak_this)
309       weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
310   });
311   return std::unique_ptr<TraceWriter>(
312       new TraceWriterImpl(this, id, target_buffer));
313 }
314 
BindStartupTraceWriterRegistry(std::unique_ptr<StartupTraceWriterRegistry> registry,BufferID target_buffer)315 void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry(
316     std::unique_ptr<StartupTraceWriterRegistry> registry,
317     BufferID target_buffer) {
318   if (!task_runner_->RunsTasksOnCurrentThread()) {
319     auto weak_this = weak_ptr_factory_.GetWeakPtr();
320     auto* raw_reg = registry.release();
321     task_runner_->PostTask([weak_this, raw_reg, target_buffer]() {
322       std::unique_ptr<StartupTraceWriterRegistry> owned_reg(raw_reg);
323       if (!weak_this)
324         return;
325       weak_this->BindStartupTraceWriterRegistry(std::move(owned_reg),
326                                                 target_buffer);
327     });
328     return;
329   }
330 
331   // The registry will be owned by the arbiter, so it's safe to capture |this|
332   // in the callback.
333   auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) {
334     std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete;
335     {
336       std::lock_guard<std::mutex> scoped_lock(lock_);
337 
338       for (auto it = startup_trace_writer_registries_.begin();
339            it != startup_trace_writer_registries_.end(); it++) {
340         if (it->get() == bound_registry) {
341           // We can't delete the registry while the arbiter's lock is held
342           // (to avoid lock inversion).
343           registry_to_delete = std::move(*it);
344           startup_trace_writer_registries_.erase(it);
345           break;
346         }
347       }
348     }
349 
350     // The registry should have been in |startup_trace_writer_registries_|.
351     PERFETTO_DCHECK(registry_to_delete);
352     registry_to_delete.reset();
353   };
354   registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback);
355   std::lock_guard<std::mutex> scoped_lock(lock_);
356   startup_trace_writer_registries_.push_back(std::move(registry));
357 }
358 
NotifyFlushComplete(FlushRequestID req_id)359 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
360   bool should_post_commit_task = false;
361   {
362     std::lock_guard<std::mutex> scoped_lock(lock_);
363     // If a commit_data_req_ exists it means that somebody else already posted a
364     // FlushPendingCommitDataRequests() task.
365     if (!commit_data_req_) {
366       commit_data_req_.reset(new CommitDataRequest());
367       should_post_commit_task = true;
368     } else {
369       // If there is another request queued and that also contains is a reply
370       // to a flush request, reply with the highest id.
371       req_id = std::max(req_id, commit_data_req_->flush_request_id());
372     }
373     commit_data_req_->set_flush_request_id(req_id);
374   }
375   if (should_post_commit_task) {
376     auto weak_this = weak_ptr_factory_.GetWeakPtr();
377     task_runner_->PostTask([weak_this] {
378       if (weak_this)
379         weak_this->FlushPendingCommitDataRequests();
380     });
381   }
382 }
383 
ReleaseWriterID(WriterID id)384 void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
385   auto weak_this = weak_ptr_factory_.GetWeakPtr();
386   task_runner_->PostTask([weak_this, id] {
387     if (weak_this)
388       weak_this->producer_endpoint_->UnregisterTraceWriter(id);
389   });
390 
391   std::lock_guard<std::mutex> scoped_lock(lock_);
392   active_writer_ids_.Free(id);
393 }
394 
395 }  // namespace perfetto
396