/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
#define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_

#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/utils.h"
#include "src/profiling/memory/scoped_spinlock.h"

#include <atomic>
#include <map>
#include <memory>

#include <stdint.h>

namespace perfetto {
namespace profiling {

// A concurrent, multi-writer single-reader ring buffer FIFO, based on a
// circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
// + O_NONBLOCK socket, specifically:
//
// - Writes are atomic: data is either fully written into the buffer or not.
// - New writes are discarded if the buffer is full.
// - If a write succeeds, the reader is guaranteed to see the whole write.
// - Reads are atomic, no fragmentation.
// - The reader sees writes in write order (modulo discarded writes).
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// *IMPORTANT*: The ring buffer must be written under the assumption that the
// other end modifies arbitrary shared memory without holding the spin-lock.
// This means we must make local copies of the read and write pointers before
// doing bounds checks followed by reads / writes, as they might change in the
// meantime.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// TODO:
// - Write a benchmark.
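//
// A minimal usage sketch for the reader side (illustrative only; Consume() is
// a hypothetical callback, not part of this API):
//
//   SharedRingBuffer::Buffer buf = ring_buffer.BeginRead();
//   if (buf) {
//     Consume(buf.data, buf.size);
//     ring_buffer.EndRead(std::move(buf));
//   }
//
// The writer side additionally has to hold the spinlock while calling
// BeginWrite(); see AcquireLock() below.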
class SharedRingBuffer {
 public:
  class Buffer {
   public:
    Buffer() {}
    Buffer(uint8_t* d, size_t s) : data(d), size(s) {}

    Buffer(const Buffer&) = delete;
    Buffer& operator=(const Buffer&) = delete;

    Buffer(Buffer&&) = default;
    Buffer& operator=(Buffer&&) = default;

    operator bool() const { return data != nullptr; }

    uint8_t* data = nullptr;
    size_t size = 0;
  };

  struct Stats {
    uint64_t bytes_written;
    uint64_t num_writes_succeeded;
    uint64_t num_writes_corrupt;
    uint64_t num_writes_overflow;

    uint64_t num_reads_succeeded;
    uint64_t num_reads_corrupt;
    uint64_t num_reads_nodata;

    // Fields below get set by GetStats as copies of atomics in MetadataPage.
    uint64_t failed_spinlocks;
  };

  static base::Optional<SharedRingBuffer> Create(size_t);
  static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);

  ~SharedRingBuffer();
  SharedRingBuffer() = default;

  SharedRingBuffer(SharedRingBuffer&&) noexcept;
  SharedRingBuffer& operator=(SharedRingBuffer&&);

  bool is_valid() const { return !!mem_; }
  size_t size() const { return size_; }
  int fd() const { return *mem_fd_; }

  Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
  void EndWrite(Buffer buf);

  Buffer BeginRead();
  void EndRead(Buffer);

  Stats GetStats(ScopedSpinlock& spinlock) {
    PERFETTO_DCHECK(spinlock.locked());
    Stats stats = meta_->stats;
    stats.failed_spinlocks =
        meta_->failed_spinlocks.load(std::memory_order_relaxed);
    return stats;
  }

  // Lets the caller acquire and keep holding the spinlock after BeginWrite
  // has returned, so that additional bookkeeping (e.g. incrementing the
  // sequence_number) can be done under the lock.
  ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
    auto lock = ScopedSpinlock(&meta_->spinlock, mode);
    if (PERFETTO_UNLIKELY(!lock.locked()))
      meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
    return lock;
  }
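
  // A sketch of the intended write path (illustrative only; it assumes that
  // ScopedSpinlock releases the lock on destruction and exposes a Mode::Try
  // enumerator, and that payload / payload_size are provided by the caller):
  //
  //   SharedRingBuffer::Buffer buf;
  //   {
  //     ScopedSpinlock lock = ring_buffer.AcquireLock(ScopedSpinlock::Mode::Try);
  //     if (!lock.locked())
  //       return;  // Contended; the failure was counted by AcquireLock.
  //     buf = ring_buffer.BeginWrite(lock, payload_size);
  //     // ... bookkeeping under the lock, e.g. bump the sequence_number ...
  //   }  // Lock released; the payload can be filled without holding it.
  //   if (!buf)
  //     return;  // Buffer full, this write is discarded.
  //   memcpy(buf.data, payload, payload_size);
  //   ring_buffer.EndWrite(std::move(buf));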

  // Exposed for fuzzers.
  struct MetadataPage {
    alignas(uint64_t) std::atomic<bool> spinlock;
    std::atomic<uint64_t> read_pos;
    std::atomic<uint64_t> write_pos;

    std::atomic<uint64_t> failed_spinlocks;
    // For stats that are only accessed by a single thread or under the
    // spinlock, members of this struct are directly modified. Other stats use
    // the atomics above this struct.
    //
    // When the user requests stats, the atomics above get copied into this
    // struct, which is then returned.
    Stats stats;
  };

 private:
  struct PointerPositions {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  struct CreateFlag {};
  struct AttachFlag {};
  SharedRingBuffer(const SharedRingBuffer&) = delete;
  SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
  SharedRingBuffer(CreateFlag, size_t size);
  SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
    Initialize(std::move(mem_fd));
  }

  void Initialize(base::ScopedFile mem_fd);
  bool IsCorrupt(const PointerPositions& pos);

  inline base::Optional<PointerPositions> GetPointerPositions() {
    PointerPositions pos;
    // We need to acquire-load the write_pos to make sure we observe a
    // consistent ring buffer in BeginRead; otherwise it is possible that we
    // observe the write_pos increment, but not the size field write of the
    // payload.
    //
    // This is matched by a release at the end of BeginWrite.
    pos.write_pos = meta_->write_pos.load(std::memory_order_acquire);
    pos.read_pos = meta_->read_pos.load(std::memory_order_relaxed);

    base::Optional<PointerPositions> result;
    if (IsCorrupt(pos))
      return result;
    result = pos;
    return result;
  }

  inline size_t read_avail(const PointerPositions& pos) {
    PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
    auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
    PERFETTO_DCHECK(res <= size_);
    return res;
  }

  inline size_t write_avail(const PointerPositions& pos) {
    return size_ - read_avail(pos);
  }

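  // Maps a monotonically increasing position onto an offset in the buffer.
  // The mask only works because size_ is a power of two; together with the
  // second, adjacent mmap of the same pages (see the comment on size_ below),
  // it lets records that wrap past the end of the buffer stay contiguous in
  // the address space.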
  inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }

  base::ScopedFile mem_fd_;
  MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
  uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).

  // Size of the ring buffer contents, without including metadata or the 2nd
  // mmap.
  size_t size_ = 0;

  // Remember to update the move ctor when adding new fields.
};

}  // namespace profiling
}  // namespace perfetto

#endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_