/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"
#include "src/profiling/memory/util.h"

#include <atomic>
#include <limits>
#include <map>
#include <memory>
#include <type_traits>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer, single-reader FIFO ring buffer, based on a
// circular buffer over shared memory. It has similar semantics to a
// SEQ_PACKET + O_NONBLOCK socket, specifically:
//
// - Writes are atomic: a record is either written fully into the buffer or
//   not at all.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole write.
// - Reads are atomic, no fragmentation.
// - The reader sees writes in write order (modulo discarded writes).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: The ring buffer must be written under the assumption that the
// other end modifies arbitrary shared memory without holding the spin-lock.
// This means we must make local copies of the read and write pointers before
// doing bounds checks followed by reads / writes, as they might change in the
// meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
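//
// A minimal usage sketch (illustrative only): |ring_buffer| is assumed to
// come from Create() or Attach() below; |payload|, |payload_size| and
// Consume() are hypothetical, and ScopedSpinlock::Mode::Try is assumed to be
// provided by scoped_spinlock.h.
//
//   Writer:
//     SharedRingBuffer::Buffer buf;
//     {
//       ScopedSpinlock lock =
//           ring_buffer.AcquireLock(ScopedSpinlock::Mode::Try);
//       if (lock.locked())
//         buf = ring_buffer.BeginWrite(lock, payload_size);
//     }  // Spinlock released; the copy below happens outside the lock.
//     if (buf) {  // A null Buffer means the write was discarded (full).
//       memcpy(buf.data, payload, payload_size);
//       ring_buffer.EndWrite(std::move(buf));
//     }
//
//   Reader:
//     SharedRingBuffer::Buffer buf = ring_buffer.BeginRead();
//     if (buf) {
//       Consume(buf.data, buf.size);
//       ring_buffer.EndRead(std::move(buf));
//     }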
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s, uint64_t f)
        : data(d), size(s), bytes_free(f) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    explicit operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
    uint64_t bytes_free = 0;
  };

  enum ErrorState : uint64_t {
    kNoError = 0,
    kHitTimeout = 1,
    kInvalidStackBounds = 2,
  };

  struct Stats {
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) bytes_written;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_overflow;

    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_nodata;

    // Fields below get set by GetStats as copies of atomics in MetadataPage.
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(ErrorState) error_state;
  };

  static base::Optional<SharedRingBuffer> Create(size_t);
  static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&) noexcept;

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }
  size_t write_avail() {
    auto pos = GetPointerPositions();
    if (!pos)
      return 0;
    return write_avail(*pos);
  }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  void EndRead(Buffer);

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    stats.error_state = meta_->error_state.load(std::memory_order_relaxed);
    stats.client_spinlock_blocked_us =
        meta_->client_spinlock_blocked_us.load(std::memory_order_relaxed);
    return stats;
  }
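
  // A minimal sketch of taking a stats snapshot (illustrative only;
  // ScopedSpinlock::Mode::Try is assumed to be provided by
  // scoped_spinlock.h). GetStats requires the lock to be held, as the
  // PERFETTO_DCHECK above enforces:
  //
  //   ScopedSpinlock lock =
  //       ring_buffer.AcquireLock(ScopedSpinlock::Mode::Try);
  //   if (lock.locked()) {
  //     Stats stats = ring_buffer.GetStats(lock);
  //     // |stats| now also holds copies of the atomic counters from the
  //     // MetadataPage (failed_spinlocks, error_state, ...).
  //   }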

  void SetErrorState(ErrorState error) { meta_->error_state.store(error); }

  // Acquires the spinlock and hands it to the caller, so that the lock can
  // be held after BeginWrite has returned and additional bookkeeping (such
  // as incrementing the sequence_number) can be done under it.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }

  void AddClientSpinlockBlockedUs(size_t n) {
    meta_->client_spinlock_blocked_us.fetch_add(n, std::memory_order_relaxed);
  }

  uint64_t client_spinlock_blocked_us() {
    return meta_->client_spinlock_blocked_us;
  }

  void SetShuttingDown() {
    meta_->shutting_down.store(true, std::memory_order_relaxed);
  }

  bool shutting_down() {
    return meta_->shutting_down.load(std::memory_order_relaxed);
  }

  void SetReaderPaused() {
    meta_->reader_paused.store(true, std::memory_order_relaxed);
  }

  bool GetAndResetReaderPaused() {
    return meta_->reader_paused.exchange(false, std::memory_order_relaxed);
  }

  void InfiniteBufferForTesting() {
    // Pretend this buffer is really large, while keeping size_mask_ at its
    // original value so the contents keep wrapping in circles.
    size_ = std::numeric_limits<size_t>::max() / 2;
  }

  // Exposed for fuzzers.
  struct MetadataPage {
    static_assert(std::is_trivially_constructible<Spinlock>::value,
                  "Spinlock needs to be trivially constructible.");
    alignas(8) Spinlock spinlock;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) read_pos;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) write_pos;

    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>)
    client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<ErrorState>) error_state;
    alignas(sizeof(uint64_t)) std::atomic<bool> shutting_down;
    alignas(sizeof(uint64_t)) std::atomic<bool> reader_paused;
    // For stats that are only accessed by a single thread or under the
    // spinlock, members of this struct are directly modified. Other stats
    // use the atomics above this struct.
    //
    // When the user requests stats, the atomics above get copied into this
    // struct, which is then returned.
    alignas(sizeof(uint64_t)) Stats stats;
  };

  static_assert(sizeof(MetadataPage) == 144,
                "metadata page size needs to be ABI independent");

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);

  inline base::Optional<PointerPositions> GetPointerPositions() {
    PointerPositions pos;
    // We need to acquire-load the write_pos to make sure we observe a
    // consistent ring buffer in BeginRead; otherwise it is possible that we
    // observe the write_pos increment, but not the write of the payload's
    // size field.
    //
    // This is matched by a release store at the end of BeginWrite.
    pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);
    pos.read_pos = meta_->read_pos.load(std::memory_order_relaxed);

    base::Optional<PointerPositions> result;
    if (IsCorrupt(pos))
      return result;
    result = pos;
    return result;
  }

  inline void set_size(size_t size) {
    size_ = size;
    size_mask_ = size - 1;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }

  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & size_mask_); }
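
  // A worked example of the wrap-around arithmetic in set_size() and at()
  // above, with an illustrative (power-of-two) payload size of 4096 bytes:
  //   set_size(4096)  ->  size_mask_ == 4095 (0xfff)
  //   at(5000)        ->  mem_ + (5000 & 4095) == mem_ + 904
  // Positions only ever grow and are masked on access, which is why
  // read_avail() can subtract the raw 64-bit positions directly. A record
  // straddling the end of the buffer can still be read and written
  // contiguously if, as the "2nd mmap" comment below suggests, the payload
  // is mapped a second time immediately after the first mapping.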

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, without including metadata or the 2nd
  // mmap.
  size_t size_ = 0;
  size_t size_mask_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_