1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ 18 #define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ 19 20 #include <android-base/thread_annotations.h> 21 22 #include <atomic> 23 #include <condition_variable> 24 #include <iostream> 25 #include <queue> 26 #include <thread> 27 28 namespace android { 29 namespace hardware { 30 namespace automotive { 31 namespace vehicle { 32 33 template <typename T> 34 class ConcurrentQueue { 35 public: waitForItems()36 bool waitForItems() { 37 std::unique_lock<std::mutex> lockGuard(mLock); 38 android::base::ScopedLockAssertion lockAssertion(mLock); 39 while (mQueue.empty() && mIsActive) { 40 mCond.wait(lockGuard); 41 } 42 return mIsActive; 43 } 44 flush()45 std::vector<T> flush() { 46 std::vector<T> items; 47 48 std::scoped_lock<std::mutex> lockGuard(mLock); 49 if (mQueue.empty()) { 50 return items; 51 } 52 while (!mQueue.empty()) { 53 // Even if the queue is deactivated, we should still flush all the remaining values 54 // in the queue. 55 items.push_back(std::move(mQueue.front())); 56 mQueue.pop(); 57 } 58 return items; 59 } 60 push(T && item)61 void push(T&& item) { 62 { 63 std::scoped_lock<std::mutex> lockGuard(mLock); 64 if (!mIsActive) { 65 return; 66 } 67 mQueue.push(std::move(item)); 68 } 69 mCond.notify_one(); 70 } 71 push(std::vector<T> && items)72 void push(std::vector<T>&& items) { 73 { 74 std::scoped_lock<std::mutex> lockGuard(mLock); 75 if (!mIsActive) { 76 return; 77 } 78 for (T& item : items) { 79 mQueue.push(std::move(item)); 80 } 81 } 82 mCond.notify_one(); 83 } 84 85 // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread. 86 // The items already in the queue could still be flushed even after the queue is deactivated. deactivate()87 void deactivate() { 88 { 89 std::scoped_lock<std::mutex> lockGuard(mLock); 90 mIsActive = false; 91 } 92 // To unblock all waiting consumers. 93 mCond.notify_all(); 94 } 95 96 ConcurrentQueue() = default; 97 98 ConcurrentQueue(const ConcurrentQueue&) = delete; 99 ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; 100 101 private: 102 mutable std::mutex mLock; 103 bool mIsActive GUARDED_BY(mLock) = true; 104 std::condition_variable mCond; 105 std::queue<T> mQueue GUARDED_BY(mLock); 106 }; 107 108 template <typename T> 109 class BatchingConsumer { 110 private: 111 enum class State { 112 INIT = 0, 113 RUNNING = 1, 114 STOP_REQUESTED = 2, 115 STOPPED = 3, 116 }; 117 118 public: BatchingConsumer()119 BatchingConsumer() : mState(State::INIT) {} 120 121 BatchingConsumer(const BatchingConsumer&) = delete; 122 BatchingConsumer& operator=(const BatchingConsumer&) = delete; 123 124 using OnBatchReceivedFunc = std::function<void(std::vector<T> vec)>; 125 run(ConcurrentQueue<T> * queue,std::chrono::nanoseconds batchInterval,const OnBatchReceivedFunc & func)126 void run(ConcurrentQueue<T>* queue, std::chrono::nanoseconds batchInterval, 127 const OnBatchReceivedFunc& func) { 128 mQueue = queue; 129 mBatchInterval = batchInterval; 130 131 mWorkerThread = std::thread(&BatchingConsumer<T>::runInternal, this, func); 132 } 133 requestStop()134 void requestStop() { mState = State::STOP_REQUESTED; } 135 waitStopped()136 void waitStopped() { 137 if (mWorkerThread.joinable()) { 138 mWorkerThread.join(); 139 } 140 } 141 142 private: runInternal(const OnBatchReceivedFunc & onBatchReceived)143 void runInternal(const OnBatchReceivedFunc& onBatchReceived) { 144 if (mState.exchange(State::RUNNING) == State::INIT) { 145 while (State::RUNNING == mState) { 146 mQueue->waitForItems(); 147 if (State::STOP_REQUESTED == mState) break; 148 149 std::this_thread::sleep_for(mBatchInterval); 150 if (State::STOP_REQUESTED == mState) break; 151 152 std::vector<T> items = mQueue->flush(); 153 154 if (items.size() > 0) { 155 onBatchReceived(std::move(items)); 156 } 157 } 158 } 159 160 mState = State::STOPPED; 161 } 162 163 private: 164 std::thread mWorkerThread; 165 166 std::atomic<State> mState; 167 std::chrono::nanoseconds mBatchInterval; 168 ConcurrentQueue<T>* mQueue; 169 }; 170 171 } // namespace vehicle 172 } // namespace automotive 173 } // namespace hardware 174 } // namespace android 175 176 #endif // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_ 177