/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include <optional>

#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"
#include "src/profiling/memory/util.h"

#include <atomic>
#include <limits>
#include <map>
#include <memory>
#include <type_traits>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer, single-reader ring buffer FIFO, based on a
// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
// + O_NONBLOCK socket, specifically:
//
// - Writes are atomic: a record is either written into the buffer in full or
//   not at all.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole write, not
//   a truncated prefix.
// - Reads are atomic, no fragmentation.
// - The reader sees writes in write order (modulo discarding).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: The ring buffer must be written under the assumption that the
// other end modifies arbitrary shared memory without holding the spin-lock.
// This means we must make local copies of the read and write pointers before
// doing bounds checks and the subsequent reads / writes, as the shared values
// might change in the meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
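//
// A minimal usage sketch (illustrative only; |rb| is assumed to be a valid
// SharedRingBuffer, |payload| / |payload_size| are caller-provided, and
// ScopedSpinlock::Mode::Try is assumed to be the non-blocking lock mode
// declared in scoped_spinlock.h):
//
//   // Writer side: take the spinlock, reserve space, copy, commit. The lock
//   // can also be held past BeginWrite for extra bookkeeping, see
//   // AcquireLock below.
//   ScopedSpinlock lock = rb.AcquireLock(ScopedSpinlock::Mode::Try);
//   if (lock.locked()) {
//     SharedRingBuffer::Buffer buf = rb.BeginWrite(lock, payload_size);
//     if (buf) {  // The write was not discarded due to a full buffer.
//       memcpy(buf.data, payload, payload_size);
//       rb.EndWrite(std::move(buf));
//     }
//   }
//
//   // Reader side: drain whole records until no data is left.
//   for (;;) {
//     SharedRingBuffer::Buffer buf = rb.BeginRead();
//     if (!buf)
//       break;
//     // ... consume buf.data / buf.size ...
//     rb.EndRead(std::move(buf));
//   }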
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s, uint64_t f)
        : data(d), size(s), bytes_free(f) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    explicit operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
    uint64_t bytes_free = 0;
  };

  enum ErrorState : uint64_t {
    kNoError = 0,
    kHitTimeout = 1,
    kInvalidStackBounds = 2,
  };

  struct Stats {
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) bytes_written;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_writes_overflow;

    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_succeeded;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_corrupt;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) num_reads_nodata;

    // Fields below get set by GetStats as copies of atomics in MetadataPage.
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(uint64_t) client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(ErrorState) error_state;
  };

  static std::optional<SharedRingBuffer> Create(size_t);
  static std::optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&) noexcept;

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }
  size_t write_avail() {
    auto pos = GetPointerPositions();
    if (!pos)
      return 0;
    return write_avail(*pos);
  }
  size_t read_avail() {
    auto pos = GetPointerPositions();
    if (!pos)
      return 0;
    return read_avail(*pos);
  }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  // Returns the number of bytes read from the shared memory buffer. This
  // differs from the number of bytes returned in the Buffer, because it also
  // includes the header size.
  size_t EndRead(Buffer);

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    stats.error_state = meta_->error_state.load(std::memory_order_relaxed);
    stats.client_spinlock_blocked_us =
        meta_->client_spinlock_blocked_us.load(std::memory_order_relaxed);
    return stats;
  }

  void SetErrorState(ErrorState error) { meta_->error_state.store(error); }

  // Lets the caller acquire the spinlock and keep holding it after BeginWrite
  // has returned, so that additional bookkeeping (e.g. incrementing the
  // sequence_number) can be done under the lock.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }

  void AddClientSpinlockBlockedUs(size_t n) {
    meta_->client_spinlock_blocked_us.fetch_add(n, std::memory_order_relaxed);
  }

  uint64_t client_spinlock_blocked_us() {
    return meta_->client_spinlock_blocked_us;
  }

  void SetShuttingDown() {
    meta_->shutting_down.store(true, std::memory_order_relaxed);
  }

  bool shutting_down() {
    return meta_->shutting_down.load(std::memory_order_relaxed);
  }

  void SetReaderPaused() {
    meta_->reader_paused.store(true, std::memory_order_relaxed);
  }

  bool GetAndResetReaderPaused() {
    return meta_->reader_paused.exchange(false, std::memory_order_relaxed);
  }

  void InfiniteBufferForTesting() {
    // Pretend this buffer is really large, while keeping size_mask_ as
    // original so it keeps wrapping in circles.
    size_ = std::numeric_limits<size_t>::max() / 2;
  }

  // Exposed for fuzzers.
  struct MetadataPage {
    static_assert(std::is_trivially_constructible<Spinlock>::value,
                  "Spinlock needs to be trivially constructible.");
    alignas(8) Spinlock spinlock;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) read_pos;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) write_pos;

    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>)
    client_spinlock_blocked_us;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<uint64_t>) failed_spinlocks;
    PERFETTO_CROSS_ABI_ALIGNED(std::atomic<ErrorState>) error_state;
    alignas(sizeof(uint64_t)) std::atomic<bool> shutting_down;
    alignas(sizeof(uint64_t)) std::atomic<bool> reader_paused;
    // For stats that are only accessed by a single thread or under the
    // spinlock, members of this struct are directly modified. Other stats use
    // the atomics above this struct.
    //
    // When the user requests stats, the atomics above get copied into this
    // struct, which is then returned.
    alignas(sizeof(uint64_t)) Stats stats;
  };

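  // For reference (assuming PERFETTO_CROSS_ABI_ALIGNED pads each field to an
  // 8-byte slot): the eight MetadataPage fields preceding |stats| take 64
  // bytes, including padding after the sub-8-byte members, and the ten
  // uint64-sized Stats fields take 80, which is where the 144 asserted below
  // comes from.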
  static_assert(sizeof(MetadataPage) == 144,
                "metadata page size needs to be ABI independent");

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);

  inline std::optional<PointerPositions> GetPointerPositions() {
    PointerPositions pos;
    // We need to acquire-load the write_pos to make sure we observe a
    // consistent ring buffer in BeginRead, otherwise it is possible that we
    // observe the write_pos increment, but not the size field write of the
    // payload.
    //
    // This is matched by a release at the end of BeginWrite.
    pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);
    pos.read_pos = meta_->read_pos.load(std::memory_order_relaxed);

    std::optional<PointerPositions> result;
    if (IsCorrupt(pos))
      return result;
    result = pos;
    return result;
  }

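  // Note: set_size() and at() below assume |size| is a power of two, so that
  // (pos & size_mask_) wraps an ever-increasing position back into
  // [0, size_).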
  inline void set_size(size_t size) {
    size_ = size;
    size_mask_ = size - 1;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

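  // Illustrative example: with size_ == 8, read_pos == 3 and write_pos == 7,
  // read_avail() is 4 and write_avail() is 4.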
  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }

  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & size_mask_); }

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, without including metadata or the 2nd
  // mmap.
  size_t size_ = 0;
  size_t size_mask_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_