• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include "perfetto/ext/base/weak_ptr.h"
29 #include "perfetto/ext/tracing/core/basic_types.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/tracing/core/forward_decls.h"
33 #include "src/tracing/core/id_allocator.h"
34 
35 namespace perfetto {
36 
37 class PatchList;
38 class Patch;
39 class TraceWriter;
40 class TraceWriterImpl;
41 
42 namespace base {
43 class TaskRunner;
44 }  // namespace base
45 
46 // This class handles the shared memory buffer on the producer side. It is used
47 // to obtain thread-local chunks and to partition pages from several threads.
48 // There is one arbiter instance per Producer.
49 // This class is thread-safe and uses locks to do so. Data sources are supposed
50 // to interact with this sporadically, only when they run out of space on their
51 // current thread-local chunk.
52 //
53 // The arbiter can become "unbound" as a consequence of:
54 //  (a) being created without an endpoint
55 //  (b) CreateStartupTraceWriter calls after creation (whether created with or
56 //      without endpoint).
57 //
58 // Entering the unbound state is only supported if all trace writers are created
59 // in kDrop mode. In the unbound state, the arbiter buffers commit messages
60 // until all trace writers are bound to a target buffer.
61 //
62 // The following state transitions are possible:
63 //
64 //   CreateInstance()
65 //    |
66 //    |  CreateUnboundInstance()
67 //    |    |
68 //    |    |
69 //    |    V
70 //    |  [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ]
71 //    |      |     |
72 //    |      |     | CreateStartupTraceWriter(buf)
73 //    |      |     |  buffer reservations += buf
74 //    |      |     |
75 //    |      |     |             ----
76 //    |      |     |            |    | CreateStartupTraceWriter(buf)
77 //    |      |     |            |    |  buffer reservations += buf
78 //    |      |     V            |    V
79 //    |      |   [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ]
80 //    |      |                                                |
81 //    |      |                       BindToProducerEndpoint() |
82 //    |      |                                                |
83 //    |      | BindToProducerEndpoint()                       |
84 //    |      |                                                V
85 //    |      |   [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ]
86 //    |      |   A    |    A                               |     A
87 //    |      |   |    |    |                               |     |
88 //    |      |   |     ----                                |     |
89 //    |      |   |    CreateStartupTraceWriter(buf)        |     |
90 //    |      |   |     buffer reservations += buf          |     |
91 //    |      |   |                                         |     |
92 //    |      |   | CreateStartupTraceWriter(buf)           |     |
93 //    |      |   |  where buf is not yet bound             |     |
94 //    |      |   |  buffer reservations += buf             |     | (yes)
95 //    |      |   |                                         |     |
96 //    |      |   |        BindStartupTargetBuffer(buf, id) |-----
97 //    |      |   |           buffer reservations -= buf    | reservations > 0?
98 //    |      |   |                                         |
99 //    |      |   |                                         | (no)
100 //    |      V   |                                         V
101 //     --> [ fully_bound_, endpoint_, 0 unbound buffer reservations ]
102 //              |    A
103 //              |    | CreateStartupTraceWriter(buf)
104 //              |    |  where buf is already bound
105 //               ----
106 class SharedMemoryArbiterImpl : public SharedMemoryArbiter {
107  public:
108   // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the
109   // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may
110   // be |nullptr| if created unbound, see
111   // SharedMemoryArbiter::CreateUnboundInstance().
112   SharedMemoryArbiterImpl(void* start,
113                           size_t size,
114                           size_t page_size,
115                           TracingService::ProducerEndpoint*,
116                           base::TaskRunner*);
117 
118   // Returns a new Chunk to write tracing data. Depending on the provided
119   // BufferExhaustedPolicy, this may return an invalid chunk if no valid free
120   // chunk could be found in the SMB.
121   SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
122                                      BufferExhaustedPolicy,
123                                      size_t size_hint = 0);
124 
125   // Puts back a Chunk that has been completed and sends a request to the
126   // service to move it to the central tracing buffer. |target_buffer| is the
127   // absolute trace buffer ID where the service should move the chunk onto (the
128   // producer is just to copy back the same number received in the
129   // DataSourceConfig upon the StartDataSource() reques).
130   // PatchList is a pointer to the list of patches for previous chunks. The
131   // first patched entries will be removed from the patched list and sent over
132   // to the service in the same CommitData() IPC request.
133   void ReturnCompletedChunk(SharedMemoryABI::Chunk,
134                             MaybeUnboundBufferID target_buffer,
135                             PatchList*);
136 
137   // Send a request to the service to apply completed patches from |patch_list|.
138   // |writer_id| is the ID of the TraceWriter that calls this method,
139   // |target_buffer| is the global trace buffer ID of its target buffer.
140   void SendPatches(WriterID writer_id,
141                    MaybeUnboundBufferID target_buffer,
142                    PatchList* patch_list);
143 
shmem_abi_for_testing()144   SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
145 
set_default_layout_for_testing(SharedMemoryABI::PageLayout l)146   static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
147     default_page_layout = l;
148   }
149 
default_page_layout_for_testing()150   static SharedMemoryABI::PageLayout default_page_layout_for_testing() {
151     return default_page_layout;
152   }
153 
154   // SharedMemoryArbiter implementation.
155   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
156   std::unique_ptr<TraceWriter> CreateTraceWriter(
157       BufferID target_buffer,
158       BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override;
159   std::unique_ptr<TraceWriter> CreateStartupTraceWriter(
160       uint16_t target_buffer_reservation_id) override;
161   void BindToProducerEndpoint(TracingService::ProducerEndpoint*,
162                               base::TaskRunner*) override;
163   void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,
164                                BufferID target_buffer_id) override;
165   void AbortStartupTracingForReservation(
166       uint16_t target_buffer_reservation_id) override;
167   void NotifyFlushComplete(FlushRequestID) override;
168 
169   void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override;
170 
171   bool EnableDirectSMBPatching() override;
172 
173   void SetDirectSMBPatchingSupportedByService() override;
174 
175   void FlushPendingCommitDataRequests(
176       std::function<void()> callback = {}) override;
177   bool TryShutdown() override;
178 
task_runner()179   base::TaskRunner* task_runner() const { return task_runner_; }
page_size()180   size_t page_size() const { return shmem_abi_.page_size(); }
num_pages()181   size_t num_pages() const { return shmem_abi_.num_pages(); }
182 
GetWeakPtr()183   base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const {
184     return weak_ptr_factory_.GetWeakPtr();
185   }
186 
187  private:
188   friend class TraceWriterImpl;
189   friend class StartupTraceWriterTest;
190   friend class SharedMemoryArbiterImplTest;
191 
192   struct TargetBufferReservation {
193     bool resolved = false;
194     BufferID target_buffer = kInvalidBufferId;
195   };
196 
197   // Placeholder for the actual target buffer ID of a startup target buffer
198   // reservation ID in |target_buffer_reservations_|.
199   static constexpr BufferID kInvalidBufferId = 0;
200 
201   static SharedMemoryABI::PageLayout default_page_layout;
202 
203   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
204   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
205 
206   void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
207                                WriterID writer_id,
208                                MaybeUnboundBufferID target_buffer,
209                                PatchList* patch_list);
210 
211   // Search the chunks that are being batched in |commit_data_req_| for a chunk
212   // that needs patching and that matches the provided |writer_id| and
213   // |patch.chunk_id|. If found, apply |patch| to that chunk, and if
214   // |chunk_needs_more_patching| is true, clear the needs patching flag of the
215   // chunk and mark it as complete - to allow the service to read it (and other
216   // chunks after it) during scraping. Returns true if the patch was applied,
217   // false otherwise.
218   //
219   // Note: the caller must be holding |lock_| for the duration of the call.
220   bool TryDirectPatchLocked(WriterID writer_id,
221                             const Patch& patch,
222                             bool chunk_needs_more_patching);
223   std::unique_ptr<TraceWriter> CreateTraceWriterInternal(
224       MaybeUnboundBufferID target_buffer,
225       BufferExhaustedPolicy);
226 
227   // Called by the TraceWriter destructor.
228   void ReleaseWriterID(WriterID);
229 
230   void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,
231                                    uint16_t target_buffer_reservation_id,
232                                    BufferID target_buffer_id);
233 
234   // If any flush callbacks were queued up while the arbiter or any target
235   // buffer reservation was unbound, this wraps the pending callbacks into a new
236   // std::function and returns it. Otherwise returns an invalid std::function.
237   std::function<void()> TakePendingFlushCallbacksLocked();
238 
239   // Replace occurrences of target buffer reservation IDs in |commit_data_req_|
240   // with their respective actual BufferIDs if they were already bound. Returns
241   // true iff all occurrences were replaced.
242   bool ReplaceCommitPlaceholderBufferIdsLocked();
243 
244   // Update and return |fully_bound_| based on the arbiter's |pending_writers_|
245   // state.
246   bool UpdateFullyBoundLocked();
247 
248   // Only accessed on |task_runner_| after the producer endpoint was bound.
249   TracingService::ProducerEndpoint* producer_endpoint_ = nullptr;
250 
251   // --- Begin lock-protected members ---
252 
253   std::mutex lock_;
254 
255   base::TaskRunner* task_runner_ = nullptr;
256   SharedMemoryABI shmem_abi_;
257   size_t page_idx_ = 0;
258   std::unique_ptr<CommitDataRequest> commit_data_req_;
259   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
260   IdAllocator<WriterID> active_writer_ids_;
261   bool did_shutdown_ = false;
262 
263   // Whether the arbiter itself and all startup target buffer reservations are
264   // bound. Note that this can become false again later if a new target buffer
265   // reservation is created by calling CreateStartupTraceWriter() with a new
266   // reservation id.
267   bool fully_bound_;
268 
269   // Whether the arbiter was always bound. If false, the arbiter was unbound at
270   // one point in time.
271   bool was_always_bound_;
272 
273   // Whether all created trace writers were created with kDrop policy.
274   bool all_writers_have_drop_policy_ = true;
275 
276   // IDs of writers and their assigned target buffers that should be registered
277   // with the service after the arbiter and/or their startup target buffer is
278   // bound.
279   std::map<WriterID, MaybeUnboundBufferID> pending_writers_;
280 
281   // Callbacks for flush requests issued while the arbiter or a target buffer
282   // reservation was unbound.
283   std::vector<std::function<void()>> pending_flush_callbacks_;
284 
285   // See SharedMemoryArbiter::SetBatchCommitsDuration.
286   uint32_t batch_commits_duration_ms_ = 0;
287 
288   // See SharedMemoryArbiter::EnableDirectSMBPatching.
289   bool direct_patching_enabled_ = false;
290 
291   // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService.
292   bool direct_patching_supported_by_service_ = false;
293 
294   // Indicates whether we have already scheduled a delayed flush for the
295   // purposes of batching. Set to true at the beginning of a batching period and
296   // cleared at the end of the period. Immediate flushes that happen during a
297   // batching period will empty the |commit_data_req| (triggering an immediate
298   // IPC to the service), but will not clear this flag and the
299   // previously-scheduled delayed flush will still occur at the end of the
300   // batching period.
301   bool delayed_flush_scheduled_ = false;
302 
303   // Stores target buffer reservations for writers created via
304   // CreateStartupTraceWriter(). A bound reservation sets
305   // TargetBufferReservation::resolved to true and is associated with the actual
306   // BufferID supplied in BindStartupTargetBuffer().
307   //
308   // TODO(eseckler): Clean up entries from this map. This would probably require
309   // a method in SharedMemoryArbiter that allows a producer to invalidate a
310   // reservation ID.
311   std::map<MaybeUnboundBufferID, TargetBufferReservation>
312       target_buffer_reservations_;
313 
314   // --- End lock-protected members ---
315 
316   // Keep at the end.
317   base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_;
318 };
319 
320 }  // namespace perfetto
321 
322 #endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
323