1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_data_message_schedule.h"
16
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "version.h"
20 #include "single_ver_data_sync.h"
21
22 namespace DistributedDB {
~SingleVerDataMessageSchedule()23 SingleVerDataMessageSchedule::~SingleVerDataMessageSchedule()
24 {
25 LOGD("~SingleVerDataMessageSchedule");
26 ClearMsg();
27 }
28
Initialize(const std::string & label,const std::string & deviceId)29 void SingleVerDataMessageSchedule::Initialize(const std::string &label, const std::string &deviceId)
30 {
31 label_ = label;
32 deviceId_ = deviceId;
33 }
34
PutMsg(Message * inMsg)35 void SingleVerDataMessageSchedule::PutMsg(Message *inMsg)
36 {
37 if (inMsg == nullptr) {
38 return;
39 }
40 std::lock_guard<std::mutex> lock(queueLock_);
41 msgQueue_.push(inMsg);
42 isNeedReload_ = true;
43 }
44
IsNeedReloadQueue()45 bool SingleVerDataMessageSchedule::IsNeedReloadQueue()
46 {
47 std::lock_guard<std::mutex> lock(queueLock_);
48 return isNeedReload_;
49 }
50
MoveNextMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)51 Message *SingleVerDataMessageSchedule::MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
52 bool &isNeedContinue)
53 {
54 uint32_t remoteVersion = context->GetRemoteSoftwareVersion();
55 if (remoteVersion < SOFTWARE_VERSION_RELEASE_3_0) {
56 // just get last msg when remote version is < 103 or >=103 but just open db now
57 return GetLastMsgFromQueue();
58 }
59 {
60 std::lock_guard<std::mutex> lock(workingLock_);
61 if (isWorking_) {
62 isNeedContinue = false;
63 return nullptr;
64 }
65 isWorking_ = true;
66 }
67 ResetTimer(context);
68 UpdateMsgMap();
69 Message *msg = GetMsgFromMap(isNeedHandle);
70 isNeedContinue = true;
71 if (msg == nullptr) {
72 StopTimer();
73 std::lock_guard<std::mutex> lock(workingLock_);
74 isWorking_ = false;
75 return nullptr;
76 }
77 return msg;
78 }
79
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * inMsg)80 void SingleVerDataMessageSchedule::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap,
81 const Message *inMsg)
82 {
83 if (isNeedHandleStatus) {
84 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
85 if (packet == nullptr) {
86 LOGE("[DataMsgSchedule] packet is nullptr");
87 return;
88 }
89 uint64_t curPacketId = packet->GetPacketId();
90 {
91 std::lock_guard<std::mutex> lock(lock_);
92 finishedPacketId_ = curPacketId;
93 if (isNeedClearMap) {
94 ClearMsgMapWithNoLock();
95 expectedSequenceId_ = 1;
96 } else {
97 LOGI("[DataMsgSchedule] DealMsg seqId=%" PRIu32 " finishedPacketId=%" PRIu64 " ok,label=%s,dev=%s",
98 expectedSequenceId_, finishedPacketId_, label_.c_str(), STR_MASK(deviceId_));
99 expectedSequenceId_++;
100 }
101 }
102 }
103 std::lock_guard<std::mutex> lock(workingLock_);
104 isWorking_ = false;
105 }
106
ClearMsg()107 void SingleVerDataMessageSchedule::ClearMsg()
108 {
109 StopTimer();
110 ClearMsgQueue();
111 ClearMsgMap();
112 }
113
UpdateMsgMap()114 void SingleVerDataMessageSchedule::UpdateMsgMap()
115 {
116 std::queue<Message *> msgTmpQueue;
117 {
118 std::lock_guard<std::mutex> lock(queueLock_);
119 while (!msgQueue_.empty()) {
120 msgTmpQueue.push(msgQueue_.front());
121 msgQueue_.pop();
122 }
123 isNeedReload_ = false;
124 }
125 UpdateMsgMapInner(msgTmpQueue);
126 }
127
UpdateMsgMapInner(std::queue<Message * > & msgTmpQueue)128 void SingleVerDataMessageSchedule::UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue)
129 {
130 // update msg map
131 std::lock_guard<std::mutex> lock(lock_);
132 while (!msgTmpQueue.empty()) {
133 Message *msg = msgTmpQueue.front();
134 msgTmpQueue.pop();
135 // insert new msg into map and delete old msg
136 int errCode = UpdateMsgMapIfNeed(msg);
137 if (errCode != E_OK) {
138 delete msg;
139 }
140 }
141 }
142
GetMsgFromMap(bool & isNeedHandle)143 Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle)
144 {
145 isNeedHandle = true;
146 std::lock_guard<std::mutex> lock(lock_);
147 while (!messageMap_.empty()) {
148 auto iter = messageMap_.begin();
149 Message *msg = iter->second;
150 messageMap_.erase(iter);
151 const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
152 if (packet == nullptr) {
153 LOGE("[DataMsgSchedule] expected error");
154 delete msg;
155 continue;
156 }
157 uint32_t sequenceId = msg->GetSequenceId();
158 uint64_t packetId = packet->GetPacketId();
159 if (sequenceId < expectedSequenceId_) {
160 uint64_t revisePacketId = finishedPacketId_ - (expectedSequenceId_ - 1 - sequenceId);
161 LOGI("[DataMsgSchedule] drop msg because seqId less than exSeqId");
162 if (packetId < revisePacketId) {
163 delete msg;
164 continue;
165 }
166 // means already handle the msg, and just send E_OK ack in dataSync
167 isNeedHandle = false;
168 return msg;
169 }
170 if (sequenceId == expectedSequenceId_) {
171 if (packetId < finishedPacketId_) {
172 LOGI("[DataMsgSchedule] drop msg because packetId less than finishedPacketId");
173 delete msg;
174 continue;
175 }
176 // if packetId == finishedPacketId_ need handle
177 // it will happened while watermark/need_abilitySync when last ack is missing
178 return msg;
179 }
180 // sequenceId > expectedSequenceId_, not need handle, put into map again
181 messageMap_[sequenceId] = msg;
182 return nullptr;
183 }
184 return nullptr;
185 }
186
GetLastMsgFromQueue()187 Message *SingleVerDataMessageSchedule::GetLastMsgFromQueue()
188 {
189 std::lock_guard<std::mutex> lock(queueLock_);
190 isNeedReload_ = false;
191 while (!msgQueue_.empty()) {
192 Message *msg = msgQueue_.front();
193 msgQueue_.pop();
194 if (msgQueue_.empty()) { // means last msg
195 return msg;
196 }
197 delete msg;
198 }
199 return nullptr;
200 }
201
ClearMsgMap()202 void SingleVerDataMessageSchedule::ClearMsgMap()
203 {
204 std::lock_guard<std::mutex> lock(lock_);
205 ClearMsgMapWithNoLock();
206 }
207
ClearMsgMapWithNoLock()208 void SingleVerDataMessageSchedule::ClearMsgMapWithNoLock()
209 {
210 LOGD("[DataMsgSchedule] begin to ClearMsgMapWithNoLock");
211 for (auto &iter : messageMap_) {
212 delete iter.second;
213 iter.second = nullptr;
214 }
215 messageMap_.clear();
216 }
217
ClearMsgQueue()218 void SingleVerDataMessageSchedule::ClearMsgQueue()
219 {
220 std::lock_guard<std::mutex> lock(queueLock_);
221 while (!msgQueue_.empty()) {
222 Message *msg = msgQueue_.front();
223 msgQueue_.pop();
224 delete msg;
225 }
226 }
227
StartTimer(SingleVerSyncTaskContext * context)228 void SingleVerDataMessageSchedule::StartTimer(SingleVerSyncTaskContext *context)
229 {
230 std::lock_guard<std::mutex> lock(lock_);
231 TimerId timerId = 0;
232 RefObject::IncObjRef(context);
233 TimerAction timeOutCallback = std::bind(&SingleVerDataMessageSchedule::TimeOut, this, std::placeholders::_1);
234 int errCode = RuntimeContext::GetInstance()->SetTimer(IDLE_TIME_OUT, timeOutCallback,
235 [context]() {
236 int errCode = RuntimeContext::GetInstance()->ScheduleTask([context]() {
237 RefObject::DecObjRef(context);
238 });
239 if (errCode != E_OK) {
240 LOGE("[DataMsgSchedule] timer finalizer ScheduleTask,errCode=%d", errCode);
241 }
242 }, timerId);
243 if (errCode != E_OK) {
244 RefObject::DecObjRef(context);
245 LOGE("[DataMsgSchedule] timer ScheduleTask, errCode=%d", errCode);
246 return;
247 }
248 timerId_ = timerId;
249 LOGD("[DataMsgSchedule] StartTimer,TimerId=%" PRIu64, timerId_);
250 }
251
StopTimer()252 void SingleVerDataMessageSchedule::StopTimer()
253 {
254 TimerId timerId;
255 {
256 std::lock_guard<std::mutex> lock(lock_);
257 LOGD("[DataMsgSchedule] StopTimer,remove TimerId=%" PRIu64, timerId_);
258 if (timerId_ == 0) {
259 return;
260 }
261 timerId = timerId_;
262 timerId_ = 0;
263 }
264 RuntimeContext::GetInstance()->RemoveTimer(timerId);
265 }
266
ResetTimer(SingleVerSyncTaskContext * context)267 void SingleVerDataMessageSchedule::ResetTimer(SingleVerSyncTaskContext *context)
268 {
269 StopTimer();
270 StartTimer(context);
271 }
272
TimeOut(TimerId timerId)273 int SingleVerDataMessageSchedule::TimeOut(TimerId timerId)
274 {
275 if (IsNeedReloadQueue()) {
276 LOGI("[DataMsgSchedule] new msg exists, no need to timeout handle");
277 return E_OK;
278 }
279 {
280 std::lock_guard<std::mutex> lock(workingLock_);
281 if (isWorking_) {
282 LOGI("[DataMsgSchedule] other thread is handle msg, no need to timeout handle");
283 return E_OK;
284 }
285 }
286 {
287 std::lock_guard<std::mutex> lock(lock_);
288 LOGI("[DataMsgSchedule] timeout handling, stop timerId_[%" PRIu64 "]", timerId);
289 if (timerId == timerId_) {
290 ClearMsgMapWithNoLock();
291 timerId_ = 0;
292 }
293 }
294 RuntimeContext::GetInstance()->RemoveTimer(timerId);
295 return E_OK;
296 }
297
UpdateMsgMapIfNeed(Message * msg)298 int SingleVerDataMessageSchedule::UpdateMsgMapIfNeed(Message *msg)
299 {
300 if (msg == nullptr) {
301 return -E_INVALID_ARGS;
302 }
303 const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
304 if (packet == nullptr) {
305 return -E_INVALID_ARGS;
306 }
307 uint32_t sessionId = msg->GetSessionId();
308 uint32_t sequenceId = msg->GetSequenceId();
309 uint64_t packetId = packet->GetPacketId();
310 if (prevSessionId_ != 0 && sessionId == prevSessionId_) {
311 LOGD("[DataMsgSchedule] recv prev sessionId msg, drop msg, label=%s, dev=%s", label_.c_str(),
312 STR_MASK(deviceId_));
313 return -E_INVALID_ARGS;
314 }
315 if (sessionId != currentSessionId_) {
316 // make sure all msg sessionId is same in msgMap
317 ClearMsgMapWithNoLock();
318 prevSessionId_ = currentSessionId_;
319 currentSessionId_ = sessionId;
320 finishedPacketId_ = 0;
321 expectedSequenceId_ = 1;
322 }
323 if (messageMap_.count(sequenceId) > 0) {
324 const auto *cachePacket = messageMap_[sequenceId]->GetObject<DataRequestPacket>();
325 if (cachePacket != nullptr) {
326 if (packetId != 0 && packetId < cachePacket->GetPacketId()) {
327 LOGD("[DataMsgSchedule] drop msg packetId=%" PRIu64 ", cachePacketId=%" PRIu64 ", label=%s, dev=%s",
328 packetId, cachePacket->GetPacketId(), label_.c_str(), STR_MASK(deviceId_));
329 return -E_INVALID_ARGS;
330 }
331 }
332 delete messageMap_[sequenceId];
333 messageMap_[sequenceId] = nullptr;
334 }
335 messageMap_[sequenceId] = msg;
336 LOGD("[DataMsgSchedule] put into msgMap seqId=%" PRIu32 ", packetId=%" PRIu64 ", label=%s, dev=%s", sequenceId,
337 packetId, label_.c_str(), STR_MASK(deviceId_));
338 return E_OK;
339 }
340 }