• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include "perfetto/ext/base/weak_ptr.h"
29 #include "perfetto/ext/tracing/core/basic_types.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/tracing/core/forward_decls.h"
33 #include "src/tracing/core/id_allocator.h"
34 
35 namespace perfetto {
36 
37 class PatchList;
38 class TraceWriter;
39 class TraceWriterImpl;
40 
41 namespace base {
42 class TaskRunner;
43 }  // namespace base
44 
45 // This class handles the shared memory buffer on the producer side. It is used
46 // to obtain thread-local chunks and to partition pages from several threads.
47 // There is one arbiter instance per Producer.
48 // This class is thread-safe and uses locks to do so. Data sources are supposed
49 // to interact with this sporadically, only when they run out of space on their
50 // current thread-local chunk.
51 //
52 // When the arbiter is created using CreateUnboundInstance(), the following
53 // state transitions are possible:
54 //
55 //   [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ]
56 //       |     |
57 //       |     | CreateStartupTraceWriter(buf)
58 //       |     |  buffer reservations += buf
59 //       |     |
60 //       |     |             ----
61 //       |     |            |    | CreateStartupTraceWriter(buf)
62 //       |     |            |    |  buffer reservations += buf
63 //       |     V            |    V
64 //       |   [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ]
65 //       |                                                |
66 //       |                       BindToProducerEndpoint() |
67 //       |                                                |
68 //       | BindToProducerEndpoint()                       |
69 //       |                                                V
70 //       |   [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ]
71 //       |   A    |    A                               |     A
72 //       |   |    |    |                               |     |
73 //       |   |     ----                                |     |
74 //       |   |    CreateStartupTraceWriter(buf)        |     |
75 //       |   |     buffer reservations += buf          |     |
76 //       |   |                                         |     |
77 //       |   | CreateStartupTraceWriter(buf)           |     |
78 //       |   |  where buf is not yet bound             |     |
79 //       |   |  buffer reservations += buf             |     | (yes)
80 //       |   |                                         |     |
81 //       |   |        BindStartupTargetBuffer(buf, id) |-----
82 //       |   |           buffer reservations -= buf    | reservations > 0?
83 //       |   |                                         |
84 //       |   |                                         | (no)
85 //       V   |                                         V
86 //       [ fully_bound_, endpoint_, 0 unbound buffer reservations ]
87 //          |    A
88 //          |    | CreateStartupTraceWriter(buf)
89 //          |    |  where buf is already bound
90 //           ----
91 class SharedMemoryArbiterImpl : public SharedMemoryArbiter {
92  public:
93   // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the
94   // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may
95   // be |nullptr| if created unbound, see
96   // SharedMemoryArbiter::CreateUnboundInstance().
97   SharedMemoryArbiterImpl(void* start,
98                           size_t size,
99                           size_t page_size,
100                           TracingService::ProducerEndpoint*,
101                           base::TaskRunner*);
102 
103   // Returns a new Chunk to write tracing data. Depending on the provided
104   // BufferExhaustedPolicy, this may return an invalid chunk if no valid free
105   // chunk could be found in the SMB.
106   SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
107                                      BufferExhaustedPolicy,
108                                      size_t size_hint = 0);
109 
110   // Puts back a Chunk that has been completed and sends a request to the
111   // service to move it to the central tracing buffer. |target_buffer| is the
112   // absolute trace buffer ID where the service should move the chunk onto (the
113   // producer is just to copy back the same number received in the
114   // DataSourceConfig upon the StartDataSource() reques).
115   // PatchList is a pointer to the list of patches for previous chunks. The
116   // first patched entries will be removed from the patched list and sent over
117   // to the service in the same CommitData() IPC request.
118   void ReturnCompletedChunk(SharedMemoryABI::Chunk,
119                             MaybeUnboundBufferID target_buffer,
120                             PatchList*);
121 
122   // Send a request to the service to apply completed patches from |patch_list|.
123   // |writer_id| is the ID of the TraceWriter that calls this method,
124   // |target_buffer| is the global trace buffer ID of its target buffer.
125   void SendPatches(WriterID writer_id,
126                    MaybeUnboundBufferID target_buffer,
127                    PatchList* patch_list);
128 
129   // Forces a synchronous commit of the completed packets without waiting for
130   // the next task.
131   void FlushPendingCommitDataRequests(std::function<void()> callback = {});
132 
shmem_abi_for_testing()133   SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
134 
set_default_layout_for_testing(SharedMemoryABI::PageLayout l)135   static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
136     default_page_layout = l;
137   }
138 
139   // SharedMemoryArbiter implementation.
140   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
141   std::unique_ptr<TraceWriter> CreateTraceWriter(
142       BufferID target_buffer,
143       BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override;
144   std::unique_ptr<TraceWriter> CreateStartupTraceWriter(
145       uint16_t target_buffer_reservation_id) override;
146   void BindToProducerEndpoint(TracingService::ProducerEndpoint*,
147                               base::TaskRunner*) override;
148   void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,
149                                BufferID target_buffer_id) override;
150   void AbortStartupTracingForReservation(
151       uint16_t target_buffer_reservation_id) override;
152   void NotifyFlushComplete(FlushRequestID) override;
153 
task_runner()154   base::TaskRunner* task_runner() const { return task_runner_; }
page_size()155   size_t page_size() const { return shmem_abi_.page_size(); }
num_pages()156   size_t num_pages() const { return shmem_abi_.num_pages(); }
157 
GetWeakPtr()158   base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const {
159     return weak_ptr_factory_.GetWeakPtr();
160   }
161 
162  private:
163   friend class TraceWriterImpl;
164   friend class StartupTraceWriterTest;
165   friend class SharedMemoryArbiterImplTest;
166 
167   struct TargetBufferReservation {
168     bool resolved = false;
169     BufferID target_buffer = kInvalidBufferId;
170   };
171 
172   // Placeholder for the actual target buffer ID of a startup target buffer
173   // reservation ID in |target_buffer_reservations_|.
174   static constexpr BufferID kInvalidBufferId = 0;
175 
176   static SharedMemoryABI::PageLayout default_page_layout;
177 
178   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
179   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
180 
181   void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
182                                WriterID writer_id,
183                                MaybeUnboundBufferID target_buffer,
184                                PatchList* patch_list);
185 
186   std::unique_ptr<TraceWriter> CreateTraceWriterInternal(
187       MaybeUnboundBufferID target_buffer,
188       BufferExhaustedPolicy);
189 
190   // Called by the TraceWriter destructor.
191   void ReleaseWriterID(WriterID);
192 
193   void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,
194                                    uint16_t target_buffer_reservation_id,
195                                    BufferID target_buffer_id);
196 
197   // If any flush callbacks were queued up while the arbiter or any target
198   // buffer reservation was unbound, this wraps the pending callbacks into a new
199   // std::function and returns it. Otherwise returns an invalid std::function.
200   std::function<void()> TakePendingFlushCallbacksLocked();
201 
202   // Replace occurrences of target buffer reservation IDs in |commit_data_req_|
203   // with their respective actual BufferIDs if they were already bound. Returns
204   // true iff all occurrences were replaced.
205   bool ReplaceCommitPlaceholderBufferIdsLocked();
206 
207   // Update and return |fully_bound_| based on the arbiter's |pending_writers_|
208   // state.
209   bool UpdateFullyBoundLocked();
210 
211   const bool initially_bound_;
212   // Only accessed on |task_runner_| after the producer endpoint was bound.
213   TracingService::ProducerEndpoint* producer_endpoint_ = nullptr;
214 
215   // --- Begin lock-protected members ---
216 
217   std::mutex lock_;
218 
219   base::TaskRunner* task_runner_ = nullptr;
220   SharedMemoryABI shmem_abi_;
221   size_t page_idx_ = 0;
222   std::unique_ptr<CommitDataRequest> commit_data_req_;
223   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
224   IdAllocator<WriterID> active_writer_ids_;
225 
226   // Whether the arbiter itself and all startup target buffer reservations are
227   // bound. Note that this can become false again later if a new target buffer
228   // reservation is created by calling CreateStartupTraceWriter() with a new
229   // reservation id.
230   bool fully_bound_;
231 
232   // IDs of writers and their assigned target buffers that should be registered
233   // with the service after the arbiter and/or their startup target buffer is
234   // bound.
235   std::map<WriterID, MaybeUnboundBufferID> pending_writers_;
236 
237   // Callbacks for flush requests issued while the arbiter or a target buffer
238   // reservation was unbound.
239   std::vector<std::function<void()>> pending_flush_callbacks_;
240 
241   // Stores target buffer reservations for writers created via
242   // CreateStartupTraceWriter(). A bound reservation sets
243   // TargetBufferReservation::resolved to true and is associated with the actual
244   // BufferID supplied in BindStartupTargetBuffer().
245   //
246   // TODO(eseckler): Clean up entries from this map. This would probably require
247   // a method in SharedMemoryArbiter that allows a producer to invalidate a
248   // reservation ID.
249   std::map<MaybeUnboundBufferID, TargetBufferReservation>
250       target_buffer_reservations_;
251 
252   // --- End lock-protected members ---
253 
254   // Keep at the end.
255   base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_;
256 };
257 
258 }  // namespace perfetto
259 
260 #endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
261