• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
18 #define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
19 
20 #include <android-base/thread_annotations.h>
21 
22 #include <atomic>
23 #include <condition_variable>
24 #include <iostream>
25 #include <queue>
26 #include <thread>
27 
28 namespace android {
29 namespace hardware {
30 namespace automotive {
31 namespace vehicle {
32 
33 template <typename T>
34 class ConcurrentQueue {
35   public:
waitForItems()36     bool waitForItems() {
37         std::unique_lock<std::mutex> lockGuard(mLock);
38         android::base::ScopedLockAssertion lockAssertion(mLock);
39         while (mQueue.empty() && mIsActive) {
40             mCond.wait(lockGuard);
41         }
42         return mIsActive;
43     }
44 
flush()45     std::vector<T> flush() {
46         std::vector<T> items;
47 
48         std::scoped_lock<std::mutex> lockGuard(mLock);
49         if (mQueue.empty()) {
50             return items;
51         }
52         while (!mQueue.empty()) {
53             // Even if the queue is deactivated, we should still flush all the remaining values
54             // in the queue.
55             items.push_back(std::move(mQueue.front()));
56             mQueue.pop();
57         }
58         return items;
59     }
60 
push(T && item)61     void push(T&& item) {
62         {
63             std::scoped_lock<std::mutex> lockGuard(mLock);
64             if (!mIsActive) {
65                 return;
66             }
67             mQueue.push(std::move(item));
68         }
69         mCond.notify_one();
70     }
71 
push(std::vector<T> && items)72     void push(std::vector<T>&& items) {
73         {
74             std::scoped_lock<std::mutex> lockGuard(mLock);
75             if (!mIsActive) {
76                 return;
77             }
78             for (T& item : items) {
79                 mQueue.push(std::move(item));
80             }
81         }
82         mCond.notify_one();
83     }
84 
85     // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread.
86     // The items already in the queue could still be flushed even after the queue is deactivated.
deactivate()87     void deactivate() {
88         {
89             std::scoped_lock<std::mutex> lockGuard(mLock);
90             mIsActive = false;
91         }
92         // To unblock all waiting consumers.
93         mCond.notify_all();
94     }
95 
96     ConcurrentQueue() = default;
97 
98     ConcurrentQueue(const ConcurrentQueue&) = delete;
99     ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
100 
101   private:
102     mutable std::mutex mLock;
103     bool mIsActive GUARDED_BY(mLock) = true;
104     std::condition_variable mCond;
105     std::queue<T> mQueue GUARDED_BY(mLock);
106 };
107 
108 template <typename T>
109 class BatchingConsumer {
110   private:
111     enum class State {
112         INIT = 0,
113         RUNNING = 1,
114         STOP_REQUESTED = 2,
115         STOPPED = 3,
116     };
117 
118   public:
BatchingConsumer()119     BatchingConsumer() : mState(State::INIT) {}
120 
121     BatchingConsumer(const BatchingConsumer&) = delete;
122     BatchingConsumer& operator=(const BatchingConsumer&) = delete;
123 
124     using OnBatchReceivedFunc = std::function<void(std::vector<T> vec)>;
125 
run(ConcurrentQueue<T> * queue,std::chrono::nanoseconds batchInterval,const OnBatchReceivedFunc & func)126     void run(ConcurrentQueue<T>* queue, std::chrono::nanoseconds batchInterval,
127              const OnBatchReceivedFunc& func) {
128         mQueue = queue;
129         mBatchInterval = batchInterval;
130 
131         mWorkerThread = std::thread(&BatchingConsumer<T>::runInternal, this, func);
132     }
133 
requestStop()134     void requestStop() { mState = State::STOP_REQUESTED; }
135 
waitStopped()136     void waitStopped() {
137         if (mWorkerThread.joinable()) {
138             mWorkerThread.join();
139         }
140     }
141 
142   private:
runInternal(const OnBatchReceivedFunc & onBatchReceived)143     void runInternal(const OnBatchReceivedFunc& onBatchReceived) {
144         if (mState.exchange(State::RUNNING) == State::INIT) {
145             while (State::RUNNING == mState) {
146                 mQueue->waitForItems();
147                 if (State::STOP_REQUESTED == mState) break;
148 
149                 std::this_thread::sleep_for(mBatchInterval);
150                 if (State::STOP_REQUESTED == mState) break;
151 
152                 std::vector<T> items = mQueue->flush();
153 
154                 if (items.size() > 0) {
155                     onBatchReceived(std::move(items));
156                 }
157             }
158         }
159 
160         mState = State::STOPPED;
161     }
162 
163   private:
164     std::thread mWorkerThread;
165 
166     std::atomic<State> mState;
167     std::chrono::nanoseconds mBatchInterval;
168     ConcurrentQueue<T>* mQueue;
169 };
170 
171 }  // namespace vehicle
172 }  // namespace automotive
173 }  // namespace hardware
174 }  // namespace android
175 
176 #endif  // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
177