1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_ 18 #define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_ 19 20 #include "perfetto/base/optional.h" 21 #include "perfetto/base/unix_socket.h" 22 #include "perfetto/base/utils.h" 23 #include "src/profiling/memory/scoped_spinlock.h" 24 25 #include <atomic> 26 #include <map> 27 #include <memory> 28 29 #include <stdint.h> 30 31 namespace perfetto { 32 namespace profiling { 33 34 // A concurrent, multi-writer single-reader ring buffer FIFO, based on a 35 // circular buffer over shared memory. It has similar semantics to a SEQ_PACKET 36 // + O_NONBLOCK socket, specifically: 37 // 38 // - Writes are atomic, data is either written fully in the buffer or not. 39 // - New writes are discarded if the buffer is full. 40 // - If a write succeeds, the reader is guaranteed to see the whole buffer. 41 // - Reads are atomic, no fragmentation. 42 // - The reader sees writes in write order (% discarding). 43 // 44 // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 45 // *IMPORTANT*: The ring buffer must be written under the assumption that the 46 // other end modifies arbitrary shared memory without holding the spin-lock. 47 // This means we must make local copies of read and write pointers for doing 48 // bounds checks followed by reads / writes, as they might change in the 49 // meantime. 50 // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 51 // 52 // TODO: 53 // - Write a benchmark. 54 class SharedRingBuffer { 55 public: 56 class Buffer { 57 public: Buffer()58 Buffer() {} Buffer(uint8_t * d,size_t s)59 Buffer(uint8_t* d, size_t s) : data(d), size(s) {} 60 61 Buffer(const Buffer&) = delete; 62 Buffer& operator=(const Buffer&) = delete; 63 64 Buffer(Buffer&&) = default; 65 Buffer& operator=(Buffer&&) = default; 66 67 operator bool() const { return data != nullptr; } 68 69 uint8_t* data = nullptr; 70 size_t size = 0; 71 }; 72 73 struct Stats { 74 uint64_t bytes_written; 75 uint64_t num_writes_succeeded; 76 uint64_t num_writes_corrupt; 77 uint64_t num_writes_overflow; 78 79 uint64_t num_reads_succeeded; 80 uint64_t num_reads_corrupt; 81 uint64_t num_reads_nodata; 82 83 // Fields below get set by GetStats as copies of atomics in MetadataPage. 84 uint64_t failed_spinlocks; 85 }; 86 87 static base::Optional<SharedRingBuffer> Create(size_t); 88 static base::Optional<SharedRingBuffer> Attach(base::ScopedFile); 89 90 ~SharedRingBuffer(); 91 SharedRingBuffer() = default; 92 93 SharedRingBuffer(SharedRingBuffer&&) noexcept; 94 SharedRingBuffer& operator=(SharedRingBuffer&&); 95 is_valid()96 bool is_valid() const { return !!mem_; } size()97 size_t size() const { return size_; } fd()98 int fd() const { return *mem_fd_; } 99 100 Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size); 101 void EndWrite(Buffer buf); 102 103 Buffer BeginRead(); 104 void EndRead(Buffer); 105 GetStats(ScopedSpinlock & spinlock)106 Stats GetStats(ScopedSpinlock& spinlock) { 107 PERFETTO_DCHECK(spinlock.locked()); 108 Stats stats = meta_->stats; 109 stats.failed_spinlocks = 110 meta_->failed_spinlocks.load(std::memory_order_relaxed); 111 return stats; 112 } 113 114 // This is used by the caller to be able to hold the SpinLock after 115 // BeginWrite has returned. This is so that additional bookkeeping can be 116 // done under the lock. This will be used to increment the sequence_number. AcquireLock(ScopedSpinlock::Mode mode)117 ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) { 118 auto lock = ScopedSpinlock(&meta_->spinlock, mode); 119 if (PERFETTO_UNLIKELY(!lock.locked())) 120 meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed); 121 return lock; 122 } 123 124 // Exposed for fuzzers. 125 struct MetadataPage { 126 alignas(uint64_t) std::atomic<bool> spinlock; 127 uint64_t read_pos; 128 uint64_t write_pos; 129 130 std::atomic<uint64_t> failed_spinlocks; 131 Stats stats; 132 }; 133 134 private: 135 struct PointerPositions { 136 uint64_t read_pos; 137 uint64_t write_pos; 138 }; 139 140 struct CreateFlag {}; 141 struct AttachFlag {}; 142 SharedRingBuffer(const SharedRingBuffer&) = delete; 143 SharedRingBuffer& operator=(const SharedRingBuffer&) = delete; 144 SharedRingBuffer(CreateFlag, size_t size); SharedRingBuffer(AttachFlag,base::ScopedFile mem_fd)145 SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) { 146 Initialize(std::move(mem_fd)); 147 } 148 149 void Initialize(base::ScopedFile mem_fd); 150 bool IsCorrupt(const PointerPositions& pos); 151 GetPointerPositions(const ScopedSpinlock & lock)152 inline base::Optional<PointerPositions> GetPointerPositions( 153 const ScopedSpinlock& lock) { 154 PERFETTO_DCHECK(lock.locked()); 155 156 PointerPositions pos; 157 pos.read_pos = meta_->read_pos; 158 pos.write_pos = meta_->write_pos; 159 160 base::Optional<PointerPositions> result; 161 if (IsCorrupt(pos)) 162 return result; 163 result = pos; 164 return result; 165 } 166 read_avail(const PointerPositions & pos)167 inline size_t read_avail(const PointerPositions& pos) { 168 PERFETTO_DCHECK(pos.write_pos >= pos.read_pos); 169 auto res = static_cast<size_t>(pos.write_pos - pos.read_pos); 170 PERFETTO_DCHECK(res <= size_); 171 return res; 172 } 173 write_avail(const PointerPositions & pos)174 inline size_t write_avail(const PointerPositions& pos) { 175 return size_ - read_avail(pos); 176 } 177 at(uint64_t pos)178 inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); } 179 180 base::ScopedFile mem_fd_; 181 MetadataPage* meta_ = nullptr; // Start of the mmaped region. 182 uint8_t* mem_ = nullptr; // Start of the contents (i.e. meta_ + kPageSize). 183 184 // Size of the ring buffer contents, without including metadata or the 2nd 185 // mmap. 186 size_t size_ = 0; 187 188 // Remember to update the move ctor when adding new fields. 189 }; 190 191 } // namespace profiling 192 } // namespace perfetto 193 194 #endif // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_ 195