• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
18 #define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
19 
20 #include <android-base/thread_annotations.h>
21 
22 #include <atomic>
23 #include <condition_variable>
24 #include <iostream>
25 #include <queue>
26 #include <thread>
27 
28 namespace android {
29 namespace hardware {
30 namespace automotive {
31 namespace vehicle {
32 
33 template <typename T>
34 class ConcurrentQueue {
35   public:
waitForItems()36     bool waitForItems() {
37         std::unique_lock<std::mutex> lockGuard(mLock);
38         android::base::ScopedLockAssertion lockAssertion(mLock);
39         while (mQueue.empty() && mIsActive) {
40             mCond.wait(lockGuard);
41         }
42         return mIsActive;
43     }
44 
flush()45     std::vector<T> flush() {
46         std::vector<T> items;
47 
48         std::scoped_lock<std::mutex> lockGuard(mLock);
49         if (mQueue.empty()) {
50             return items;
51         }
52         while (!mQueue.empty()) {
53             // Even if the queue is deactivated, we should still flush all the remaining values
54             // in the queue.
55             items.push_back(std::move(mQueue.front()));
56             mQueue.pop();
57         }
58         return items;
59     }
60 
push(T && item)61     void push(T&& item) {
62         {
63             std::scoped_lock<std::mutex> lockGuard(mLock);
64             if (!mIsActive) {
65                 return;
66             }
67             mQueue.push(std::move(item));
68         }
69         mCond.notify_one();
70     }
71 
72     // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread.
73     // The items already in the queue could still be flushed even after the queue is deactivated.
deactivate()74     void deactivate() {
75         {
76             std::scoped_lock<std::mutex> lockGuard(mLock);
77             mIsActive = false;
78         }
79         // To unblock all waiting consumers.
80         mCond.notify_all();
81     }
82 
83     ConcurrentQueue() = default;
84 
85     ConcurrentQueue(const ConcurrentQueue&) = delete;
86     ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
87 
88   private:
89     mutable std::mutex mLock;
90     bool mIsActive GUARDED_BY(mLock) = true;
91     std::condition_variable mCond;
92     std::queue<T> mQueue GUARDED_BY(mLock);
93 };
94 
95 }  // namespace vehicle
96 }  // namespace automotive
97 }  // namespace hardware
98 }  // namespace android
99 
100 #endif  // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
101