/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"
#include "src/profiling/memory/util.h"

#include <atomic>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <type_traits>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer, single-reader ring buffer FIFO, based on a
// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
// + O_NONBLOCK socket, specifically:
//
// - Writes are atomic: a record is either written fully into the buffer or
//   not at all.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole record.
// - Reads are atomic: no fragmentation.
// - The reader sees writes in write order (modulo discarded writes).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: The ring buffer must be written under the assumption that the
// other end modifies arbitrary shared memory without holding the spin-lock.
// This means we must make local copies of the read and write pointers before
// doing bounds checks followed by reads / writes, as they might change in the
// meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
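//
// A rough usage sketch (illustrative only; the Mode::Try / Unlock() pattern
// follows scoped_spinlock.h, Parse() and the payload variables are
// placeholders, and all error handling is elided):
//
//   // Writer: reserve space while holding the spinlock, fill it after
//   // dropping the lock.
//   ScopedSpinlock lock = ring.AcquireLock(ScopedSpinlock::Mode::Try);
//   if (lock.locked()) {
//     SharedRingBuffer::Buffer buf = ring.BeginWrite(lock, payload_size);
//     lock.Unlock();
//     if (buf) {  // !buf means the write was discarded (e.g. buffer full).
//       memcpy(buf.data, payload, payload_size);
//       ring.EndWrite(std::move(buf));
//     }
//   }
//
//   // Reader: drain all records that are currently complete.
//   while (true) {
//     SharedRingBuffer::Buffer buf = ring.BeginRead();
//     if (!buf)
//       break;
//     Parse(buf.data, buf.size);
//     ring.EndRead(std::move(buf));
//   }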
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s, uint64_t f)
        : data(d), size(s), bytes_free(f) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    explicit operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
    uint64_t bytes_free = 0;
  };

  enum ErrorState : uint64_t {
    kNoError = 0,
    kHitTimeout = 1,
    kInvalidStackBounds = 2,
  };

  struct Stats {
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) bytes_written;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_overflow;

    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_nodata;

    // Fields below get set by GetStats as copies of atomics in MetadataPage.
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(ErrorState) error_state;
  };

  static std::optional<SharedRingBuffer> Create(size_t);
  static std::optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&) noexcept;

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }

  size_t write_avail() {
    auto pos = GetPointerPositions();
    if (!pos)
      return 0;
    return write_avail(*pos);
  }

  size_t read_avail() {
    auto pos = GetPointerPositions();
    if (!pos)
      return 0;
    return read_avail(*pos);
  }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  // Returns the number of bytes read from the shared memory buffer. This is
  // different from the number of bytes returned in the Buffer, because it
  // includes the header size.
  size_t EndRead(Buffer);

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    stats.error_state = meta_->error_state.load(std::memory_order_relaxed);
    stats.client_spinlock_blocked_us =
        meta_->client_spinlock_blocked_us.load(std::memory_order_relaxed);
    return stats;
  }
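
  // A minimal sketch of sampling these stats from the reading side
  // (illustrative only; assumes that silently skipping a failed try-lock is
  // acceptable to the caller):
  //
  //   ScopedSpinlock lock = ring.AcquireLock(ScopedSpinlock::Mode::Try);
  //   if (lock.locked()) {
  //     SharedRingBuffer::Stats stats = ring.GetStats(lock);
  //     // e.g. export stats.bytes_written and stats.num_writes_overflow.
  //   }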
  void SetErrorState(ErrorState error) { meta_->error_state.store(error); }

  // Lets the caller keep holding the spinlock after BeginWrite has returned,
  // so that additional bookkeeping can be done under the lock. This is used
  // to increment the sequence_number.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }

  void AddClientSpinlockBlockedUs(size_t n) {
    meta_->client_spinlock_blocked_us.fetch_add(n, std::memory_order_relaxed);
  }

  uint64_t client_spinlock_blocked_us() {
    return meta_->client_spinlock_blocked_us;
  }

  void SetShuttingDown() {
    meta_->shutting_down.store(true, std::memory_order_relaxed);
  }

  bool shutting_down() {
    return meta_->shutting_down.load(std::memory_order_relaxed);
  }

  void SetReaderPaused() {
    meta_->reader_paused.store(true, std::memory_order_relaxed);
  }

  bool GetAndResetReaderPaused() {
    return meta_->reader_paused.exchange(false, std::memory_order_relaxed);
  }

  void InfiniteBufferForTesting() {
    // Pretend this buffer is really large, while keeping size_mask_
    // unchanged, so that positions keep wrapping around the actual buffer.
    size_ = std::numeric_limits<size_t>::max() / 2;
  }

  // Exposed for fuzzers.
  struct MetadataPage {
    static_assert(std::is_trivially_constructible<Spinlock>::value,
                  "Spinlock needs to be trivially constructible.");
    alignas(8) Spinlock spinlock;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) read_pos;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) write_pos;

    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>)
    client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<ErrorState>) error_state;
    alignas(sizeof(uint64_t)) std::atomic<bool> shutting_down;
    alignas(sizeof(uint64_t)) std::atomic<bool> reader_paused;
    // Stats that are only accessed by a single thread or under the spinlock
    // are modified directly as members of this struct. Other stats use the
    // atomics above.
    //
    // When the user requests stats, the atomics above get copied into this
    // struct, which is then returned.
    alignas(sizeof(uint64_t)) Stats stats;
  };

  static_assert(sizeof(MetadataPage) == 144,
                "metadata page size needs to be ABI independent");

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);

  inline std::optional<PointerPositions> GetPointerPositions() {
    PointerPositions pos;
    // We need to acquire-load the write_pos to make sure we observe a
    // consistent ring buffer in BeginRead; otherwise it is possible that we
    // observe the write_pos increment, but not the size field write of the
    // payload.
    //
    // This is matched by a release at the end of BeginWrite.
    pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);
    pos.read_pos = meta_->read_pos.load(std::memory_order_relaxed);

    std::optional<PointerPositions> result;
    if (IsCorrupt(pos))
      return result;
    result = pos;
    return result;
  }
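
  // Sketch of the acquire/release pairing described above. The writer side
  // lives in shared_ring_buffer.cc; the names below are illustrative, not
  // the actual implementation:
  //
  //   // Writer, end of BeginWrite: the record's size header is written
  //   // before write_pos is published with a release store.
  //   WriteSizeHeader(at(write_pos), size);  // hypothetical helper
  //   meta_->write_pos.store(write_pos + size_with_header,
  //                          std::memory_order_release);
  //
  //   // Reader, in GetPointerPositions: the acquire load synchronizes with
  //   // that release store, so once the new write_pos is visible, the size
  //   // header is visible too.
  //   pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);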
  // |size| must be a power of two, so that |size_mask_| can wrap positions
  // in at() below.
  inline void set_size(size_t size) {
    size_ = size;
    size_mask_ = size - 1;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }

  // Maps an unwrapped position onto the underlying buffer.
  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & size_mask_); }

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmapped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, not including the metadata page or the
  // second mmap.
  size_t size_ = 0;
  size_t size_mask_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_