• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include "perfetto/ext/base/weak_ptr.h"
29 #include "perfetto/ext/tracing/core/basic_types.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/tracing/core/forward_decls.h"
33 #include "src/tracing/core/id_allocator.h"
34 
35 namespace perfetto {
36 
37 class PatchList;
38 class Patch;
39 class TraceWriter;
40 class TraceWriterImpl;
41 
42 namespace base {
43 class TaskRunner;
44 }  // namespace base
45 
46 // This class handles the shared memory buffer on the producer side. It is used
47 // to obtain thread-local chunks and to partition pages from several threads.
48 // There is one arbiter instance per Producer.
49 // This class is thread-safe and uses locks to do so. Data sources are supposed
50 // to interact with this sporadically, only when they run out of space on their
51 // current thread-local chunk.
52 //
53 // The arbiter can become "unbound" as a consequence of:
54 //  (a) being created without an endpoint
55 //  (b) CreateStartupTraceWriter calls after creation (whether created with or
56 //      without endpoint).
57 //
58 // Entering the unbound state is only supported if all trace writers are created
59 // in kDrop mode. In the unbound state, the arbiter buffers commit messages
60 // until all trace writers are bound to a target buffer.
61 //
62 // The following state transitions are possible:
63 //
64 //   CreateInstance()
65 //    |
66 //    |  CreateUnboundInstance()
67 //    |    |
68 //    |    |
69 //    |    V
70 //    |  [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ]
71 //    |      |     |
72 //    |      |     | CreateStartupTraceWriter(buf)
73 //    |      |     |  buffer reservations += buf
74 //    |      |     |
75 //    |      |     |             ----
76 //    |      |     |            |    | CreateStartupTraceWriter(buf)
77 //    |      |     |            |    |  buffer reservations += buf
78 //    |      |     V            |    V
79 //    |      |   [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ]
80 //    |      |                                                |
81 //    |      |                       BindToProducerEndpoint() |
82 //    |      |                                                |
83 //    |      | BindToProducerEndpoint()                       |
84 //    |      |                                                V
85 //    |      |   [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ]
86 //    |      |   A    |    A                               |     A
87 //    |      |   |    |    |                               |     |
88 //    |      |   |     ----                                |     |
89 //    |      |   |    CreateStartupTraceWriter(buf)        |     |
90 //    |      |   |     buffer reservations += buf          |     |
91 //    |      |   |                                         |     |
92 //    |      |   | CreateStartupTraceWriter(buf)           |     |
93 //    |      |   |  where buf is not yet bound             |     |
94 //    |      |   |  buffer reservations += buf             |     | (yes)
95 //    |      |   |                                         |     |
96 //    |      |   |        BindStartupTargetBuffer(buf, id) |-----
97 //    |      |   |           buffer reservations -= buf    | reservations > 0?
98 //    |      |   |                                         |
99 //    |      |   |                                         | (no)
100 //    |      V   |                                         V
101 //     --> [ fully_bound_, endpoint_, 0 unbound buffer reservations ]
102 //              |    A
103 //              |    | CreateStartupTraceWriter(buf)
104 //              |    |  where buf is already bound
105 //               ----
106 class SharedMemoryArbiterImpl : public SharedMemoryArbiter {
107  public:
108   // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the
109   // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may
110   // be |nullptr| if created unbound, see
111   // SharedMemoryArbiter::CreateUnboundInstance().
112 
113   // SharedMemoryArbiterImpl(void* start,
114   //                         size_t size,
115   //                         size_t page_size,
116   //                         TracingService::ProducerEndpoint*
117   //                         producer_endpoint, base::TaskRunner* task_runner) :
118   //   SharedMemoryArbiterImpl(start, size, page_size, false, producer_endpoint,
119   //   task_runner) {
120   // }
121 
122   SharedMemoryArbiterImpl(void* start,
123                           size_t size,
124                           ShmemMode mode,
125                           size_t page_size,
126                           TracingService::ProducerEndpoint*,
127                           base::TaskRunner*);
128 
129   // Returns a new Chunk to write tracing data. Depending on the provided
130   // BufferExhaustedPolicy, this may return an invalid chunk if no valid free
131   // chunk could be found in the SMB.
132   SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
133                                      BufferExhaustedPolicy);
134 
135   // Puts back a Chunk that has been completed and sends a request to the
136   // service to move it to the central tracing buffer. |target_buffer| is the
137   // absolute trace buffer ID where the service should move the chunk onto (the
138   // producer is just to copy back the same number received in the
139   // DataSourceConfig upon the StartDataSource() reques).
140   // PatchList is a pointer to the list of patches for previous chunks. The
141   // first patched entries will be removed from the patched list and sent over
142   // to the service in the same CommitData() IPC request.
143   void ReturnCompletedChunk(SharedMemoryABI::Chunk,
144                             MaybeUnboundBufferID target_buffer,
145                             PatchList*);
146 
147   // Send a request to the service to apply completed patches from |patch_list|.
148   // |writer_id| is the ID of the TraceWriter that calls this method,
149   // |target_buffer| is the global trace buffer ID of its target buffer.
150   void SendPatches(WriterID writer_id,
151                    MaybeUnboundBufferID target_buffer,
152                    PatchList* patch_list);
153 
shmem_abi_for_testing()154   SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
155 
set_default_layout_for_testing(SharedMemoryABI::PageLayout l)156   static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
157     default_page_layout = l;
158   }
159 
default_page_layout_for_testing()160   static SharedMemoryABI::PageLayout default_page_layout_for_testing() {
161     return default_page_layout;
162   }
163 
164   // SharedMemoryArbiter implementation.
165   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
166   std::unique_ptr<TraceWriter> CreateTraceWriter(
167       BufferID target_buffer,
168       BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override;
169   std::unique_ptr<TraceWriter> CreateStartupTraceWriter(
170       uint16_t target_buffer_reservation_id) override;
171   void BindToProducerEndpoint(TracingService::ProducerEndpoint*,
172                               base::TaskRunner*) override;
173   void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,
174                                BufferID target_buffer_id) override;
175   void AbortStartupTracingForReservation(
176       uint16_t target_buffer_reservation_id) override;
177   void NotifyFlushComplete(FlushRequestID) override;
178 
179   void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override;
180 
181   bool EnableDirectSMBPatching() override;
182 
183   void SetDirectSMBPatchingSupportedByService() override;
184 
185   void FlushPendingCommitDataRequests(
186       std::function<void()> callback = {}) override;
187   bool TryShutdown() override;
188 
task_runner()189   base::TaskRunner* task_runner() const { return task_runner_; }
page_size()190   size_t page_size() const { return shmem_abi_.page_size(); }
num_pages()191   size_t num_pages() const { return shmem_abi_.num_pages(); }
192 
GetWeakPtr()193   base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const {
194     return weak_ptr_factory_.GetWeakPtr();
195   }
196 
197  private:
198   friend class TraceWriterImpl;
199   friend class StartupTraceWriterTest;
200   friend class SharedMemoryArbiterImplTest;
201 
202   struct TargetBufferReservation {
203     bool resolved = false;
204     BufferID target_buffer = kInvalidBufferId;
205   };
206 
207   // Placeholder for the actual target buffer ID of a startup target buffer
208   // reservation ID in |target_buffer_reservations_|.
209   static constexpr BufferID kInvalidBufferId = 0;
210 
211   static SharedMemoryABI::PageLayout default_page_layout;
212 
213   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
214   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
215 
216   void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
217                                WriterID writer_id,
218                                MaybeUnboundBufferID target_buffer,
219                                PatchList* patch_list);
220 
221   // Search the chunks that are being batched in |commit_data_req_| for a chunk
222   // that needs patching and that matches the provided |writer_id| and
223   // |patch.chunk_id|. If found, apply |patch| to that chunk, and if
224   // |chunk_needs_more_patching| is true, clear the needs patching flag of the
225   // chunk and mark it as complete - to allow the service to read it (and other
226   // chunks after it) during scraping. Returns true if the patch was applied,
227   // false otherwise.
228   //
229   // Note: the caller must be holding |lock_| for the duration of the call.
230   bool TryDirectPatchLocked(WriterID writer_id,
231                             const Patch& patch,
232                             bool chunk_needs_more_patching);
233   std::unique_ptr<TraceWriter> CreateTraceWriterInternal(
234       MaybeUnboundBufferID target_buffer,
235       BufferExhaustedPolicy);
236 
237   // Called by the TraceWriter destructor.
238   void ReleaseWriterID(WriterID);
239 
240   void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,
241                                    uint16_t target_buffer_reservation_id,
242                                    BufferID target_buffer_id);
243 
244   // If any flush callbacks were queued up while the arbiter or any target
245   // buffer reservation was unbound, this wraps the pending callbacks into a new
246   // std::function and returns it. Otherwise returns an invalid std::function.
247   std::function<void()> TakePendingFlushCallbacksLocked();
248 
249   // Replace occurrences of target buffer reservation IDs in |commit_data_req_|
250   // with their respective actual BufferIDs if they were already bound. Returns
251   // true iff all occurrences were replaced.
252   bool ReplaceCommitPlaceholderBufferIdsLocked();
253 
254   // Update and return |fully_bound_| based on the arbiter's |pending_writers_|
255   // state.
256   bool UpdateFullyBoundLocked();
257 
258   // Only accessed on |task_runner_| after the producer endpoint was bound.
259   TracingService::ProducerEndpoint* producer_endpoint_ = nullptr;
260 
261   // Set to true when this instance runs in a emulation mode for a producer
262   // endpoint that doesn't support shared memory (e.g. vsock).
263   const bool use_shmem_emulation_ = false;
264 
265   // --- Begin lock-protected members ---
266 
267   std::mutex lock_;
268 
269   base::TaskRunner* task_runner_ = nullptr;
270   SharedMemoryABI shmem_abi_;
271   size_t page_idx_ = 0;
272   std::unique_ptr<CommitDataRequest> commit_data_req_;
273   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
274   IdAllocator<WriterID> active_writer_ids_;
275   bool did_shutdown_ = false;
276 
277   // Whether the arbiter itself and all startup target buffer reservations are
278   // bound. Note that this can become false again later if a new target buffer
279   // reservation is created by calling CreateStartupTraceWriter() with a new
280   // reservation id.
281   bool fully_bound_;
282 
283   // Whether the arbiter was always bound. If false, the arbiter was unbound at
284   // one point in time.
285   bool was_always_bound_;
286 
287   // Whether all created trace writers were created with kDrop policy.
288   bool all_writers_have_drop_policy_ = true;
289 
290   // IDs of writers and their assigned target buffers that should be registered
291   // with the service after the arbiter and/or their startup target buffer is
292   // bound.
293   std::map<WriterID, MaybeUnboundBufferID> pending_writers_;
294 
295   // Callbacks for flush requests issued while the arbiter or a target buffer
296   // reservation was unbound.
297   std::vector<std::function<void()>> pending_flush_callbacks_;
298 
299   // See SharedMemoryArbiter::SetBatchCommitsDuration.
300   uint32_t batch_commits_duration_ms_ = 0;
301 
302   // See SharedMemoryArbiter::EnableDirectSMBPatching.
303   bool direct_patching_enabled_ = false;
304 
305   // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService.
306   bool direct_patching_supported_by_service_ = false;
307 
308   // Indicates whether we have already scheduled a delayed flush for the
309   // purposes of batching. Set to true at the beginning of a batching period and
310   // cleared at the end of the period. Immediate flushes that happen during a
311   // batching period will empty the |commit_data_req| (triggering an immediate
312   // IPC to the service), but will not clear this flag and the
313   // previously-scheduled delayed flush will still occur at the end of the
314   // batching period.
315   bool delayed_flush_scheduled_ = false;
316 
317   // Stores target buffer reservations for writers created via
318   // CreateStartupTraceWriter(). A bound reservation sets
319   // TargetBufferReservation::resolved to true and is associated with the actual
320   // BufferID supplied in BindStartupTargetBuffer().
321   //
322   // TODO(eseckler): Clean up entries from this map. This would probably require
323   // a method in SharedMemoryArbiter that allows a producer to invalidate a
324   // reservation ID.
325   std::map<MaybeUnboundBufferID, TargetBufferReservation>
326       target_buffer_reservations_;
327 
328   // --- End lock-protected members ---
329 
330   // Keep at the end.
331   base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_;
332 };
333 
334 }  // namespace perfetto
335 
336 #endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
337