#ifndef _DEBLOCKBUFFER_HPP #define _DEBLOCKBUFFER_HPP /*------------------------------------------------------------------------- * drawElements C++ Base Library * ----------------------------- * * Copyright 2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * *//*! * \file * \brief Block-based thread-safe queue. *//*--------------------------------------------------------------------*/ #include "deBlockBuffer.hpp" #include "deMutex.hpp" #include "deSemaphore.h" #include namespace de { void BlockBuffer_selfTest (void); class BufferCanceledException : public std::exception { public: inline BufferCanceledException (void) {} inline ~BufferCanceledException (void) throw() {} const char* what (void) const throw() { return "BufferCanceledException"; } }; template class BlockBuffer { public: typedef BufferCanceledException CanceledException; BlockBuffer (int blockSize, int numBlocks); ~BlockBuffer (void); void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed. void write (int numElements, const T* elements); int tryWrite (int numElements, const T* elements); void flush (void); bool tryFlush (void); void read (int numElements, T* elements); int tryRead (int numElements, T* elements); void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException. bool isCanceled (void) const { return !!m_canceled; } private: BlockBuffer (const BlockBuffer& other); BlockBuffer& operator= (const BlockBuffer& other); int writeToCurrentBlock (int numElements, const T* elements, bool blocking); int readFromCurrentBlock(int numElements, T* elements, bool blocking); void flushWriteBlock (void); deSemaphore m_fill; //!< Block fill count. deSemaphore m_empty; //!< Block empty count. int m_writeBlock; //!< Current write block ndx. int m_writePos; //!< Position in block. 0 if block is not yet acquired. int m_readBlock; //!< Current read block ndx. int m_readPos; //!< Position in block. 0 if block is not yet acquired. int m_blockSize; int m_numBlocks; T* m_elements; int* m_numUsedInBlock; Mutex m_writeLock; Mutex m_readLock; volatile deUint32 m_canceled; } DE_WARN_UNUSED_TYPE; template BlockBuffer::BlockBuffer (int blockSize, int numBlocks) : m_fill (0) , m_empty (0) , m_writeBlock (0) , m_writePos (0) , m_readBlock (0) , m_readPos (0) , m_blockSize (blockSize) , m_numBlocks (numBlocks) , m_elements (DE_NULL) , m_numUsedInBlock (DE_NULL) , m_writeLock () , m_readLock () , m_canceled (DE_FALSE) { DE_ASSERT(blockSize > 0); DE_ASSERT(numBlocks > 0); try { m_elements = new T[m_numBlocks*m_blockSize]; m_numUsedInBlock = new int[m_numBlocks]; } catch (...) { delete[] m_elements; delete[] m_numUsedInBlock; throw; } m_fill = deSemaphore_create(0, DE_NULL); m_empty = deSemaphore_create(numBlocks, DE_NULL); DE_ASSERT(m_fill && m_empty); } template BlockBuffer::~BlockBuffer (void) { delete[] m_elements; delete[] m_numUsedInBlock; deSemaphore_destroy(m_fill); deSemaphore_destroy(m_empty); } template void BlockBuffer::clear (void) { ScopedLock readLock (m_readLock); ScopedLock writeLock (m_writeLock); deSemaphore_destroy(m_fill); deSemaphore_destroy(m_empty); m_fill = deSemaphore_create(0, DE_NULL); m_empty = deSemaphore_create(m_numBlocks, DE_NULL); m_writeBlock = 0; m_writePos = 0; m_readBlock = 0; m_readPos = 0; m_canceled = DE_FALSE; DE_ASSERT(m_fill && m_empty); } template void BlockBuffer::cancel (void) { DE_ASSERT(!m_canceled); m_canceled = DE_TRUE; deSemaphore_increment(m_empty); deSemaphore_increment(m_fill); } template int BlockBuffer::writeToCurrentBlock (int numElements, const T* elements, bool blocking) { DE_ASSERT(numElements > 0 && elements != DE_NULL); if (m_writePos == 0) { /* Write thread doesn't own current block - need to acquire. */ if (blocking) deSemaphore_decrement(m_empty); else { if (!deSemaphore_tryDecrement(m_empty)) return 0; } /* Check for canceled bit. */ if (m_canceled) { // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here. deSemaphore_increment(m_empty); m_writeLock.unlock(); throw CanceledException(); } } /* Write thread owns current block. */ T* block = m_elements + m_writeBlock*m_blockSize; int numToWrite = de::min(numElements, m_blockSize-m_writePos); DE_ASSERT(numToWrite > 0); for (int ndx = 0; ndx < numToWrite; ndx++) block[m_writePos+ndx] = elements[ndx]; m_writePos += numToWrite; if (m_writePos == m_blockSize) flushWriteBlock(); /* Flush current write block. */ return numToWrite; } template int BlockBuffer::readFromCurrentBlock (int numElements, T* elements, bool blocking) { DE_ASSERT(numElements > 0 && elements != DE_NULL); if (m_readPos == 0) { /* Read thread doesn't own current block - need to acquire. */ if (blocking) deSemaphore_decrement(m_fill); else { if (!deSemaphore_tryDecrement(m_fill)) return 0; } /* Check for canceled bit. */ if (m_canceled) { // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here. deSemaphore_increment(m_fill); m_readLock.unlock(); throw CanceledException(); } } /* Read thread now owns current block. */ const T* block = m_elements + m_readBlock*m_blockSize; int numUsedInBlock = m_numUsedInBlock[m_readBlock]; int numToRead = de::min(numElements, numUsedInBlock-m_readPos); DE_ASSERT(numToRead > 0); for (int ndx = 0; ndx < numToRead; ndx++) elements[ndx] = block[m_readPos+ndx]; m_readPos += numToRead; if (m_readPos == numUsedInBlock) { /* Free current read block and advance. */ m_readBlock = (m_readBlock+1) % m_numBlocks; m_readPos = 0; deSemaphore_increment(m_empty); } return numToRead; } template int BlockBuffer::tryWrite (int numElements, const T* elements) { int numWritten = 0; DE_ASSERT(numElements > 0 && elements != DE_NULL); if (m_canceled) throw CanceledException(); if (!m_writeLock.tryLock()) return numWritten; while (numWritten < numElements) { int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */); if (ret == 0) break; /* Write failed. */ numWritten += ret; } m_writeLock.unlock(); return numWritten; } template void BlockBuffer::write (int numElements, const T* elements) { DE_ASSERT(numElements > 0 && elements != DE_NULL); if (m_canceled) throw CanceledException(); m_writeLock.lock(); int numWritten = 0; while (numWritten < numElements) numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */); m_writeLock.unlock(); } template void BlockBuffer::flush (void) { m_writeLock.lock(); if (m_writePos > 0) flushWriteBlock(); m_writeLock.unlock(); } template bool BlockBuffer::tryFlush (void) { if (!m_writeLock.tryLock()) return false; if (m_writePos > 0) flushWriteBlock(); m_writeLock.unlock(); return true; } template void BlockBuffer::flushWriteBlock (void) { DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize)); m_numUsedInBlock[m_writeBlock] = m_writePos; m_writeBlock = (m_writeBlock+1) % m_numBlocks; m_writePos = 0; deSemaphore_increment(m_fill); } template int BlockBuffer::tryRead (int numElements, T* elements) { int numRead = 0; if (m_canceled) throw CanceledException(); if (!m_readLock.tryLock()) return numRead; while (numRead < numElements) { int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */); if (ret == 0) break; /* Failed. */ numRead += ret; } m_readLock.unlock(); return numRead; } template void BlockBuffer::read (int numElements, T* elements) { DE_ASSERT(numElements > 0 && elements != DE_NULL); if (m_canceled) throw CanceledException(); m_readLock.lock(); int numRead = 0; while (numRead < numElements) numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */); m_readLock.unlock(); } } // de #endif // _DEBLOCKBUFFER_HPP