1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "frame_retainer.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "serial_buffer.h"
20
21 namespace DistributedDB {
22 namespace {
// Upper bound, in bytes, of all retained frames together (64 MiB).
constexpr uint32_t MAX_CAPACITY = 64 * 1024 * 1024;
// Initial remainTime of a retained frame; it is decreased once per surveillance period (10 periods of 1 s).
constexpr uint32_t MAX_RETAIN_TIME = 10;
// Upper bound, in bytes, of a single frame that may be retained (32 MiB).
constexpr uint32_t MAX_RETAIN_FRAME_SIZE = 32 * 1024 * 1024;
// At most 5 frames are kept per communicator label per source target.
constexpr uint32_t MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET = 5;
// Interval of the surveillance timer, in milliseconds (1 s).
constexpr int SURVAIL_PERIOD_IN_MILLISECOND = 1000;
LogRetainInfo(const std::string & logPrefix,const LabelType & label,const std::string & target,uint64_t order,const RetainWork & work)28 inline void LogRetainInfo(const std::string &logPrefix, const LabelType &label, const std::string &target,
29 uint64_t order, const RetainWork &work)
30 {
31 LOGI("%s : Label=%s, target=%s{private}, retainOrder=%llu, frameId=%u, remainTime=%u, frameSize=%u.",
32 logPrefix.c_str(), VEC_TO_STR(label), target.c_str(), ULL(order),
33 work.frameId, work.remainTime, work.buffer->GetSize());
34 }
35 }
36
Initialize()37 void FrameRetainer::Initialize()
38 {
39 RuntimeContext *context = RuntimeContext::GetInstance();
40 if (context == nullptr) {
41 return; // Never gonna happen, context always be valid.
42 }
43 TimerAction action = [this](TimerId inTimerId)->int {
44 PeriodicalSurveillance();
45 return E_OK;
46 };
47 int errCode = context->SetTimer(SURVAIL_PERIOD_IN_MILLISECOND, action, nullptr, timerId_);
48 if (errCode != E_OK) {
49 LOGE("[Retainer][Init] Set timer fail, errCode=%d.", errCode);
50 return;
51 }
52 isTimerWork_ = true;
53 }
54
Finalize()55 void FrameRetainer::Finalize()
56 {
57 RuntimeContext *context = RuntimeContext::GetInstance();
58 if (context == nullptr) {
59 return; // Never gonna happen, context always be valid.
60 }
61 // First: Stop the timer
62 if (isTimerWork_) {
63 // After return, the timer rely no more on retainer.
64 context->RemoveTimer(timerId_, true);
65 isTimerWork_ = false;
66 }
67 // Second: Clear the retainWorkPool_
68 for (auto &eachLabel : retainWorkPool_) {
69 for (auto &eachTarget : eachLabel.second) {
70 for (auto &eachFrame : eachTarget.second) {
71 LogRetainInfo("[Retainer][Final] DISCARD", eachLabel.first, eachTarget.first, eachFrame.first,
72 eachFrame.second);
73 delete eachFrame.second.buffer;
74 eachFrame.second.buffer = nullptr;
75 }
76 }
77 }
78 retainWorkPool_.clear();
79 totalSizeByByte_ = 0;
80 totalRetainFrames_ = 0;
81 }
82
RetainFrame(const FrameInfo & inFrame)83 void FrameRetainer::RetainFrame(const FrameInfo &inFrame)
84 {
85 if (inFrame.buffer == nullptr) {
86 return; // Never gonna happen
87 }
88 RetainWork work{inFrame.buffer, inFrame.frameId, MAX_RETAIN_TIME};
89 if (work.buffer->GetSize() > MAX_RETAIN_FRAME_SIZE) {
90 LOGE("[Retainer][Retain] Frame size=%u over limit=%u.", work.buffer->GetSize(), MAX_RETAIN_FRAME_SIZE);
91 delete work.buffer;
92 work.buffer = nullptr;
93 return;
94 }
95 int errCode = work.buffer->ConvertForCrossThread();
96 if (errCode != E_OK) {
97 LOGE("[Retainer][Retain] ConvertForCrossThread fail, errCode=%d.", errCode);
98 delete work.buffer;
99 work.buffer = nullptr;
100 return;
101 }
102
103 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
104 std::map<uint64_t, RetainWork> &perLabelPerTarget = retainWorkPool_[inFrame.commLabel][inFrame.srcTarget];
105 if (perLabelPerTarget.size() >= MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET) {
106 // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
107 auto iter = perLabelPerTarget.begin();
108 LogRetainInfo("[Retainer][Retain] DISCARD", inFrame.commLabel, inFrame.srcTarget, iter->first, iter->second);
109 totalSizeByByte_ -= iter->second.buffer->GetSize();
110 totalRetainFrames_--;
111 delete iter->second.buffer;
112 iter->second.buffer = nullptr;
113 perLabelPerTarget.erase(iter);
114 }
115 // Retain the new frame, update the statistics
116 perLabelPerTarget[incRetainOrder_++] = work;
117 totalSizeByByte_ += inFrame.buffer->GetSize();
118 totalRetainFrames_++;
119 // Discard obsolete frames until totalSize under capacity.
120 DiscardObsoleteFramesIfNeed();
121 // Display the final statistics
122 LOGI("[Retainer][Retain] Order=%llu. Statistics: TOTAL_BYTE=%u, TOTAL_FRAME=%u.", ULL(incRetainOrder_ - 1),
123 totalSizeByByte_, totalRetainFrames_);
124 }
125
FetchFramesForSpecificCommunicator(const LabelType & inCommLabel)126 std::list<FrameInfo> FrameRetainer::FetchFramesForSpecificCommunicator(const LabelType &inCommLabel)
127 {
128 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
129 std::list<FrameInfo> outFrameList;
130 if (retainWorkPool_.count(inCommLabel) == 0) {
131 return outFrameList;
132 }
133 auto &perLabel = retainWorkPool_[inCommLabel];
134 std::map<uint64_t, std::string> fetchOrder;
135 for (const auto &eachTarget : perLabel) {
136 for (const auto &eachFrame : eachTarget.second) {
137 fetchOrder[eachFrame.first] = eachTarget.first;
138 }
139 }
140 for (auto &entry : fetchOrder) {
141 RetainWork &work = perLabel[entry.second][entry.first];
142 LogRetainInfo("[Retainer][Fetch] FETCH-OUT", inCommLabel, entry.second, entry.first, work);
143 outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, inCommLabel, work.frameId});
144 // Update statistics
145 totalSizeByByte_ -= work.buffer->GetSize();
146 totalRetainFrames_--;
147 }
148 retainWorkPool_.erase(inCommLabel);
149 return outFrameList;
150 }
151
PeriodicalSurveillance()152 void FrameRetainer::PeriodicalSurveillance()
153 {
154 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
155 // First: Discard overtime frames.
156 for (auto &eachLabel : retainWorkPool_) {
157 for (auto &eachTarget : eachLabel.second) {
158 std::set<uint64_t> frameToDiscard;
159 for (auto &eachFrame : eachTarget.second) {
160 // Decrease remainTime and discard if need. The remainTime will not be zero before decrease.
161 eachFrame.second.remainTime--;
162 if (eachFrame.second.remainTime == 0) {
163 LogRetainInfo("[Retainer][Surveil] DISCARD", eachLabel.first, eachTarget.first, eachFrame.first,
164 eachFrame.second);
165 totalSizeByByte_ -= eachFrame.second.buffer->GetSize();
166 totalRetainFrames_--;
167 // Free this retain work first
168 delete eachFrame.second.buffer;
169 eachFrame.second.buffer = nullptr;
170 // Record this frame in discard list
171 frameToDiscard.insert(eachFrame.first);
172 }
173 }
174 // Remove the retain work from frameMap.
175 for (auto &entry : frameToDiscard) {
176 eachTarget.second.erase(entry);
177 }
178 }
179 }
180 // Second: Shrink the retainWorkPool_
181 ShrinkRetainWorkPool();
182 }
183
DiscardObsoleteFramesIfNeed()184 void FrameRetainer::DiscardObsoleteFramesIfNeed()
185 {
186 if (totalSizeByByte_ <= MAX_CAPACITY) {
187 return;
188 }
189 std::map<uint64_t, std::pair<LabelType, std::string>> discardOrder;
190 // Sort all the frames by their retain order ascendingly
191 for (const auto &eachLabel : retainWorkPool_) {
192 for (const auto &eachTarget : eachLabel.second) {
193 for (const auto &eachFrame : eachTarget.second) {
194 discardOrder[eachFrame.first] = {eachLabel.first, eachTarget.first};
195 }
196 }
197 }
198 // Discard obsolete frames until totalSize under capacity.
199 while (totalSizeByByte_ > MAX_CAPACITY) {
200 if (discardOrder.empty()) { // Unlikely to happen
201 LOGE("[Retainer][Discard] Internal Error: Byte=%u, Frames=%u.", totalSizeByByte_, totalRetainFrames_);
202 return;
203 }
204 auto iter = discardOrder.begin();
205 RetainWork &workRef = retainWorkPool_[iter->second.first][iter->second.second][iter->first];
206 LogRetainInfo("[Retainer][Discard] DISCARD", iter->second.first, iter->second.second, iter->first, workRef);
207 // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
208 totalSizeByByte_ -= workRef.buffer->GetSize();
209 totalRetainFrames_--;
210 delete workRef.buffer;
211 workRef.buffer = nullptr;
212 retainWorkPool_[iter->second.first][iter->second.second].erase(iter->first);
213 // Remove from the discardOrder
214 discardOrder.erase(iter);
215 }
216 // Shrink the retainWorkPool_ to remove out empty node on the map
217 ShrinkRetainWorkPool();
218 }
219
ShrinkRetainWorkPool()220 void FrameRetainer::ShrinkRetainWorkPool()
221 {
222 std::set<LabelType> emptyLabel;
223 for (auto &eachLabel : retainWorkPool_) {
224 std::set<std::string> emptyTarget;
225 for (auto &eachTarget : eachLabel.second) {
226 // Record corresponding target if its frameMap empty.
227 if (eachTarget.second.empty()) {
228 emptyTarget.insert(eachTarget.first);
229 }
230 }
231 // Remove the empty frameMap from the targetMap. Record corresponding label if its targetMap empty.
232 for (auto &entry : emptyTarget) {
233 eachLabel.second.erase(entry);
234 }
235 if (eachLabel.second.empty()) {
236 emptyLabel.insert(eachLabel.first);
237 }
238 }
239 // Remove the empty targetMap from retainWorkPool_
240 for (auto &entry : emptyLabel) {
241 retainWorkPool_.erase(entry);
242 }
243 }
244 } // namespace DistributedDB