• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <input/PrintTools.h>
20 #include <condition_variable>
21 #include <functional>
22 #include <list>
23 #include <mutex>
24 #include <optional>
25 #include "android-base/thread_annotations.h"
26 
27 namespace android {
28 
29 /**
30  * A thread-safe FIFO queue. This list-backed queue stores up to <i>capacity</i> objects if
31  * a capacity is provided at construction, and is otherwise unbounded.
32  * Objects can always be added. Objects are added immediately.
33  * If the queue is full, new objects cannot be added.
34  *
35  * The action of retrieving an object will block until an element is available.
36  */
37 template <class T>
38 class BlockingQueue {
39 public:
40     explicit BlockingQueue() = default;
41 
BlockingQueue(size_t capacity)42     explicit BlockingQueue(size_t capacity) : mCapacity(capacity){};
43 
44     /**
45      * Retrieve and remove the oldest object.
46      * Blocks execution indefinitely while queue is empty.
47      */
pop()48     T pop() {
49         std::unique_lock lock(mLock);
50         android::base::ScopedLockAssertion assumeLock(mLock);
51         mHasElements.wait(lock, [this]() REQUIRES(mLock) { return !this->mQueue.empty(); });
52         T t = std::move(mQueue.front());
53         mQueue.erase(mQueue.begin());
54         return t;
55     };
56 
57     /**
58      * Retrieve and remove the oldest object.
59      * Blocks execution for the given duration while queue is empty, and returns std::nullopt
60      * if the queue was empty for the entire duration.
61      */
popWithTimeout(std::chrono::nanoseconds duration)62     std::optional<T> popWithTimeout(std::chrono::nanoseconds duration) {
63         std::unique_lock lock(mLock);
64         android::base::ScopedLockAssertion assumeLock(mLock);
65         if (!mHasElements.wait_for(lock, duration,
66                                    [this]() REQUIRES(mLock) { return !this->mQueue.empty(); })) {
67             return {};
68         }
69         T t = std::move(mQueue.front());
70         mQueue.erase(mQueue.begin());
71         return t;
72     };
73 
74     /**
75      * Add a new object to the queue.
76      * Does not block.
77      * Return true if an element was successfully added.
78      * Return false if the queue is full.
79      */
push(T && t)80     bool push(T&& t) {
81         { // acquire lock
82             std::scoped_lock lock(mLock);
83             if (mCapacity && mQueue.size() == mCapacity) {
84                 return false;
85             }
86             mQueue.push_back(std::move(t));
87         } // release lock
88         mHasElements.notify_one();
89         return true;
90     };
91 
92     /**
93      * Construct a new object into the queue.
94      * Does not block.
95      * Return true if an element was successfully added.
96      * Return false if the queue is full.
97      */
98     template <class... Args>
emplace(Args &&...args)99     bool emplace(Args&&... args) {
100         { // acquire lock
101             std::scoped_lock lock(mLock);
102             if (mCapacity && mQueue.size() == mCapacity) {
103                 return false;
104             }
105             mQueue.emplace_back(args...);
106         } // release lock
107         mHasElements.notify_one();
108         return true;
109     };
110 
erase_if(const std::function<bool (const T &)> & pred)111     void erase_if(const std::function<bool(const T&)>& pred) {
112         std::scoped_lock lock(mLock);
113         std::erase_if(mQueue, pred);
114     }
115 
116     /**
117      * Remove all elements.
118      * Does not block.
119      */
clear()120     void clear() {
121         std::scoped_lock lock(mLock);
122         mQueue.clear();
123     };
124 
125     /**
126      * How many elements are currently stored in the queue.
127      * Primary used for debugging.
128      * Does not block.
129      */
size()130     size_t size() const {
131         std::scoped_lock lock(mLock);
132         return mQueue.size();
133     }
134 
empty()135     bool empty() const {
136         std::scoped_lock lock(mLock);
137         return mQueue.empty();
138     }
139 
140     std::string dump(std::string (*toString)(const T&) = constToString) const {
141         std::scoped_lock lock(mLock);
142         return dumpContainer(mQueue, toString);
143     }
144 
145 private:
146     const std::optional<size_t> mCapacity;
147     /**
148      * Used to signal that mQueue is non-empty.
149      */
150     std::condition_variable mHasElements;
151     /**
152      * Lock for accessing and waiting on elements.
153      */
154     mutable std::mutex mLock;
155     std::list<T> mQueue GUARDED_BY(mLock);
156 };
157 
158 } // namespace android
159