1 /*
2 * Copyright 2019 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/memory/fifo_buffer.h"
12
13 #include <algorithm>
14
15 #include "rtc_base/thread.h"
16
17 namespace rtc {
18
FifoBuffer(size_t size)19 FifoBuffer::FifoBuffer(size_t size)
20 : state_(SS_OPEN),
21 buffer_(new char[size]),
22 buffer_length_(size),
23 data_length_(0),
24 read_position_(0),
25 owner_(Thread::Current()) {
26 // all events are done on the owner_ thread
27 }
28
FifoBuffer(size_t size,Thread * owner)29 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
30 : state_(SS_OPEN),
31 buffer_(new char[size]),
32 buffer_length_(size),
33 data_length_(0),
34 read_position_(0),
35 owner_(owner) {
36 // all events are done on the owner_ thread
37 }
38
~FifoBuffer()39 FifoBuffer::~FifoBuffer() {}
40
GetBuffered(size_t * size) const41 bool FifoBuffer::GetBuffered(size_t* size) const {
42 webrtc::MutexLock lock(&mutex_);
43 *size = data_length_;
44 return true;
45 }
46
SetCapacity(size_t size)47 bool FifoBuffer::SetCapacity(size_t size) {
48 webrtc::MutexLock lock(&mutex_);
49 if (data_length_ > size) {
50 return false;
51 }
52
53 if (size != buffer_length_) {
54 char* buffer = new char[size];
55 const size_t copy = data_length_;
56 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
57 memcpy(buffer, &buffer_[read_position_], tail_copy);
58 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
59 buffer_.reset(buffer);
60 read_position_ = 0;
61 buffer_length_ = size;
62 }
63 return true;
64 }
65
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)66 StreamResult FifoBuffer::ReadOffset(void* buffer,
67 size_t bytes,
68 size_t offset,
69 size_t* bytes_read) {
70 webrtc::MutexLock lock(&mutex_);
71 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
72 }
73
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)74 StreamResult FifoBuffer::WriteOffset(const void* buffer,
75 size_t bytes,
76 size_t offset,
77 size_t* bytes_written) {
78 webrtc::MutexLock lock(&mutex_);
79 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
80 }
81
GetState() const82 StreamState FifoBuffer::GetState() const {
83 webrtc::MutexLock lock(&mutex_);
84 return state_;
85 }
86
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)87 StreamResult FifoBuffer::Read(void* buffer,
88 size_t bytes,
89 size_t* bytes_read,
90 int* error) {
91 webrtc::MutexLock lock(&mutex_);
92 const bool was_writable = data_length_ < buffer_length_;
93 size_t copy = 0;
94 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
95
96 if (result == SR_SUCCESS) {
97 // If read was successful then adjust the read position and number of
98 // bytes buffered.
99 read_position_ = (read_position_ + copy) % buffer_length_;
100 data_length_ -= copy;
101 if (bytes_read) {
102 *bytes_read = copy;
103 }
104
105 // if we were full before, and now we're not, post an event
106 if (!was_writable && copy > 0) {
107 PostEvent(owner_, SE_WRITE, 0);
108 }
109 }
110 return result;
111 }
112
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)113 StreamResult FifoBuffer::Write(const void* buffer,
114 size_t bytes,
115 size_t* bytes_written,
116 int* error) {
117 webrtc::MutexLock lock(&mutex_);
118
119 const bool was_readable = (data_length_ > 0);
120 size_t copy = 0;
121 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
122
123 if (result == SR_SUCCESS) {
124 // If write was successful then adjust the number of readable bytes.
125 data_length_ += copy;
126 if (bytes_written) {
127 *bytes_written = copy;
128 }
129
130 // if we didn't have any data to read before, and now we do, post an event
131 if (!was_readable && copy > 0) {
132 PostEvent(owner_, SE_READ, 0);
133 }
134 }
135 return result;
136 }
137
Close()138 void FifoBuffer::Close() {
139 webrtc::MutexLock lock(&mutex_);
140 state_ = SS_CLOSED;
141 }
142
GetReadData(size_t * size)143 const void* FifoBuffer::GetReadData(size_t* size) {
144 webrtc::MutexLock lock(&mutex_);
145 *size = (read_position_ + data_length_ <= buffer_length_)
146 ? data_length_
147 : buffer_length_ - read_position_;
148 return &buffer_[read_position_];
149 }
150
ConsumeReadData(size_t size)151 void FifoBuffer::ConsumeReadData(size_t size) {
152 webrtc::MutexLock lock(&mutex_);
153 RTC_DCHECK(size <= data_length_);
154 const bool was_writable = data_length_ < buffer_length_;
155 read_position_ = (read_position_ + size) % buffer_length_;
156 data_length_ -= size;
157 if (!was_writable && size > 0) {
158 PostEvent(owner_, SE_WRITE, 0);
159 }
160 }
161
GetWriteBuffer(size_t * size)162 void* FifoBuffer::GetWriteBuffer(size_t* size) {
163 webrtc::MutexLock lock(&mutex_);
164 if (state_ == SS_CLOSED) {
165 return nullptr;
166 }
167
168 // if empty, reset the write position to the beginning, so we can get
169 // the biggest possible block
170 if (data_length_ == 0) {
171 read_position_ = 0;
172 }
173
174 const size_t write_position =
175 (read_position_ + data_length_) % buffer_length_;
176 *size = (write_position > read_position_ || data_length_ == 0)
177 ? buffer_length_ - write_position
178 : read_position_ - write_position;
179 return &buffer_[write_position];
180 }
181
ConsumeWriteBuffer(size_t size)182 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
183 webrtc::MutexLock lock(&mutex_);
184 RTC_DCHECK(size <= buffer_length_ - data_length_);
185 const bool was_readable = (data_length_ > 0);
186 data_length_ += size;
187 if (!was_readable && size > 0) {
188 PostEvent(owner_, SE_READ, 0);
189 }
190 }
191
GetWriteRemaining(size_t * size) const192 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
193 webrtc::MutexLock lock(&mutex_);
194 *size = buffer_length_ - data_length_;
195 return true;
196 }
197
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)198 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
199 size_t bytes,
200 size_t offset,
201 size_t* bytes_read) {
202 if (offset >= data_length_) {
203 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
204 }
205
206 const size_t available = data_length_ - offset;
207 const size_t read_position = (read_position_ + offset) % buffer_length_;
208 const size_t copy = std::min(bytes, available);
209 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
210 char* const p = static_cast<char*>(buffer);
211 memcpy(p, &buffer_[read_position], tail_copy);
212 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
213
214 if (bytes_read) {
215 *bytes_read = copy;
216 }
217 return SR_SUCCESS;
218 }
219
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)220 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
221 size_t bytes,
222 size_t offset,
223 size_t* bytes_written) {
224 if (state_ == SS_CLOSED) {
225 return SR_EOS;
226 }
227
228 if (data_length_ + offset >= buffer_length_) {
229 return SR_BLOCK;
230 }
231
232 const size_t available = buffer_length_ - data_length_ - offset;
233 const size_t write_position =
234 (read_position_ + data_length_ + offset) % buffer_length_;
235 const size_t copy = std::min(bytes, available);
236 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
237 const char* const p = static_cast<const char*>(buffer);
238 memcpy(&buffer_[write_position], p, tail_copy);
239 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
240
241 if (bytes_written) {
242 *bytes_written = copy;
243 }
244 return SR_SUCCESS;
245 }
246
247 } // namespace rtc
248