/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/tracing/core/commit_data_request.h"
#include "perfetto/tracing/core/shared_memory.h"
#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

#include <limits>
#include <utility>

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

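// A minimal usage sketch (illustrative only; |shmem|, |endpoint|,
// |task_runner| and |target_buffer_id| are placeholders the embedder must
// provide):
//
//   auto arbiter = SharedMemoryArbiter::CreateInstance(
//       shmem.get(), /*page_size=*/4096, endpoint, task_runner);
//   std::unique_ptr<TraceWriter> writer =
//       arbiter->CreateTraceWriter(target_buffer_id);
//   auto packet = writer->NewTracePacket();  // Grabs an SMB chunk on demand.
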
// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(
      new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
                                  page_size, producer_endpoint, task_runner));
}

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      producer_endpoint_(producer_endpoint),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
      active_writer_ids_(kMaxWriterID),
      weak_ptr_factory_(this) {}

Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    size_t size_hint) {
  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
  int stall_count = 0;
  unsigned stall_interval_us = 0;
  static const unsigned kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;
  static const int kFlushCommitsAfterEveryNStalls = 2;

  for (;;) {
    // TODO(primiano): Probably this lock is not really required and this code
    // could be rewritten leveraging only the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::lock_guard<std::mutex> scoped_lock(lock_);
      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          // TODO(primiano): Use the |size_hint| here to decide the layout.
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

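        // Scan the free-chunks bitmap for this page (one bit per chunk,
        // starting from the least significant bit) and try to acquire the
        // first free chunk for writing.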
        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }
          return chunk;
        }
      }
    }  // std::lock_guard<std::mutex>

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service). TODO: at this point we should return a bankruptcy chunk, not
    // crash the process.
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_ELOG("Shared memory buffer overrun! Stalling");
    }

    // If the IPC thread itself is stalled because the current process has
    // filled up the SMB, we need to make sure that the service can process and
    // purge the chunks written by our process, by flushing any pending commit
    // requests. Because other threads in our process can continue to
    // concurrently grab, fill and commit any chunks purged by the service, it
    // is possible that the SMB remains full and the IPC thread remains
    // stalled, needing to flush the concurrently queued-up commits again. This
    // is particularly likely with the in-process perfetto service, where the
    // IPC thread is the service thread. To avoid remaining stalled forever in
    // such a situation, we attempt to flush periodically after every N stalls.
    if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
        task_runner_->RunsTasksOnCurrentThread()) {
      // TODO(primiano): sending the IPC synchronously is a temporary
      // workaround until the backpressure logic in probes_producer is sorted
      // out. Until then the risk is that we stall the message loop waiting for
      // the tracing service to consume the shared memory buffer (SMB) and, for
      // this reason, never run the task that tells the service to purge the
      // SMB. This must happen only when we are on the IPC thread: skipping it
      // there can cause deadlocks, while doing it on any other thread causes
      // out-of-order data commits (crbug.com/919187#c28).
      FlushPendingCommitDataRequests();
    } else {
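      // Back off exponentially ((interval + 1) * 8, capped at
      // kMaxStallIntervalUs) before rescanning the whole SMB.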
      base::SleepMicroseconds(stall_interval_us);
      stall_interval_us =
          std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
    }
  }
}

void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk,
                                                   BufferID target_buffer,
                                                   PatchList* patch_list) {
  PERFETTO_DCHECK(chunk.is_valid());
  const WriterID writer_id = chunk.writer_id();
  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
                          patch_list);
}

void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
                                          BufferID target_buffer,
                                          PatchList* patch_list) {
  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}

void SharedMemoryArbiterImpl::UpdateCommitDataRequest(Chunk chunk,
                                                      WriterID writer_id,
                                                      BufferID target_buffer,
                                                      PatchList* patch_list) {
  // Note: |chunk| will be invalid if the call came from SendPatches().
  bool should_post_callback = false;
  bool should_commit_synchronously = false;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());
      weak_this = weak_ptr_factory_.GetWeakPtr();
      should_post_callback = true;
    }

    // If a valid chunk is specified, return it and attach it to the request.
    if (chunk.is_valid()) {
      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
      uint8_t chunk_idx = chunk.chunk_idx();
      bytes_pending_commit_ += chunk.size();
      size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));

      // DO NOT access |chunk| after this point: it has been std::move()-d
      // above.

      CommitDataRequest::ChunksToMove* ctm =
          commit_data_req_->add_chunks_to_move();
      ctm->set_page(static_cast<uint32_t>(page_idx));
      ctm->set_chunk(chunk_idx);
      ctm->set_target_buffer(target_buffer);

      // If more than half of the SMB's size is filled with completed chunks
      // for which we haven't notified the service yet (i.e. they are still
      // enqueued in |commit_data_req_|), force a synchronous
      // CommitDataRequest() to reduce the likelihood of stalling the writer.
      //
      // We can only do this if we're writing on the same thread that we access
      // the producer endpoint on, since we cannot notify the producer endpoint
      // to commit synchronously on a different thread. Attempting to flush
      // synchronously on another thread will lead to subtle bugs caused by
      // out-of-order commit requests (crbug.com/919187#c28).
      if (task_runner_->RunsTasksOnCurrentThread() &&
          bytes_pending_commit_ >= shmem_abi_.size() / 2) {
        should_commit_synchronously = true;
        should_post_callback = false;
      }
    }

    // Get the completed patches for previous chunks from the |patch_list|
    // and attach them.
    ChunkID last_chunk_id = 0;  // 0 is irrelevant but keeps the compiler happy.
    CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
        last_chunk_req = commit_data_req_->add_chunks_to_patch();
        last_chunk_req->set_writer_id(writer_id);
        last_chunk_id = patch_list->front().chunk_id;
        last_chunk_req->set_chunk_id(last_chunk_id);
        last_chunk_req->set_target_buffer(target_buffer);
      }
      auto* patch_req = last_chunk_req->add_patches();
      patch_req->set_offset(patch_list->front().offset);
      patch_req->set_data(&patch_list->front().size_field[0],
                          patch_list->front().size_field.size());
      patch_list->pop_front();
    }
    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case when the current
    // patch list is incomplete is if there is an unpatched entry at the head
    // of the |patch_list| that belongs to the same ChunkID as the last one we
    // are about to send to the service.
    if (last_chunk_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_chunk_id) {
      last_chunk_req->set_has_more_patches(true);
    }
  }  // scoped_lock(lock_)

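  // Outside the lock: either post a deferred FlushPendingCommitDataRequests()
  // task (when a new request was just created above) or, if too much data is
  // pending and we are on the |task_runner_| thread, flush synchronously.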
  if (should_post_callback) {
    task_runner_->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }

  if (should_commit_synchronously)
    FlushPendingCommitDataRequests();
}

// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
//    thread (or, for in-process cases, on the same thread where
//    TracingServiceImpl lives), the CommitData() call must be synchronous and
//    not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
//    accidentally make commits out of order. See commit 4e4fe8f56ef and
//    crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  // May be called by TraceWriterImpl on any thread.
  if (!task_runner_->RunsTasksOnCurrentThread()) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this, callback] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests(std::move(callback));
    });
    return;
  }

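  // From here on we are on the |task_runner_| thread. Steal the pending
  // request under the lock, so writers on other threads can start a new one
  // while the CommitData() IPC below is in flight.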
  std::shared_ptr<CommitDataRequest> req;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    req = std::move(commit_data_req_);
    bytes_pending_commit_ = 0;
  }

  // |req| can be a nullptr if |commit_data_req_| was already null, e.g.
  // because a forced sync flush in GetNewChunk() consumed it just before this.
  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |req| was nullptr, it means that a previously enqueued deferred
    // commit was already executed. In that case send an empty commit request
    // to the service anyway, just to linearize with it and give the caller the
    // guarantee that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer) {
  WriterID id;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    id = active_writer_ids_.Allocate();
  }
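  // An id of 0 means the pool of writer IDs is exhausted; in that case hand
  // out a NullTraceWriter, which silently discards everything written to it.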
  if (!id)
    return std::unique_ptr<TraceWriter>(new NullTraceWriter());
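  // Register the new writer with the service from the |task_runner_| thread,
  // consistently with all other producer_endpoint_ calls in this file.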
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, id, target_buffer] {
    if (weak_this)
      weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
  });
  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer));
}

void SharedMemoryArbiterImpl::BindStartupTraceWriterRegistry(
    std::unique_ptr<StartupTraceWriterRegistry> registry,
    BufferID target_buffer) {
  if (!task_runner_->RunsTasksOnCurrentThread()) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    auto* raw_reg = registry.release();
    task_runner_->PostTask([weak_this, raw_reg, target_buffer]() {
      std::unique_ptr<StartupTraceWriterRegistry> owned_reg(raw_reg);
      if (!weak_this)
        return;
      weak_this->BindStartupTraceWriterRegistry(std::move(owned_reg),
                                                target_buffer);
    });
    return;
  }

  // The registry will be owned by the arbiter, so it's safe to capture |this|
  // in the callback.
  auto on_bound_callback = [this](StartupTraceWriterRegistry* bound_registry) {
    std::unique_ptr<StartupTraceWriterRegistry> registry_to_delete;
    {
      std::lock_guard<std::mutex> scoped_lock(lock_);

      for (auto it = startup_trace_writer_registries_.begin();
           it != startup_trace_writer_registries_.end(); it++) {
        if (it->get() == bound_registry) {
          // We can't delete the registry while the arbiter's lock is held
          // (to avoid lock inversion).
          registry_to_delete = std::move(*it);
          startup_trace_writer_registries_.erase(it);
          break;
        }
      }
    }

    // The registry should have been in |startup_trace_writer_registries_|.
    PERFETTO_DCHECK(registry_to_delete);
    registry_to_delete.reset();
  };
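  // Bind the registry to this arbiter and keep it alive in
  // |startup_trace_writer_registries_| until |on_bound_callback| removes it.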
  registry->BindToArbiter(this, target_buffer, task_runner_, on_bound_callback);
  std::lock_guard<std::mutex> scoped_lock(lock_);
  startup_trace_writer_registries_.push_back(std::move(registry));
}

void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  bool should_post_commit_task = false;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If a |commit_data_req_| already exists, somebody else already posted a
    // FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());
      should_post_commit_task = true;
    } else {
      // If there is another request already queued and it also contains a
      // reply to a flush request, reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }
  if (should_post_commit_task) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  auto weak_this = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_this, id] {
    if (weak_this)
      weak_this->producer_endpoint_->UnregisterTraceWriter(id);
  });

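  // Return the id to the pool so it can be handed out to future TraceWriters.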
  std::lock_guard<std::mutex> scoped_lock(lock_);
  active_writer_ids_.Free(id);
}

}  // namespace perfetto