• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef PROFILER_DATA_REPEATER_H
16 #define PROFILER_DATA_REPEATER_H
17 
18 #include <condition_variable>
19 #include <deque>
20 #include <memory>
21 #include <mutex>
22 #include "logging.h"
23 #include "nocopyable.h"
24 #include "profiler_service_types.pb.h"
25 
26 using ProfilerPluginDataPtr = STD_PTR(shared, ProfilerPluginData);
27 
28 template <typename T>
29 class ProfilerDataRepeater {
30 public:
31     explicit ProfilerDataRepeater(size_t maxSize);
32     ~ProfilerDataRepeater();
33 
34     bool PutPluginData(const std::shared_ptr<T>& pluginData);
35 
36     std::shared_ptr<T> TakePluginData();
37 
38     int TakePluginData(std::vector<std::shared_ptr<T>>& pluginDataVec);
39 
40     void Close();
41 
42     void Reset();
43 
44     size_t Size();
45 
46     void ClearQueue();
47 
48 private:
49     std::mutex mutex_;
50     std::condition_variable slotCondVar_;
51     std::condition_variable itemCondVar_;
52     std::deque<std::shared_ptr<T>> dataQueue_;
53     size_t maxSize_;
54     bool closed_;
55 
56     DISALLOW_COPY_AND_MOVE(ProfilerDataRepeater);
57 };
58 
59 template <typename T>
ProfilerDataRepeater(size_t maxSize)60 ProfilerDataRepeater<T>::ProfilerDataRepeater(size_t maxSize)
61 {
62     maxSize_ = maxSize;
63     closed_ = false;
64 }
65 
66 template <typename T>
~ProfilerDataRepeater()67 ProfilerDataRepeater<T>::~ProfilerDataRepeater()
68 {
69     Close();
70 }
71 
72 template <typename T>
Size()73 size_t ProfilerDataRepeater<T>::Size()
74 {
75     std::unique_lock<std::mutex> lock(mutex_);
76     return dataQueue_.size();
77 }
78 
79 template <typename T>
Reset()80 void ProfilerDataRepeater<T>::Reset()
81 {
82     std::unique_lock<std::mutex> lock(mutex_);
83     closed_ = false;
84 }
85 
86 template <typename T>
Close()87 void ProfilerDataRepeater<T>::Close()
88 {
89     {
90         std::unique_lock<std::mutex> lock(mutex_);
91         dataQueue_.clear();
92         closed_ = true;
93     }
94     slotCondVar_.notify_all();
95     itemCondVar_.notify_all();
96 }
97 
98 template <typename T>
PutPluginData(const std::shared_ptr<T> & pluginData)99 bool ProfilerDataRepeater<T>::PutPluginData(const std::shared_ptr<T>& pluginData)
100 {
101     std::unique_lock<std::mutex> lock(mutex_);
102 
103     if ((pluginData == nullptr) && (dataQueue_.size() > 0)) {
104         PROFILER_LOG_INFO(LOG_CORE, "no need put nullptr if queue has data, dataQueue_.size() = %zu",
105                           dataQueue_.size());
106         return true;
107     }
108 
109     while (dataQueue_.size() >= maxSize_ && !closed_) {
110         slotCondVar_.wait(lock);
111     }
112     if (closed_) {
113         return false;
114     }
115 
116     dataQueue_.push_back(pluginData);
117     lock.unlock();
118 
119     itemCondVar_.notify_one();
120     return true;
121 }
122 
123 template <typename T>
TakePluginData()124 std::shared_ptr<T> ProfilerDataRepeater<T>::TakePluginData()
125 {
126     std::unique_lock<std::mutex> lock(mutex_);
127     while (dataQueue_.empty() && !closed_) {
128         itemCondVar_.wait(lock);
129     }
130     if (closed_) {
131         return nullptr;
132     }
133 
134     auto result = dataQueue_.front();
135     dataQueue_.pop_front();
136     lock.unlock();
137 
138     slotCondVar_.notify_one();
139     return result;
140 }
141 
142 template <typename T>
TakePluginData(std::vector<std::shared_ptr<T>> & pluginDataVec)143 int ProfilerDataRepeater<T>::TakePluginData(std::vector<std::shared_ptr<T>>& pluginDataVec)
144 {
145     std::unique_lock<std::mutex> lock(mutex_);
146     while (dataQueue_.empty() && !closed_) {
147         itemCondVar_.wait(lock);
148     }
149     if (closed_) {
150         return -1;
151     }
152 
153     int count = 0;
154     while (dataQueue_.size() > 0) {
155         auto result = dataQueue_.front();
156         pluginDataVec.push_back(result);
157         dataQueue_.pop_front();
158         count++;
159     }
160     lock.unlock();
161 
162     slotCondVar_.notify_one();
163     return count;
164 }
165 
166 template <typename T>
ClearQueue()167 void ProfilerDataRepeater<T>::ClearQueue()
168 {
169     std::unique_lock<std::mutex> lock(mutex_);
170     dataQueue_.clear();
171 }
172 
173 using ProfilerDataRepeaterPtr = STD_PTR(shared, ProfilerDataRepeater<ProfilerPluginData>);
174 
175 #endif // PROFILER_DATA_REPEATER_H