/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"

#include <atomic>
#include <map>
#include <memory>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer single-reader ring buffer FIFO, based on a
// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
// + O_NONBLOCK socket, specifically:
//
// - Writes are atomic, data is either written fully in the buffer or not.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole write.
// - Reads are atomic, no fragmentation.
// - The reader sees writes in write order (modulo discarding).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: The ring buffer must be written under the assumption that the
// other end modifies arbitrary shared memory without holding the spin-lock.
// This means we must make local copies of read and write pointers for doing
// bounds checks followed by reads / writes, as they might change in the
// meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
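//
// Example usage (sketch; error handling elided). Parse() is a placeholder for
// client code and 128 * 1024 is an arbitrary power-of-two size:
//
//   auto rb = SharedRingBuffer::Create(128 * 1024);
//   if (!rb || !rb->is_valid())
//     return;  // Shared memory setup failed.
//
//   // The other endpoint maps the same memory by receiving rb->fd() over a
//   // socket and calling SharedRingBuffer::Attach() with it.
//
//   // Reader side: drain everything that has been committed so far.
//   for (;;) {
//     SharedRingBuffer::Buffer buf = rb->BeginRead();
//     if (!buf)
//       break;  // No readable record at the moment.
//     Parse(buf.data, buf.size);
//     rb->EndRead(std::move(buf));
//   }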
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s) : data(d), size(s) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
  };

  struct Stats {
    uint64_t bytes_written;
    uint64_t num_writes_succeeded;
    uint64_t num_writes_corrupt;
    uint64_t num_writes_overflow;

    uint64_t num_reads_succeeded;
    uint64_t num_reads_corrupt;
    uint64_t num_reads_nodata;

    // Fields below get set by GetStats as copies of atomics in MetadataPage.
    uint64_t failed_spinlocks;
  };

  static base::Optional<SharedRingBuffer> Create(size_t);
  static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&);

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  void EndRead(Buffer);

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    return stats;
  }

  // This allows the caller to keep holding the spinlock after BeginWrite has
  // returned, so that additional bookkeeping (such as incrementing the
  // sequence_number) can be done under the lock.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }
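
  // Example (sketch) of a write that keeps the spinlock across BeginWrite, as
  // described above. TryWrite() is a hypothetical helper of the caller's;
  // ScopedSpinlock::Mode::Try is assumed to be defined in scoped_spinlock.h,
  // and the reserved region is assumed to be fillable after the lock has been
  // released:
  //
  //   bool TryWrite(SharedRingBuffer& rb, const uint8_t* src, size_t size) {
  //     SharedRingBuffer::Buffer buf;
  //     {
  //       ScopedSpinlock lock = rb.AcquireLock(ScopedSpinlock::Mode::Try);
  //       if (!lock.locked())
  //         return false;  // Contended; counted via failed_spinlocks.
  //       buf = rb.BeginWrite(lock, size);
  //       // ... bookkeeping that must happen under the lock goes here ...
  //     }  // Lock released; the payload itself is copied outside of it.
  //     if (!buf)
  //       return false;  // Not enough space; the write is discarded.
  //     memcpy(buf.data, src, size);
  //     rb.EndWrite(std::move(buf));
  //     return true;
  //   }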

  // Exposed for fuzzers.
  struct MetadataPage {
    alignas(uint64_t) std::atomic<bool> spinlock;
    std::atomic<uint64_t> read_pos;
    std::atomic<uint64_t> write_pos;

    std::atomic<uint64_t> failed_spinlocks;
    // For stats that are only accessed by a single thread or under the
    // spinlock, members of this struct are directly modified. Other stats use
    // the atomics above this struct.
    //
    // When the user requests stats, the atomics above get copied into this
    // struct, which is then returned.
    Stats stats;
  };

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);

  inline base::Optional<PointerPositions> GetPointerPositions() {
    PointerPositions pos;
    // We need to acquire-load the write_pos to make sure we observe a
    // consistent ring buffer in BeginRead, otherwise it is possible that we
    // observe the write_pos increment, but not the size field write of the
    // payload.
    //
    // This is matched by a release at the end of BeginWrite.
    pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);
    pos.read_pos = meta_->read_pos.load(std::memory_order_relaxed);

    base::Optional<PointerPositions> result;
    if (IsCorrupt(pos))
      return result;
    result = pos;
    return result;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }

  // Assumes size_ is a power of two, so the modulo reduces to a mask.
  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, without including metadata or the 2nd
  // mmap.
  size_t size_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_