1 // Copyright (C) 2016 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #pragma once 15 16 #include "base/Compiler.h" 17 #include "base/Stream.h" 18 #include "base/StreamSerializing.h" 19 #include "base/ConditionVariable.h" 20 #include "base/Lock.h" 21 22 #include <iterator> 23 #include <vector> 24 #include <utility> 25 26 #include <assert.h> 27 #include <stddef.h> 28 29 namespace android { 30 namespace base { 31 32 // Values corresponding to the result of BufferQueue operations. 33 // |Ok| means everything went well. 34 // |TryAgain| means the operation could not be performed and should be 35 // tried later. 36 // |Error| means an error happened (i.e. the BufferQueue is closed). 37 // |Timeout| means that an item could not be popped in time. 38 enum class BufferQueueResult { 39 Ok = 0, 40 TryAgain = 1, 41 Error = 2, 42 Timeout = 3, 43 }; 44 45 // BufferQueue models a FIFO queue of <T> instances 46 // that can be used between two different threads. Note that it depends, 47 // for synchronization, on an external lock (passed as a reference in 48 // the BufferQueue constructor). 49 // 50 // This allows one to use multiple BufferQueue instances whose content 51 // are protected by a single lock. 52 template <class T> 53 class BufferQueue { 54 using ConditionVariable = android::base::ConditionVariable; 55 using Lock = android::base::Lock; 56 using AutoLock = android::base::AutoLock; 57 58 public: 59 using value_type = T; 60 61 // Constructor. |capacity| is the maximum number of T instances in 62 // the queue, and |lock| is a reference to an external lock provided by 63 // the caller. BufferQueue(int capacity,android::base::Lock & lock)64 BufferQueue(int capacity, android::base::Lock& lock) 65 : mBuffers(capacity), mLock(lock) {} 66 67 // Return true iff one can send a buffer to the queue, i.e. if it 68 // is not full or it would grow anyway. canPushLocked()69 bool canPushLocked() const { return !mClosed && mCount < (int)mBuffers.size(); } 70 71 // Return true iff one can receive a buffer from the queue, i.e. if 72 // it is not empty. canPopLocked()73 bool canPopLocked() const { return mCount > 0; } 74 75 // Return true iff the queue is closed. isClosedLocked()76 bool isClosedLocked() const { return mClosed; } 77 78 // Changes the operation mode to snapshot or back. In snapshot mode 79 // BufferQueue accepts all write requests and accumulates the data, but 80 // returns error on all reads. setSnapshotModeLocked(bool on)81 void setSnapshotModeLocked(bool on) { 82 mSnapshotMode = on; 83 if (on && !mClosed) { 84 wakeAllWaiters(); 85 } 86 } 87 88 // Try to send a buffer to the queue. On success, return BufferQueueResult::Ok 89 // and moves |buffer| to the queue. On failure, return 90 // BufferQueueResult::TryAgain if the queue was full, or BufferQueueResult::Error 91 // if it was closed. 92 // Note: in snapshot mode it never returns TryAgain, but grows the max 93 // queue size instead. tryPushLocked(T && buffer)94 BufferQueueResult tryPushLocked(T&& buffer) { 95 if (mClosed) { 96 return BufferQueueResult::Error; 97 } 98 if (mCount >= (int)mBuffers.size()) { 99 if (mSnapshotMode) { 100 grow(); 101 } else { 102 return BufferQueueResult::TryAgain; 103 } 104 } 105 int pos = mPos + mCount; 106 if (pos >= (int)mBuffers.size()) { 107 pos -= mBuffers.size(); 108 } 109 mBuffers[pos] = std::move(buffer); 110 if (mCount++ == 0) { 111 mCanPop.signal(); 112 } 113 return BufferQueueResult::Ok; 114 } 115 116 // Push a buffer to the queue. This is a blocking call. On success, 117 // move |buffer| into the queue and return BufferQueueResult::Ok. On failure, 118 // return BufferQueueResult::Error meaning the queue was closed. pushLocked(T && buffer)119 BufferQueueResult pushLocked(T&& buffer) { 120 while (mCount == (int)mBuffers.size() && !mSnapshotMode) { 121 if (mClosed) { 122 return BufferQueueResult::Error; 123 } 124 mCanPush.wait(&mLock); 125 } 126 return tryPushLocked(std::move(buffer)); 127 } 128 129 // Try to read a buffer from the queue. On success, moves item into 130 // |*buffer| and return BufferQueueResult::Ok. On failure, return BufferQueueResult::Error 131 // if the queue is empty and closed or in snapshot mode, and 132 // BufferQueueResult::TryAgain if it is empty but not closed. tryPopLocked(T * buffer)133 BufferQueueResult tryPopLocked(T* buffer) { 134 if (mCount == 0) { 135 return (mClosed || mSnapshotMode) ? BufferQueueResult::Error 136 : BufferQueueResult::TryAgain; 137 } 138 *buffer = std::move(mBuffers[mPos]); 139 int pos = mPos + 1; 140 if (pos >= (int)mBuffers.size()) { 141 pos -= mBuffers.size(); 142 } 143 mPos = pos; 144 if (mCount-- == (int)mBuffers.size()) { 145 mCanPush.signal(); 146 } 147 return BufferQueueResult::Ok; 148 } 149 150 // Pop a buffer from the queue. This is a blocking call. On success, 151 // move item into |*buffer| and return BufferQueueResult::Ok. On failure, 152 // return BufferQueueResult::Error to indicate the queue was closed or is in 153 // snapshot mode. popLocked(T * buffer)154 BufferQueueResult popLocked(T* buffer) { 155 while (mCount == 0 && !mSnapshotMode) { 156 if (mClosed) { 157 // Closed queue is empty. 158 return BufferQueueResult::Error; 159 } 160 mCanPop.wait(&mLock); 161 } 162 return tryPopLocked(buffer); 163 } 164 165 // Pop a buffer from the queue. This is a blocking call. On success, 166 // move item into |*buffer| and return BufferQueueResult::Ok. On failure, 167 // return BufferQueueResult::Error to indicate the queue was closed or is in 168 // snapshot mode. Returns BufferQueueResult::Timeout if we waited passed 169 // waitUntilUs. popLockedBefore(T * buffer,uint64_t waitUntilUs)170 BufferQueueResult popLockedBefore(T* buffer, uint64_t waitUntilUs) { 171 while (mCount == 0 && !mSnapshotMode) { 172 if (mClosed) { 173 // Closed queue is empty. 174 return BufferQueueResult::Error; 175 } 176 if (!mCanPop.timedWait(&mLock, waitUntilUs)) { 177 return BufferQueueResult::Timeout; 178 } 179 180 } 181 return tryPopLocked(buffer); 182 } 183 184 // Close the queue, it is no longer possible to push new items 185 // to it (i.e. push() will always return BufferQueueResult::Error), or to 186 // read from an empty queue (i.e. pop() will always return 187 // BufferQueueResult::Error once the queue becomes empty). closeLocked()188 void closeLocked() { 189 mClosed = true; 190 wakeAllWaiters(); 191 } 192 193 // Save to a snapshot file onSaveLocked(android::base::Stream * stream)194 void onSaveLocked(android::base::Stream* stream) { 195 stream->putByte(mClosed); 196 if (!mClosed) { 197 stream->putBe32(mCount); 198 for (int i = 0; i < mCount; i++) { 199 android::base::saveBuffer( 200 stream, mBuffers[(i + mPos) % mBuffers.size()]); 201 } 202 } 203 } 204 onLoadLocked(android::base::Stream * stream)205 bool onLoadLocked(android::base::Stream* stream) { 206 mClosed = stream->getByte(); 207 if (!mClosed) { 208 mCount = stream->getBe32(); 209 if ((int)mBuffers.size() < mCount) { 210 mBuffers.resize(mCount); 211 } 212 mPos = 0; 213 for (int i = 0; i < mCount; i++) { 214 if (!android::base::loadBuffer(stream, &mBuffers[i])) { 215 return false; 216 } 217 } 218 } 219 return true; 220 } 221 222 private: grow()223 void grow() { 224 assert(mCount == (int)mBuffers.size()); 225 std::vector<T> newBuffers; 226 newBuffers.reserve(mBuffers.size() * 2); 227 newBuffers.insert(newBuffers.end(), 228 std::make_move_iterator(mBuffers.begin() + mPos), 229 std::make_move_iterator( 230 mBuffers.begin() + 231 std::min<int>(mPos + mCount, mBuffers.size()))); 232 newBuffers.insert( 233 newBuffers.end(), std::make_move_iterator(mBuffers.begin()), 234 std::make_move_iterator(mBuffers.begin() + 235 (mPos + mCount) % mBuffers.size())); 236 mBuffers = std::move(newBuffers); 237 mBuffers.resize(mBuffers.capacity()); 238 mPos = 0; 239 } 240 wakeAllWaiters()241 void wakeAllWaiters() { 242 if (mCount == (int)mBuffers.size()) { 243 mCanPush.broadcast(); 244 } 245 if (mCount == 0) { 246 mCanPop.broadcast(); 247 } 248 } 249 250 private: 251 int mPos = 0; 252 int mCount = 0; 253 bool mClosed = false; 254 bool mSnapshotMode = false; 255 std::vector<T> mBuffers; 256 257 Lock& mLock; 258 ConditionVariable mCanPush; 259 ConditionVariable mCanPop; 260 261 DISALLOW_COPY_ASSIGN_AND_MOVE(BufferQueue); 262 }; 263 264 } // namespace base 265 } // namespace android 266