1 #ifndef _DEBLOCKBUFFER_HPP
2 #define _DEBLOCKBUFFER_HPP
3 /*-------------------------------------------------------------------------
4 * drawElements C++ Base Library
5 * -----------------------------
6 *
7 * Copyright 2014 The Android Open Source Project
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 *
21 *//*!
22 * \file
23 * \brief Block-based thread-safe queue.
24 *//*--------------------------------------------------------------------*/
25
#include "deBlockBuffer.hpp"
#include "deMutex.hpp"
#include "deSemaphore.h"

#include <exception>
#include <new>
31
32 namespace de
33 {
34
//! Self-test entry point for BlockBuffer. Only declared in this header; the definition is provided elsewhere.
void BlockBuffer_selfTest (void);
36
/*--------------------------------------------------------------------*//*!
 * \brief Exception type raised by BlockBuffer operations on a canceled buffer.
 *
 * Carries no state; what() always returns the same static string.
 *//*--------------------------------------------------------------------*/
class BufferCanceledException : public std::exception
{
public:
	// Member functions defined in the class body are implicitly inline.
	BufferCanceledException (void)
	{
	}

	~BufferCanceledException (void) throw()
	{
	}

	//! Fixed description; the returned pointer refers to a string literal.
	const char* what (void) const throw()
	{
		return "BufferCanceledException";
	}
};
45
46 template <typename T>
47 class BlockBuffer
48 {
49 public:
50 typedef BufferCanceledException CanceledException;
51
52 BlockBuffer (int blockSize, int numBlocks);
53 ~BlockBuffer (void);
54
55 void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed.
56
57 void write (int numElements, const T* elements);
58 int tryWrite (int numElements, const T* elements);
59 void flush (void);
60 bool tryFlush (void);
61
62 void read (int numElements, T* elements);
63 int tryRead (int numElements, T* elements);
64
65 void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
isCanceled(void) const66 bool isCanceled (void) const { return !!m_canceled; }
67
68 private:
69 BlockBuffer (const BlockBuffer& other);
70 BlockBuffer& operator= (const BlockBuffer& other);
71
72 int writeToCurrentBlock (int numElements, const T* elements, bool blocking);
73 int readFromCurrentBlock(int numElements, T* elements, bool blocking);
74
75 void flushWriteBlock (void);
76
77 deSemaphore m_fill; //!< Block fill count.
78 deSemaphore m_empty; //!< Block empty count.
79
80 int m_writeBlock; //!< Current write block ndx.
81 int m_writePos; //!< Position in block. 0 if block is not yet acquired.
82
83 int m_readBlock; //!< Current read block ndx.
84 int m_readPos; //!< Position in block. 0 if block is not yet acquired.
85
86 int m_blockSize;
87 int m_numBlocks;
88
89 T* m_elements;
90 int* m_numUsedInBlock;
91
92 Mutex m_writeLock;
93 Mutex m_readLock;
94
95 volatile deUint32 m_canceled;
96 } DE_WARN_UNUSED_TYPE;
97
98 template <typename T>
BlockBuffer(int blockSize,int numBlocks)99 BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks)
100 : m_fill (0)
101 , m_empty (0)
102 , m_writeBlock (0)
103 , m_writePos (0)
104 , m_readBlock (0)
105 , m_readPos (0)
106 , m_blockSize (blockSize)
107 , m_numBlocks (numBlocks)
108 , m_elements (DE_NULL)
109 , m_numUsedInBlock (DE_NULL)
110 , m_writeLock ()
111 , m_readLock ()
112 , m_canceled (DE_FALSE)
113 {
114 DE_ASSERT(blockSize > 0);
115 DE_ASSERT(numBlocks > 0);
116
117 try
118 {
119 m_elements = new T[m_numBlocks*m_blockSize];
120 m_numUsedInBlock = new int[m_numBlocks];
121 }
122 catch (...)
123 {
124 delete[] m_elements;
125 delete[] m_numUsedInBlock;
126 throw;
127 }
128
129 m_fill = deSemaphore_create(0, DE_NULL);
130 m_empty = deSemaphore_create(numBlocks, DE_NULL);
131 DE_ASSERT(m_fill && m_empty);
132 }
133
134 template <typename T>
~BlockBuffer(void)135 BlockBuffer<T>::~BlockBuffer (void)
136 {
137 delete[] m_elements;
138 delete[] m_numUsedInBlock;
139
140 deSemaphore_destroy(m_fill);
141 deSemaphore_destroy(m_empty);
142 }
143
144 template <typename T>
clear(void)145 void BlockBuffer<T>::clear (void)
146 {
147 ScopedLock readLock (m_readLock);
148 ScopedLock writeLock (m_writeLock);
149
150 deSemaphore_destroy(m_fill);
151 deSemaphore_destroy(m_empty);
152
153 m_fill = deSemaphore_create(0, DE_NULL);
154 m_empty = deSemaphore_create(m_numBlocks, DE_NULL);
155 m_writeBlock = 0;
156 m_writePos = 0;
157 m_readBlock = 0;
158 m_readPos = 0;
159 m_canceled = DE_FALSE;
160
161 DE_ASSERT(m_fill && m_empty);
162 }
163
164 template <typename T>
cancel(void)165 void BlockBuffer<T>::cancel (void)
166 {
167 DE_ASSERT(!m_canceled);
168 m_canceled = DE_TRUE;
169
170 deSemaphore_increment(m_empty);
171 deSemaphore_increment(m_fill);
172 }
173
174 template <typename T>
writeToCurrentBlock(int numElements,const T * elements,bool blocking)175 int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking)
176 {
177 DE_ASSERT(numElements > 0 && elements != DE_NULL);
178
179 if (m_writePos == 0)
180 {
181 /* Write thread doesn't own current block - need to acquire. */
182 if (blocking)
183 deSemaphore_decrement(m_empty);
184 else
185 {
186 if (!deSemaphore_tryDecrement(m_empty))
187 return 0;
188 }
189
190 /* Check for canceled bit. */
191 if (m_canceled)
192 {
193 // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
194 deSemaphore_increment(m_empty);
195 m_writeLock.unlock();
196 throw CanceledException();
197 }
198 }
199
200 /* Write thread owns current block. */
201 T* block = m_elements + m_writeBlock*m_blockSize;
202 int numToWrite = de::min(numElements, m_blockSize-m_writePos);
203
204 DE_ASSERT(numToWrite > 0);
205
206 for (int ndx = 0; ndx < numToWrite; ndx++)
207 block[m_writePos+ndx] = elements[ndx];
208
209 m_writePos += numToWrite;
210
211 if (m_writePos == m_blockSize)
212 flushWriteBlock(); /* Flush current write block. */
213
214 return numToWrite;
215 }
216
217 template <typename T>
readFromCurrentBlock(int numElements,T * elements,bool blocking)218 int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking)
219 {
220 DE_ASSERT(numElements > 0 && elements != DE_NULL);
221
222 if (m_readPos == 0)
223 {
224 /* Read thread doesn't own current block - need to acquire. */
225 if (blocking)
226 deSemaphore_decrement(m_fill);
227 else
228 {
229 if (!deSemaphore_tryDecrement(m_fill))
230 return 0;
231 }
232
233 /* Check for canceled bit. */
234 if (m_canceled)
235 {
236 // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
237 deSemaphore_increment(m_fill);
238 m_readLock.unlock();
239 throw CanceledException();
240 }
241 }
242
243 /* Read thread now owns current block. */
244 const T* block = m_elements + m_readBlock*m_blockSize;
245 int numUsedInBlock = m_numUsedInBlock[m_readBlock];
246 int numToRead = de::min(numElements, numUsedInBlock-m_readPos);
247
248 DE_ASSERT(numToRead > 0);
249
250 for (int ndx = 0; ndx < numToRead; ndx++)
251 elements[ndx] = block[m_readPos+ndx];
252
253 m_readPos += numToRead;
254
255 if (m_readPos == numUsedInBlock)
256 {
257 /* Free current read block and advance. */
258 m_readBlock = (m_readBlock+1) % m_numBlocks;
259 m_readPos = 0;
260 deSemaphore_increment(m_empty);
261 }
262
263 return numToRead;
264 }
265
266 template <typename T>
tryWrite(int numElements,const T * elements)267 int BlockBuffer<T>::tryWrite (int numElements, const T* elements)
268 {
269 int numWritten = 0;
270
271 DE_ASSERT(numElements > 0 && elements != DE_NULL);
272
273 if (m_canceled)
274 throw CanceledException();
275
276 if (!m_writeLock.tryLock())
277 return numWritten;
278
279 while (numWritten < numElements)
280 {
281 int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */);
282
283 if (ret == 0)
284 break; /* Write failed. */
285
286 numWritten += ret;
287 }
288
289 m_writeLock.unlock();
290
291 return numWritten;
292 }
293
294 template <typename T>
write(int numElements,const T * elements)295 void BlockBuffer<T>::write (int numElements, const T* elements)
296 {
297 DE_ASSERT(numElements > 0 && elements != DE_NULL);
298
299 if (m_canceled)
300 throw CanceledException();
301
302 m_writeLock.lock();
303
304 int numWritten = 0;
305 while (numWritten < numElements)
306 numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */);
307
308 m_writeLock.unlock();
309 }
310
311 template <typename T>
flush(void)312 void BlockBuffer<T>::flush (void)
313 {
314 m_writeLock.lock();
315
316 if (m_writePos > 0)
317 flushWriteBlock();
318
319 m_writeLock.unlock();
320 }
321
322 template <typename T>
tryFlush(void)323 bool BlockBuffer<T>::tryFlush (void)
324 {
325 if (!m_writeLock.tryLock())
326 return false;
327
328 if (m_writePos > 0)
329 flushWriteBlock();
330
331 m_writeLock.unlock();
332
333 return true;
334 }
335
336 template <typename T>
flushWriteBlock(void)337 void BlockBuffer<T>::flushWriteBlock (void)
338 {
339 DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
340
341 m_numUsedInBlock[m_writeBlock] = m_writePos;
342 m_writeBlock = (m_writeBlock+1) % m_numBlocks;
343 m_writePos = 0;
344 deSemaphore_increment(m_fill);
345 }
346
347 template <typename T>
tryRead(int numElements,T * elements)348 int BlockBuffer<T>::tryRead (int numElements, T* elements)
349 {
350 int numRead = 0;
351
352 if (m_canceled)
353 throw CanceledException();
354
355 if (!m_readLock.tryLock())
356 return numRead;
357
358 while (numRead < numElements)
359 {
360 int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */);
361
362 if (ret == 0)
363 break; /* Failed. */
364
365 numRead += ret;
366 }
367
368 m_readLock.unlock();
369
370 return numRead;
371 }
372
373 template <typename T>
read(int numElements,T * elements)374 void BlockBuffer<T>::read (int numElements, T* elements)
375 {
376 DE_ASSERT(numElements > 0 && elements != DE_NULL);
377
378 if (m_canceled)
379 throw CanceledException();
380
381 m_readLock.lock();
382
383 int numRead = 0;
384 while (numRead < numElements)
385 numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */);
386
387 m_readLock.unlock();
388 }
389
390 } // de
391
392 #endif // _DEBLOCKBUFFER_HPP
393