1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 19 20 #include <stdint.h> 21 22 #include <functional> 23 #include <map> 24 #include <memory> 25 #include <mutex> 26 #include <vector> 27 28 #include "perfetto/ext/base/weak_ptr.h" 29 #include "perfetto/ext/tracing/core/basic_types.h" 30 #include "perfetto/ext/tracing/core/shared_memory_abi.h" 31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" 32 #include "perfetto/tracing/core/forward_decls.h" 33 #include "src/tracing/core/id_allocator.h" 34 35 namespace perfetto { 36 37 class PatchList; 38 class Patch; 39 class TraceWriter; 40 class TraceWriterImpl; 41 42 namespace base { 43 class TaskRunner; 44 } // namespace base 45 46 // This class handles the shared memory buffer on the producer side. It is used 47 // to obtain thread-local chunks and to partition pages from several threads. 48 // There is one arbiter instance per Producer. 49 // This class is thread-safe and uses locks to do so. Data sources are supposed 50 // to interact with this sporadically, only when they run out of space on their 51 // current thread-local chunk. 52 // 53 // When the arbiter is created using CreateUnboundInstance(), the following 54 // state transitions are possible: 55 // 56 // [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ] 57 // | | 58 // | | CreateStartupTraceWriter(buf) 59 // | | buffer reservations += buf 60 // | | 61 // | | ---- 62 // | | | | CreateStartupTraceWriter(buf) 63 // | | | | buffer reservations += buf 64 // | V | V 65 // | [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ] 66 // | | 67 // | BindToProducerEndpoint() | 68 // | | 69 // | BindToProducerEndpoint() | 70 // | V 71 // | [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ] 72 // | A | A | A 73 // | | | | | | 74 // | | ---- | | 75 // | | CreateStartupTraceWriter(buf) | | 76 // | | buffer reservations += buf | | 77 // | | | | 78 // | | CreateStartupTraceWriter(buf) | | 79 // | | where buf is not yet bound | | 80 // | | buffer reservations += buf | | (yes) 81 // | | | | 82 // | | BindStartupTargetBuffer(buf, id) |----- 83 // | | buffer reservations -= buf | reservations > 0? 84 // | | | 85 // | | | (no) 86 // V | V 87 // [ fully_bound_, endpoint_, 0 unbound buffer reservations ] 88 // | A 89 // | | CreateStartupTraceWriter(buf) 90 // | | where buf is already bound 91 // ---- 92 class SharedMemoryArbiterImpl : public SharedMemoryArbiter { 93 public: 94 // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the 95 // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may 96 // be |nullptr| if created unbound, see 97 // SharedMemoryArbiter::CreateUnboundInstance(). 98 SharedMemoryArbiterImpl(void* start, 99 size_t size, 100 size_t page_size, 101 TracingService::ProducerEndpoint*, 102 base::TaskRunner*); 103 104 // Returns a new Chunk to write tracing data. Depending on the provided 105 // BufferExhaustedPolicy, this may return an invalid chunk if no valid free 106 // chunk could be found in the SMB. 107 SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&, 108 BufferExhaustedPolicy, 109 size_t size_hint = 0); 110 111 // Puts back a Chunk that has been completed and sends a request to the 112 // service to move it to the central tracing buffer. |target_buffer| is the 113 // absolute trace buffer ID where the service should move the chunk onto (the 114 // producer is just to copy back the same number received in the 115 // DataSourceConfig upon the StartDataSource() reques). 116 // PatchList is a pointer to the list of patches for previous chunks. The 117 // first patched entries will be removed from the patched list and sent over 118 // to the service in the same CommitData() IPC request. 119 void ReturnCompletedChunk(SharedMemoryABI::Chunk, 120 MaybeUnboundBufferID target_buffer, 121 PatchList*); 122 123 // Send a request to the service to apply completed patches from |patch_list|. 124 // |writer_id| is the ID of the TraceWriter that calls this method, 125 // |target_buffer| is the global trace buffer ID of its target buffer. 126 void SendPatches(WriterID writer_id, 127 MaybeUnboundBufferID target_buffer, 128 PatchList* patch_list); 129 shmem_abi_for_testing()130 SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; } 131 set_default_layout_for_testing(SharedMemoryABI::PageLayout l)132 static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) { 133 default_page_layout = l; 134 } 135 136 // SharedMemoryArbiter implementation. 137 // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments. 138 std::unique_ptr<TraceWriter> CreateTraceWriter( 139 BufferID target_buffer, 140 BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override; 141 std::unique_ptr<TraceWriter> CreateStartupTraceWriter( 142 uint16_t target_buffer_reservation_id) override; 143 void BindToProducerEndpoint(TracingService::ProducerEndpoint*, 144 base::TaskRunner*) override; 145 void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id, 146 BufferID target_buffer_id) override; 147 void AbortStartupTracingForReservation( 148 uint16_t target_buffer_reservation_id) override; 149 void NotifyFlushComplete(FlushRequestID) override; 150 151 void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override; 152 153 bool EnableDirectSMBPatching() override; 154 155 void SetDirectSMBPatchingSupportedByService() override; 156 157 void FlushPendingCommitDataRequests( 158 std::function<void()> callback = {}) override; 159 bool TryShutdown() override; 160 task_runner()161 base::TaskRunner* task_runner() const { return task_runner_; } page_size()162 size_t page_size() const { return shmem_abi_.page_size(); } num_pages()163 size_t num_pages() const { return shmem_abi_.num_pages(); } 164 GetWeakPtr()165 base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const { 166 return weak_ptr_factory_.GetWeakPtr(); 167 } 168 169 private: 170 friend class TraceWriterImpl; 171 friend class StartupTraceWriterTest; 172 friend class SharedMemoryArbiterImplTest; 173 174 struct TargetBufferReservation { 175 bool resolved = false; 176 BufferID target_buffer = kInvalidBufferId; 177 }; 178 179 // Placeholder for the actual target buffer ID of a startup target buffer 180 // reservation ID in |target_buffer_reservations_|. 181 static constexpr BufferID kInvalidBufferId = 0; 182 183 static SharedMemoryABI::PageLayout default_page_layout; 184 185 SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete; 186 SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete; 187 188 void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk, 189 WriterID writer_id, 190 MaybeUnboundBufferID target_buffer, 191 PatchList* patch_list); 192 193 // Search the chunks that are being batched in |commit_data_req_| for a chunk 194 // that needs patching and that matches the provided |writer_id| and 195 // |patch.chunk_id|. If found, apply |patch| to that chunk, and if 196 // |chunk_needs_more_patching| is true, clear the needs patching flag of the 197 // chunk and mark it as complete - to allow the service to read it (and other 198 // chunks after it) during scraping. Returns true if the patch was applied, 199 // false otherwise. 200 // 201 // Note: the caller must be holding |lock_| for the duration of the call. 202 bool TryDirectPatchLocked(WriterID writer_id, 203 const Patch& patch, 204 bool chunk_needs_more_patching); 205 std::unique_ptr<TraceWriter> CreateTraceWriterInternal( 206 MaybeUnboundBufferID target_buffer, 207 BufferExhaustedPolicy); 208 209 // Called by the TraceWriter destructor. 210 void ReleaseWriterID(WriterID); 211 212 void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock, 213 uint16_t target_buffer_reservation_id, 214 BufferID target_buffer_id); 215 216 // If any flush callbacks were queued up while the arbiter or any target 217 // buffer reservation was unbound, this wraps the pending callbacks into a new 218 // std::function and returns it. Otherwise returns an invalid std::function. 219 std::function<void()> TakePendingFlushCallbacksLocked(); 220 221 // Replace occurrences of target buffer reservation IDs in |commit_data_req_| 222 // with their respective actual BufferIDs if they were already bound. Returns 223 // true iff all occurrences were replaced. 224 bool ReplaceCommitPlaceholderBufferIdsLocked(); 225 226 // Update and return |fully_bound_| based on the arbiter's |pending_writers_| 227 // state. 228 bool UpdateFullyBoundLocked(); 229 230 const bool initially_bound_; 231 232 // Only accessed on |task_runner_| after the producer endpoint was bound. 233 TracingService::ProducerEndpoint* producer_endpoint_ = nullptr; 234 235 // --- Begin lock-protected members --- 236 237 std::mutex lock_; 238 239 base::TaskRunner* task_runner_ = nullptr; 240 SharedMemoryABI shmem_abi_; 241 size_t page_idx_ = 0; 242 std::unique_ptr<CommitDataRequest> commit_data_req_; 243 size_t bytes_pending_commit_ = 0; // SUM(chunk.size() : commit_data_req_). 244 IdAllocator<WriterID> active_writer_ids_; 245 bool did_shutdown_ = false; 246 247 // Whether the arbiter itself and all startup target buffer reservations are 248 // bound. Note that this can become false again later if a new target buffer 249 // reservation is created by calling CreateStartupTraceWriter() with a new 250 // reservation id. 251 bool fully_bound_; 252 253 // IDs of writers and their assigned target buffers that should be registered 254 // with the service after the arbiter and/or their startup target buffer is 255 // bound. 256 std::map<WriterID, MaybeUnboundBufferID> pending_writers_; 257 258 // Callbacks for flush requests issued while the arbiter or a target buffer 259 // reservation was unbound. 260 std::vector<std::function<void()>> pending_flush_callbacks_; 261 262 // See SharedMemoryArbiter::SetBatchCommitsDuration. 263 uint32_t batch_commits_duration_ms_ = 0; 264 265 // See SharedMemoryArbiter::EnableDirectSMBPatching. 266 bool direct_patching_enabled_ = false; 267 268 // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService. 269 bool direct_patching_supported_by_service_ = false; 270 271 // Indicates whether we have already scheduled a delayed flush for the 272 // purposes of batching. Set to true at the beginning of a batching period and 273 // cleared at the end of the period. Immediate flushes that happen during a 274 // batching period will empty the |commit_data_req| (triggering an immediate 275 // IPC to the service), but will not clear this flag and the 276 // previously-scheduled delayed flush will still occur at the end of the 277 // batching period. 278 bool delayed_flush_scheduled_ = false; 279 280 // Stores target buffer reservations for writers created via 281 // CreateStartupTraceWriter(). A bound reservation sets 282 // TargetBufferReservation::resolved to true and is associated with the actual 283 // BufferID supplied in BindStartupTargetBuffer(). 284 // 285 // TODO(eseckler): Clean up entries from this map. This would probably require 286 // a method in SharedMemoryArbiter that allows a producer to invalidate a 287 // reservation ID. 288 std::map<MaybeUnboundBufferID, TargetBufferReservation> 289 target_buffer_reservations_; 290 291 // --- End lock-protected members --- 292 293 // Keep at the end. 294 base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_; 295 }; 296 297 } // namespace perfetto 298 299 #endif // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 300