1 // Copyright (C) 2016 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #pragma once 15 16 #include "aemu/base/Compiler.h" 17 #include "aemu/base/files/Stream.h" 18 #include "aemu/base/files/StreamSerializing.h" 19 #include "aemu/base/synchronization/ConditionVariable.h" 20 #include "aemu/base/synchronization/Lock.h" 21 22 #include <iterator> 23 #include <vector> 24 #include <utility> 25 26 #include <assert.h> 27 #include <stddef.h> 28 29 namespace android { 30 namespace base { 31 32 // Values corresponding to the result of BufferQueue operations. 33 // |Ok| means everything went well. 34 // |TryAgain| means the operation could not be performed and should be 35 // tried later. 36 // |Error| means an error happened (i.e. the BufferQueue is closed). 37 // |Timeout| means that an item could not be popped in time. 38 enum class BufferQueueResult { 39 Ok = 0, 40 TryAgain = 1, 41 Error = 2, 42 Timeout = 3, 43 }; 44 45 // BufferQueue models a FIFO queue of <T> instances 46 // that can be used between two different threads. Note that it depends, 47 // for synchronization, on an external lock (passed as a reference in 48 // the BufferQueue constructor). 49 // 50 // This allows one to use multiple BufferQueue instances whose content 51 // are protected by a single lock. 52 template <class T> 53 class BufferQueue { 54 using ConditionVariable = android::base::ConditionVariable; 55 using Lock = android::base::Lock; 56 using AutoLock = android::base::AutoLock; 57 58 public: 59 using value_type = T; 60 61 // Constructor. |capacity| is the maximum number of T instances in 62 // the queue, and |lock| is a reference to an external lock provided by 63 // the caller. BufferQueue(int capacity,android::base::Lock & lock)64 BufferQueue(int capacity, android::base::Lock& lock) 65 : mBuffers(capacity), mLock(lock) {} 66 67 // Return true iff one can send a buffer to the queue, i.e. if it 68 // is not full or it would grow anyway. canPushLocked()69 bool canPushLocked() const { return !mClosed && mCount < (int)mBuffers.size(); } 70 71 // A blocking call that will wait until one can send a buffer to the queue. waitUntilPushableLocked()72 void waitUntilPushableLocked() { 73 if (!canPushLocked()) { 74 mCanPush.wait(&mLock); 75 } 76 } 77 78 // Return true iff one can receive a buffer from the queue, i.e. if 79 // it is not empty. canPopLocked()80 bool canPopLocked() const { return mCount > 0; } 81 82 // A blocking call that will wait until one can receive a buffer from the queue. waitUntilPopableLocked()83 void waitUntilPopableLocked() { 84 if (!canPopLocked()) { 85 mCanPop.wait(&mLock); 86 } 87 } 88 89 // Return true iff the queue is closed. isClosedLocked()90 bool isClosedLocked() const { return mClosed; } 91 92 // Changes the operation mode to snapshot or back. In snapshot mode 93 // BufferQueue accepts all write requests and accumulates the data, but 94 // returns error on all reads. setSnapshotModeLocked(bool on)95 void setSnapshotModeLocked(bool on) { 96 mSnapshotMode = on; 97 if (on && !mClosed) { 98 wakeAllWaiters(); 99 } 100 } 101 102 // Try to send a buffer to the queue. On success, return BufferQueueResult::Ok 103 // and moves |buffer| to the queue. On failure, return 104 // BufferQueueResult::TryAgain if the queue was full, or BufferQueueResult::Error 105 // if it was closed. 106 // Note: in snapshot mode it never returns TryAgain, but grows the max 107 // queue size instead. tryPushLocked(T && buffer)108 BufferQueueResult tryPushLocked(T&& buffer) { 109 if (mClosed) { 110 return BufferQueueResult::Error; 111 } 112 if (mCount >= (int)mBuffers.size()) { 113 if (mSnapshotMode) { 114 grow(); 115 } else { 116 return BufferQueueResult::TryAgain; 117 } 118 } 119 int pos = mPos + mCount; 120 if (pos >= (int)mBuffers.size()) { 121 pos -= mBuffers.size(); 122 } 123 mBuffers[pos] = std::move(buffer); 124 if (mCount++ == 0) { 125 mCanPop.signal(); 126 } 127 return BufferQueueResult::Ok; 128 } 129 130 // Push a buffer to the queue. This is a blocking call. On success, 131 // move |buffer| into the queue and return BufferQueueResult::Ok. On failure, 132 // return BufferQueueResult::Error meaning the queue was closed. pushLocked(T && buffer)133 BufferQueueResult pushLocked(T&& buffer) { 134 while (mCount == (int)mBuffers.size() && !mSnapshotMode) { 135 if (mClosed) { 136 return BufferQueueResult::Error; 137 } 138 mCanPush.wait(&mLock); 139 } 140 return tryPushLocked(std::move(buffer)); 141 } 142 143 // Try to read a buffer from the queue. On success, moves item into 144 // |*buffer| and return BufferQueueResult::Ok. On failure, return BufferQueueResult::Error 145 // if the queue is empty and closed or in snapshot mode, and 146 // BufferQueueResult::TryAgain if it is empty but not closed. tryPopLocked(T * buffer)147 BufferQueueResult tryPopLocked(T* buffer) { 148 if (mCount == 0) { 149 return (mClosed || mSnapshotMode) ? BufferQueueResult::Error 150 : BufferQueueResult::TryAgain; 151 } 152 *buffer = std::move(mBuffers[mPos]); 153 int pos = mPos + 1; 154 if (pos >= (int)mBuffers.size()) { 155 pos -= mBuffers.size(); 156 } 157 mPos = pos; 158 if (mCount-- == (int)mBuffers.size()) { 159 mCanPush.signal(); 160 } 161 return BufferQueueResult::Ok; 162 } 163 164 // Pop a buffer from the queue. This is a blocking call. On success, 165 // move item into |*buffer| and return BufferQueueResult::Ok. On failure, 166 // return BufferQueueResult::Error to indicate the queue was closed or is in 167 // snapshot mode. popLocked(T * buffer)168 BufferQueueResult popLocked(T* buffer) { 169 while (mCount == 0 && !mSnapshotMode) { 170 if (mClosed) { 171 // Closed queue is empty. 172 return BufferQueueResult::Error; 173 } 174 mCanPop.wait(&mLock); 175 } 176 return tryPopLocked(buffer); 177 } 178 179 // Pop a buffer from the queue. This is a blocking call. On success, 180 // move item into |*buffer| and return BufferQueueResult::Ok. On failure, 181 // return BufferQueueResult::Error to indicate the queue was closed or is in 182 // snapshot mode. Returns BufferQueueResult::Timeout if we waited passed 183 // waitUntilUs. popLockedBefore(T * buffer,uint64_t waitUntilUs)184 BufferQueueResult popLockedBefore(T* buffer, uint64_t waitUntilUs) { 185 while (mCount == 0 && !mSnapshotMode) { 186 if (mClosed) { 187 // Closed queue is empty. 188 return BufferQueueResult::Error; 189 } 190 if (!mCanPop.timedWait(&mLock, waitUntilUs)) { 191 return BufferQueueResult::Timeout; 192 } 193 194 } 195 return tryPopLocked(buffer); 196 } 197 198 // Close the queue, it is no longer possible to push new items 199 // to it (i.e. push() will always return BufferQueueResult::Error), or to 200 // read from an empty queue (i.e. pop() will always return 201 // BufferQueueResult::Error once the queue becomes empty). closeLocked()202 void closeLocked() { 203 mClosed = true; 204 wakeAllWaiters(); 205 } 206 207 // Save to a snapshot file onSaveLocked(android::base::Stream * stream)208 void onSaveLocked(android::base::Stream* stream) { 209 stream->putByte(mClosed); 210 if (!mClosed) { 211 stream->putBe32(mCount); 212 for (int i = 0; i < mCount; i++) { 213 android::base::saveBuffer( 214 stream, mBuffers[(i + mPos) % mBuffers.size()]); 215 } 216 } 217 } 218 onLoadLocked(android::base::Stream * stream)219 bool onLoadLocked(android::base::Stream* stream) { 220 mClosed = stream->getByte(); 221 if (!mClosed) { 222 mCount = stream->getBe32(); 223 if ((int)mBuffers.size() < mCount) { 224 mBuffers.resize(mCount); 225 } 226 mPos = 0; 227 for (int i = 0; i < mCount; i++) { 228 if (!android::base::loadBuffer(stream, &mBuffers[i])) { 229 return false; 230 } 231 } 232 } 233 return true; 234 } 235 236 private: grow()237 void grow() { 238 assert(mCount == (int)mBuffers.size()); 239 std::vector<T> newBuffers; 240 newBuffers.reserve(mBuffers.size() * 2); 241 newBuffers.insert(newBuffers.end(), 242 std::make_move_iterator(mBuffers.begin() + mPos), 243 std::make_move_iterator( 244 mBuffers.begin() + 245 std::min<int>(mPos + mCount, mBuffers.size()))); 246 newBuffers.insert( 247 newBuffers.end(), std::make_move_iterator(mBuffers.begin()), 248 std::make_move_iterator(mBuffers.begin() + 249 (mPos + mCount) % mBuffers.size())); 250 mBuffers = std::move(newBuffers); 251 mBuffers.resize(mBuffers.capacity()); 252 mPos = 0; 253 } 254 wakeAllWaiters()255 void wakeAllWaiters() { 256 if (mCount == (int)mBuffers.size()) { 257 mCanPush.broadcast(); 258 } 259 if (mCount == 0) { 260 mCanPop.broadcast(); 261 } 262 } 263 264 private: 265 int mPos = 0; 266 int mCount = 0; 267 bool mClosed = false; 268 bool mSnapshotMode = false; 269 std::vector<T> mBuffers; 270 271 Lock& mLock; 272 ConditionVariable mCanPush; 273 ConditionVariable mCanPop; 274 275 DISALLOW_COPY_ASSIGN_AND_MOVE(BufferQueue); 276 }; 277 278 } // namespace base 279 } // namespace android 280