• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2016 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #pragma once
15 
16 #include "base/Compiler.h"
17 #include "base/Stream.h"
18 #include "base/StreamSerializing.h"
19 #include "base/ConditionVariable.h"
20 #include "base/Lock.h"
21 
22 #include <iterator>
23 #include <vector>
24 #include <utility>
25 
26 #include <assert.h>
27 #include <stddef.h>
28 
29 namespace android {
30 namespace base {
31 
// Result codes reported by BufferQueue operations.
enum class BufferQueueResult {
    // Everything went well.
    Ok = 0,
    // The operation could not be performed and should be tried later.
    TryAgain = 1,
    // An error happened (i.e. the BufferQueue is closed).
    Error = 2,
    // An item could not be popped in time.
    Timeout = 3,
};
44 
45 // BufferQueue models a FIFO queue of <T> instances
46 // that can be used between two different threads. Note that it depends,
47 // for synchronization, on an external lock (passed as a reference in
48 // the BufferQueue constructor).
49 //
50 // This allows one to use multiple BufferQueue instances whose content
51 // are protected by a single lock.
52 template <class T>
53 class BufferQueue {
54     using ConditionVariable = android::base::ConditionVariable;
55     using Lock = android::base::Lock;
56     using AutoLock = android::base::AutoLock;
57 
58 public:
59     using value_type = T;
60 
61     // Constructor. |capacity| is the maximum number of T instances in
62     // the queue, and |lock| is a reference to an external lock provided by
63     // the caller.
BufferQueue(int capacity,android::base::Lock & lock)64     BufferQueue(int capacity, android::base::Lock& lock)
65         : mBuffers(capacity), mLock(lock) {}
66 
67     // Return true iff one can send a buffer to the queue, i.e. if it
68     // is not full or it would grow anyway.
canPushLocked()69     bool canPushLocked() const { return !mClosed && mCount < (int)mBuffers.size(); }
70 
71     // Return true iff one can receive a buffer from the queue, i.e. if
72     // it is not empty.
canPopLocked()73     bool canPopLocked() const { return mCount > 0; }
74 
75     // Return true iff the queue is closed.
isClosedLocked()76     bool isClosedLocked() const { return mClosed; }
77 
78     // Changes the operation mode to snapshot or back. In snapshot mode
79     // BufferQueue accepts all write requests and accumulates the data, but
80     // returns error on all reads.
setSnapshotModeLocked(bool on)81     void setSnapshotModeLocked(bool on) {
82         mSnapshotMode = on;
83         if (on && !mClosed) {
84             wakeAllWaiters();
85         }
86     }
87 
88     // Try to send a buffer to the queue. On success, return BufferQueueResult::Ok
89     // and moves |buffer| to the queue. On failure, return
90     // BufferQueueResult::TryAgain if the queue was full, or BufferQueueResult::Error
91     // if it was closed.
92     // Note: in snapshot mode it never returns TryAgain, but grows the max
93     //   queue size instead.
tryPushLocked(T && buffer)94     BufferQueueResult tryPushLocked(T&& buffer) {
95         if (mClosed) {
96             return BufferQueueResult::Error;
97         }
98         if (mCount >= (int)mBuffers.size()) {
99             if (mSnapshotMode) {
100                 grow();
101             } else {
102                 return BufferQueueResult::TryAgain;
103             }
104         }
105         int pos = mPos + mCount;
106         if (pos >= (int)mBuffers.size()) {
107             pos -= mBuffers.size();
108         }
109         mBuffers[pos] = std::move(buffer);
110         if (mCount++ == 0) {
111             mCanPop.signal();
112         }
113         return BufferQueueResult::Ok;
114     }
115 
116     // Push a buffer to the queue. This is a blocking call. On success,
117     // move |buffer| into the queue and return BufferQueueResult::Ok. On failure,
118     // return BufferQueueResult::Error meaning the queue was closed.
pushLocked(T && buffer)119     BufferQueueResult pushLocked(T&& buffer) {
120         while (mCount == (int)mBuffers.size() && !mSnapshotMode) {
121             if (mClosed) {
122                 return BufferQueueResult::Error;
123             }
124             mCanPush.wait(&mLock);
125         }
126         return tryPushLocked(std::move(buffer));
127     }
128 
129     // Try to read a buffer from the queue. On success, moves item into
130     // |*buffer| and return BufferQueueResult::Ok. On failure, return BufferQueueResult::Error
131     // if the queue is empty and closed or in snapshot mode, and
132     // BufferQueueResult::TryAgain if it is empty but not closed.
tryPopLocked(T * buffer)133     BufferQueueResult tryPopLocked(T* buffer) {
134         if (mCount == 0) {
135             return (mClosed || mSnapshotMode) ? BufferQueueResult::Error
136                                               : BufferQueueResult::TryAgain;
137         }
138         *buffer = std::move(mBuffers[mPos]);
139         int pos = mPos + 1;
140         if (pos >= (int)mBuffers.size()) {
141             pos -= mBuffers.size();
142         }
143         mPos = pos;
144         if (mCount-- == (int)mBuffers.size()) {
145             mCanPush.signal();
146         }
147         return BufferQueueResult::Ok;
148     }
149 
150     // Pop a buffer from the queue. This is a blocking call. On success,
151     // move item into |*buffer| and return BufferQueueResult::Ok. On failure,
152     // return BufferQueueResult::Error to indicate the queue was closed or is in
153     // snapshot mode.
popLocked(T * buffer)154     BufferQueueResult popLocked(T* buffer) {
155         while (mCount == 0 && !mSnapshotMode) {
156             if (mClosed) {
157                 // Closed queue is empty.
158                 return BufferQueueResult::Error;
159             }
160             mCanPop.wait(&mLock);
161         }
162         return tryPopLocked(buffer);
163     }
164 
165     // Pop a buffer from the queue. This is a blocking call. On success,
166     // move item into |*buffer| and return BufferQueueResult::Ok. On failure,
167     // return BufferQueueResult::Error to indicate the queue was closed or is in
168     // snapshot mode. Returns BufferQueueResult::Timeout if we waited passed
169     // waitUntilUs.
popLockedBefore(T * buffer,uint64_t waitUntilUs)170     BufferQueueResult popLockedBefore(T* buffer, uint64_t waitUntilUs) {
171         while (mCount == 0 && !mSnapshotMode) {
172             if (mClosed) {
173                 // Closed queue is empty.
174                 return BufferQueueResult::Error;
175             }
176             if (!mCanPop.timedWait(&mLock, waitUntilUs)) {
177                 return BufferQueueResult::Timeout;
178             }
179 
180         }
181         return tryPopLocked(buffer);
182     }
183 
184     // Close the queue, it is no longer possible to push new items
185     // to it (i.e. push() will always return BufferQueueResult::Error), or to
186     // read from an empty queue (i.e. pop() will always return
187     // BufferQueueResult::Error once the queue becomes empty).
closeLocked()188     void closeLocked() {
189         mClosed = true;
190         wakeAllWaiters();
191     }
192 
193     // Save to a snapshot file
onSaveLocked(android::base::Stream * stream)194     void onSaveLocked(android::base::Stream* stream) {
195         stream->putByte(mClosed);
196         if (!mClosed) {
197             stream->putBe32(mCount);
198             for (int i = 0; i < mCount; i++) {
199                 android::base::saveBuffer(
200                         stream, mBuffers[(i + mPos) % mBuffers.size()]);
201             }
202         }
203     }
204 
onLoadLocked(android::base::Stream * stream)205     bool onLoadLocked(android::base::Stream* stream) {
206         mClosed = stream->getByte();
207         if (!mClosed) {
208             mCount = stream->getBe32();
209             if ((int)mBuffers.size() < mCount) {
210                 mBuffers.resize(mCount);
211             }
212             mPos = 0;
213             for (int i = 0; i < mCount; i++) {
214                 if (!android::base::loadBuffer(stream, &mBuffers[i])) {
215                     return false;
216                 }
217             }
218         }
219         return true;
220     }
221 
222 private:
grow()223     void grow() {
224         assert(mCount == (int)mBuffers.size());
225         std::vector<T> newBuffers;
226         newBuffers.reserve(mBuffers.size() * 2);
227         newBuffers.insert(newBuffers.end(),
228                           std::make_move_iterator(mBuffers.begin() + mPos),
229                           std::make_move_iterator(
230                                   mBuffers.begin() +
231                                   std::min<int>(mPos + mCount, mBuffers.size())));
232         newBuffers.insert(
233                 newBuffers.end(), std::make_move_iterator(mBuffers.begin()),
234                 std::make_move_iterator(mBuffers.begin() +
235                                         (mPos + mCount) % mBuffers.size()));
236         mBuffers = std::move(newBuffers);
237         mBuffers.resize(mBuffers.capacity());
238         mPos = 0;
239     }
240 
wakeAllWaiters()241     void wakeAllWaiters() {
242         if (mCount == (int)mBuffers.size()) {
243             mCanPush.broadcast();
244         }
245         if (mCount == 0) {
246             mCanPop.broadcast();
247         }
248     }
249 
250 private:
251     int mPos = 0;
252     int mCount = 0;
253     bool mClosed = false;
254     bool mSnapshotMode = false;
255     std::vector<T> mBuffers;
256 
257     Lock& mLock;
258     ConditionVariable mCanPush;
259     ConditionVariable mCanPop;
260 
261     DISALLOW_COPY_ASSIGN_AND_MOVE(BufferQueue);
262 };
263 
264 }  // namespace base
265 }  // namespace android
266