1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 19 20 #include <stdint.h> 21 22 #include <functional> 23 #include <map> 24 #include <memory> 25 #include <mutex> 26 #include <vector> 27 28 #include "perfetto/ext/base/weak_ptr.h" 29 #include "perfetto/ext/tracing/core/basic_types.h" 30 #include "perfetto/ext/tracing/core/shared_memory_abi.h" 31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" 32 #include "perfetto/tracing/core/forward_decls.h" 33 #include "src/tracing/core/id_allocator.h" 34 35 namespace perfetto { 36 37 class PatchList; 38 class Patch; 39 class TraceWriter; 40 class TraceWriterImpl; 41 42 namespace base { 43 class TaskRunner; 44 } // namespace base 45 46 // This class handles the shared memory buffer on the producer side. It is used 47 // to obtain thread-local chunks and to partition pages from several threads. 48 // There is one arbiter instance per Producer. 49 // This class is thread-safe and uses locks to do so. Data sources are supposed 50 // to interact with this sporadically, only when they run out of space on their 51 // current thread-local chunk. 52 // 53 // The arbiter can become "unbound" as a consequence of: 54 // (a) being created without an endpoint 55 // (b) CreateStartupTraceWriter calls after creation (whether created with or 56 // without endpoint). 57 // 58 // Entering the unbound state is only supported if all trace writers are created 59 // in kDrop mode. In the unbound state, the arbiter buffers commit messages 60 // until all trace writers are bound to a target buffer. 61 // 62 // The following state transitions are possible: 63 // 64 // CreateInstance() 65 // | 66 // | CreateUnboundInstance() 67 // | | 68 // | | 69 // | V 70 // | [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ] 71 // | | | 72 // | | | CreateStartupTraceWriter(buf) 73 // | | | buffer reservations += buf 74 // | | | 75 // | | | ---- 76 // | | | | | CreateStartupTraceWriter(buf) 77 // | | | | | buffer reservations += buf 78 // | | V | V 79 // | | [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ] 80 // | | | 81 // | | BindToProducerEndpoint() | 82 // | | | 83 // | | BindToProducerEndpoint() | 84 // | | V 85 // | | [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ] 86 // | | A | A | A 87 // | | | | | | | 88 // | | | ---- | | 89 // | | | CreateStartupTraceWriter(buf) | | 90 // | | | buffer reservations += buf | | 91 // | | | | | 92 // | | | CreateStartupTraceWriter(buf) | | 93 // | | | where buf is not yet bound | | 94 // | | | buffer reservations += buf | | (yes) 95 // | | | | | 96 // | | | BindStartupTargetBuffer(buf, id) |----- 97 // | | | buffer reservations -= buf | reservations > 0? 98 // | | | | 99 // | | | | (no) 100 // | V | V 101 // --> [ fully_bound_, endpoint_, 0 unbound buffer reservations ] 102 // | A 103 // | | CreateStartupTraceWriter(buf) 104 // | | where buf is already bound 105 // ---- 106 class SharedMemoryArbiterImpl : public SharedMemoryArbiter { 107 public: 108 // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the 109 // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may 110 // be |nullptr| if created unbound, see 111 // SharedMemoryArbiter::CreateUnboundInstance(). 112 SharedMemoryArbiterImpl(void* start, 113 size_t size, 114 size_t page_size, 115 TracingService::ProducerEndpoint*, 116 base::TaskRunner*); 117 118 // Returns a new Chunk to write tracing data. Depending on the provided 119 // BufferExhaustedPolicy, this may return an invalid chunk if no valid free 120 // chunk could be found in the SMB. 121 SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&, 122 BufferExhaustedPolicy, 123 size_t size_hint = 0); 124 125 // Puts back a Chunk that has been completed and sends a request to the 126 // service to move it to the central tracing buffer. |target_buffer| is the 127 // absolute trace buffer ID where the service should move the chunk onto (the 128 // producer is just to copy back the same number received in the 129 // DataSourceConfig upon the StartDataSource() reques). 130 // PatchList is a pointer to the list of patches for previous chunks. The 131 // first patched entries will be removed from the patched list and sent over 132 // to the service in the same CommitData() IPC request. 133 void ReturnCompletedChunk(SharedMemoryABI::Chunk, 134 MaybeUnboundBufferID target_buffer, 135 PatchList*); 136 137 // Send a request to the service to apply completed patches from |patch_list|. 138 // |writer_id| is the ID of the TraceWriter that calls this method, 139 // |target_buffer| is the global trace buffer ID of its target buffer. 140 void SendPatches(WriterID writer_id, 141 MaybeUnboundBufferID target_buffer, 142 PatchList* patch_list); 143 shmem_abi_for_testing()144 SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; } 145 set_default_layout_for_testing(SharedMemoryABI::PageLayout l)146 static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) { 147 default_page_layout = l; 148 } 149 default_page_layout_for_testing()150 static SharedMemoryABI::PageLayout default_page_layout_for_testing() { 151 return default_page_layout; 152 } 153 154 // SharedMemoryArbiter implementation. 155 // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments. 156 std::unique_ptr<TraceWriter> CreateTraceWriter( 157 BufferID target_buffer, 158 BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override; 159 std::unique_ptr<TraceWriter> CreateStartupTraceWriter( 160 uint16_t target_buffer_reservation_id) override; 161 void BindToProducerEndpoint(TracingService::ProducerEndpoint*, 162 base::TaskRunner*) override; 163 void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id, 164 BufferID target_buffer_id) override; 165 void AbortStartupTracingForReservation( 166 uint16_t target_buffer_reservation_id) override; 167 void NotifyFlushComplete(FlushRequestID) override; 168 169 void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override; 170 171 bool EnableDirectSMBPatching() override; 172 173 void SetDirectSMBPatchingSupportedByService() override; 174 175 void FlushPendingCommitDataRequests( 176 std::function<void()> callback = {}) override; 177 bool TryShutdown() override; 178 task_runner()179 base::TaskRunner* task_runner() const { return task_runner_; } page_size()180 size_t page_size() const { return shmem_abi_.page_size(); } num_pages()181 size_t num_pages() const { return shmem_abi_.num_pages(); } 182 GetWeakPtr()183 base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const { 184 return weak_ptr_factory_.GetWeakPtr(); 185 } 186 187 private: 188 friend class TraceWriterImpl; 189 friend class StartupTraceWriterTest; 190 friend class SharedMemoryArbiterImplTest; 191 192 struct TargetBufferReservation { 193 bool resolved = false; 194 BufferID target_buffer = kInvalidBufferId; 195 }; 196 197 // Placeholder for the actual target buffer ID of a startup target buffer 198 // reservation ID in |target_buffer_reservations_|. 199 static constexpr BufferID kInvalidBufferId = 0; 200 201 static SharedMemoryABI::PageLayout default_page_layout; 202 203 SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete; 204 SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete; 205 206 void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk, 207 WriterID writer_id, 208 MaybeUnboundBufferID target_buffer, 209 PatchList* patch_list); 210 211 // Search the chunks that are being batched in |commit_data_req_| for a chunk 212 // that needs patching and that matches the provided |writer_id| and 213 // |patch.chunk_id|. If found, apply |patch| to that chunk, and if 214 // |chunk_needs_more_patching| is true, clear the needs patching flag of the 215 // chunk and mark it as complete - to allow the service to read it (and other 216 // chunks after it) during scraping. Returns true if the patch was applied, 217 // false otherwise. 218 // 219 // Note: the caller must be holding |lock_| for the duration of the call. 220 bool TryDirectPatchLocked(WriterID writer_id, 221 const Patch& patch, 222 bool chunk_needs_more_patching); 223 std::unique_ptr<TraceWriter> CreateTraceWriterInternal( 224 MaybeUnboundBufferID target_buffer, 225 BufferExhaustedPolicy); 226 227 // Called by the TraceWriter destructor. 228 void ReleaseWriterID(WriterID); 229 230 void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock, 231 uint16_t target_buffer_reservation_id, 232 BufferID target_buffer_id); 233 234 // If any flush callbacks were queued up while the arbiter or any target 235 // buffer reservation was unbound, this wraps the pending callbacks into a new 236 // std::function and returns it. Otherwise returns an invalid std::function. 237 std::function<void()> TakePendingFlushCallbacksLocked(); 238 239 // Replace occurrences of target buffer reservation IDs in |commit_data_req_| 240 // with their respective actual BufferIDs if they were already bound. Returns 241 // true iff all occurrences were replaced. 242 bool ReplaceCommitPlaceholderBufferIdsLocked(); 243 244 // Update and return |fully_bound_| based on the arbiter's |pending_writers_| 245 // state. 246 bool UpdateFullyBoundLocked(); 247 248 // Only accessed on |task_runner_| after the producer endpoint was bound. 249 TracingService::ProducerEndpoint* producer_endpoint_ = nullptr; 250 251 // --- Begin lock-protected members --- 252 253 std::mutex lock_; 254 255 base::TaskRunner* task_runner_ = nullptr; 256 SharedMemoryABI shmem_abi_; 257 size_t page_idx_ = 0; 258 std::unique_ptr<CommitDataRequest> commit_data_req_; 259 size_t bytes_pending_commit_ = 0; // SUM(chunk.size() : commit_data_req_). 260 IdAllocator<WriterID> active_writer_ids_; 261 bool did_shutdown_ = false; 262 263 // Whether the arbiter itself and all startup target buffer reservations are 264 // bound. Note that this can become false again later if a new target buffer 265 // reservation is created by calling CreateStartupTraceWriter() with a new 266 // reservation id. 267 bool fully_bound_; 268 269 // Whether the arbiter was always bound. If false, the arbiter was unbound at 270 // one point in time. 271 bool was_always_bound_; 272 273 // Whether all created trace writers were created with kDrop policy. 274 bool all_writers_have_drop_policy_ = true; 275 276 // IDs of writers and their assigned target buffers that should be registered 277 // with the service after the arbiter and/or their startup target buffer is 278 // bound. 279 std::map<WriterID, MaybeUnboundBufferID> pending_writers_; 280 281 // Callbacks for flush requests issued while the arbiter or a target buffer 282 // reservation was unbound. 283 std::vector<std::function<void()>> pending_flush_callbacks_; 284 285 // See SharedMemoryArbiter::SetBatchCommitsDuration. 286 uint32_t batch_commits_duration_ms_ = 0; 287 288 // See SharedMemoryArbiter::EnableDirectSMBPatching. 289 bool direct_patching_enabled_ = false; 290 291 // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService. 292 bool direct_patching_supported_by_service_ = false; 293 294 // Indicates whether we have already scheduled a delayed flush for the 295 // purposes of batching. Set to true at the beginning of a batching period and 296 // cleared at the end of the period. Immediate flushes that happen during a 297 // batching period will empty the |commit_data_req| (triggering an immediate 298 // IPC to the service), but will not clear this flag and the 299 // previously-scheduled delayed flush will still occur at the end of the 300 // batching period. 301 bool delayed_flush_scheduled_ = false; 302 303 // Stores target buffer reservations for writers created via 304 // CreateStartupTraceWriter(). A bound reservation sets 305 // TargetBufferReservation::resolved to true and is associated with the actual 306 // BufferID supplied in BindStartupTargetBuffer(). 307 // 308 // TODO(eseckler): Clean up entries from this map. This would probably require 309 // a method in SharedMemoryArbiter that allows a producer to invalidate a 310 // reservation ID. 311 std::map<MaybeUnboundBufferID, TargetBufferReservation> 312 target_buffer_reservations_; 313 314 // --- End lock-protected members --- 315 316 // Keep at the end. 317 base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_; 318 }; 319 320 } // namespace perfetto 321 322 #endif // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 323