1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifndef PROFILER_DATA_REPEATER_H
16 #define PROFILER_DATA_REPEATER_H
17
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>
#include "logging.h"
#include "nocopyable.h"
#include "profiler_service_types.pb.h"
25
// Pointer alias for ProfilerPluginData built through the project's STD_PTR macro.
// NOTE(review): presumably expands to std::shared_ptr<ProfilerPluginData> - confirm against the macro definition.
using ProfilerPluginDataPtr = STD_PTR(shared, ProfilerPluginData);
27
28 template <typename T>
29 class ProfilerDataRepeater {
30 public:
31 explicit ProfilerDataRepeater(size_t maxSize);
32 ~ProfilerDataRepeater();
33
34 bool PutPluginData(const std::shared_ptr<T>& pluginData);
35
36 std::shared_ptr<T> TakePluginData();
37
38 int TakePluginData(std::vector<std::shared_ptr<T>>& pluginDataVec);
39
40 void Close();
41
42 void Reset();
43
44 size_t Size();
45
46 void ClearQueue();
47
48 private:
49 std::mutex mutex_;
50 std::condition_variable slotCondVar_;
51 std::condition_variable itemCondVar_;
52 std::deque<std::shared_ptr<T>> dataQueue_;
53 size_t maxSize_;
54 bool closed_;
55
56 DISALLOW_COPY_AND_MOVE(ProfilerDataRepeater);
57 };
58
59 template <typename T>
ProfilerDataRepeater(size_t maxSize)60 ProfilerDataRepeater<T>::ProfilerDataRepeater(size_t maxSize)
61 {
62 maxSize_ = maxSize;
63 closed_ = false;
64 }
65
66 template <typename T>
~ProfilerDataRepeater()67 ProfilerDataRepeater<T>::~ProfilerDataRepeater()
68 {
69 Close();
70 }
71
72 template <typename T>
Size()73 size_t ProfilerDataRepeater<T>::Size()
74 {
75 std::unique_lock<std::mutex> lock(mutex_);
76 return dataQueue_.size();
77 }
78
79 template <typename T>
Reset()80 void ProfilerDataRepeater<T>::Reset()
81 {
82 std::unique_lock<std::mutex> lock(mutex_);
83 closed_ = false;
84 }
85
86 template <typename T>
Close()87 void ProfilerDataRepeater<T>::Close()
88 {
89 {
90 std::unique_lock<std::mutex> lock(mutex_);
91 dataQueue_.clear();
92 closed_ = true;
93 }
94 slotCondVar_.notify_all();
95 itemCondVar_.notify_all();
96 }
97
98 template <typename T>
PutPluginData(const std::shared_ptr<T> & pluginData)99 bool ProfilerDataRepeater<T>::PutPluginData(const std::shared_ptr<T>& pluginData)
100 {
101 std::unique_lock<std::mutex> lock(mutex_);
102
103 if ((pluginData == nullptr) && (dataQueue_.size() > 0)) {
104 PROFILER_LOG_INFO(LOG_CORE, "no need put nullptr if queue has data, dataQueue_.size() = %zu",
105 dataQueue_.size());
106 return true;
107 }
108
109 while (dataQueue_.size() >= maxSize_ && !closed_) {
110 slotCondVar_.wait(lock);
111 }
112 if (closed_) {
113 return false;
114 }
115
116 dataQueue_.push_back(pluginData);
117 lock.unlock();
118
119 itemCondVar_.notify_one();
120 return true;
121 }
122
123 template <typename T>
TakePluginData()124 std::shared_ptr<T> ProfilerDataRepeater<T>::TakePluginData()
125 {
126 std::unique_lock<std::mutex> lock(mutex_);
127 while (dataQueue_.empty() && !closed_) {
128 itemCondVar_.wait(lock);
129 }
130 if (closed_) {
131 return nullptr;
132 }
133
134 auto result = dataQueue_.front();
135 dataQueue_.pop_front();
136 lock.unlock();
137
138 slotCondVar_.notify_one();
139 return result;
140 }
141
142 template <typename T>
TakePluginData(std::vector<std::shared_ptr<T>> & pluginDataVec)143 int ProfilerDataRepeater<T>::TakePluginData(std::vector<std::shared_ptr<T>>& pluginDataVec)
144 {
145 std::unique_lock<std::mutex> lock(mutex_);
146 while (dataQueue_.empty() && !closed_) {
147 itemCondVar_.wait(lock);
148 }
149 if (closed_) {
150 return -1;
151 }
152
153 int count = 0;
154 while (dataQueue_.size() > 0) {
155 auto result = dataQueue_.front();
156 pluginDataVec.push_back(result);
157 dataQueue_.pop_front();
158 count++;
159 }
160 lock.unlock();
161
162 slotCondVar_.notify_one();
163 return count;
164 }
165
166 template <typename T>
ClearQueue()167 void ProfilerDataRepeater<T>::ClearQueue()
168 {
169 std::unique_lock<std::mutex> lock(mutex_);
170 dataQueue_.clear();
171 }
172
// Pointer alias for the ProfilerPluginData specialization of ProfilerDataRepeater, built through STD_PTR.
// NOTE(review): presumably std::shared_ptr<ProfilerDataRepeater<ProfilerPluginData>> - confirm against the macro definition.
using ProfilerDataRepeaterPtr = STD_PTR(shared, ProfilerDataRepeater<ProfilerPluginData>);
174
175 #endif // PROFILER_DATA_REPEATER_H