• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_data_message_schedule.h"
16 
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "version.h"
20 #include "single_ver_data_sync.h"
21 
22 namespace DistributedDB {
~SingleVerDataMessageSchedule()23 SingleVerDataMessageSchedule::~SingleVerDataMessageSchedule()
24 {
25     LOGD("~SingleVerDataMessageSchedule");
26     ClearMsg();
27 }
28 
Initialize(const std::string & label,const std::string & deviceId)29 void SingleVerDataMessageSchedule::Initialize(const std::string &label, const std::string &deviceId)
30 {
31     label_ = label;
32     deviceId_ = deviceId;
33 }
34 
PutMsg(Message * inMsg)35 void SingleVerDataMessageSchedule::PutMsg(Message *inMsg)
36 {
37     if (inMsg == nullptr) {
38         return;
39     }
40     std::lock_guard<std::mutex> lock(queueLock_);
41     msgQueue_.push(inMsg);
42     isNeedReload_ = true;
43 }
44 
IsNeedReloadQueue()45 bool SingleVerDataMessageSchedule::IsNeedReloadQueue()
46 {
47     std::lock_guard<std::mutex> lock(queueLock_);
48     return isNeedReload_;
49 }
50 
MoveNextMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)51 Message *SingleVerDataMessageSchedule::MoveNextMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
52     bool &isNeedContinue)
53 {
54     uint32_t remoteVersion = context->GetRemoteSoftwareVersion();
55     if (remoteVersion < SOFTWARE_VERSION_RELEASE_3_0) {
56         // just get last msg when remote version is < 103 or >=103 but just open db now
57         return GetLastMsgFromQueue();
58     }
59     {
60         std::lock_guard<std::mutex> lock(workingLock_);
61         if (isWorking_) {
62             isNeedContinue = false;
63             return nullptr;
64         }
65         isWorking_ = true;
66     }
67     ResetTimer(context);
68     UpdateMsgMap();
69     Message *msg = GetMsgFromMap(isNeedHandle);
70     isNeedContinue = true;
71     if (msg == nullptr) {
72         StopTimer();
73         std::lock_guard<std::mutex> lock(workingLock_);
74         isWorking_ = false;
75         return nullptr;
76     }
77     return msg;
78 }
79 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * inMsg)80 void SingleVerDataMessageSchedule::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap,
81     const Message *inMsg)
82 {
83     if (isNeedHandleStatus) {
84         const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
85         if (packet == nullptr) {
86             LOGE("[DataMsgSchedule] packet is nullptr");
87             return;
88         }
89         uint64_t curPacketId = packet->GetPacketId();
90         {
91             std::lock_guard<std::mutex> lock(lock_);
92             finishedPacketId_ = curPacketId;
93             if (isNeedClearMap) {
94                 ClearMsgMapWithNoLock();
95                 expectedSequenceId_ = 1;
96             } else {
97                 LOGI("[DataMsgSchedule] DealMsg seqId=%" PRIu32 " finishedPacketId=%" PRIu64 " ok,label=%s,dev=%s",
98                     expectedSequenceId_, finishedPacketId_, label_.c_str(), STR_MASK(deviceId_));
99                 expectedSequenceId_++;
100             }
101         }
102     }
103     std::lock_guard<std::mutex> lock(workingLock_);
104     isWorking_ = false;
105 }
106 
ClearMsg()107 void SingleVerDataMessageSchedule::ClearMsg()
108 {
109     StopTimer();
110     ClearMsgQueue();
111     ClearMsgMap();
112 }
113 
UpdateMsgMap()114 void SingleVerDataMessageSchedule::UpdateMsgMap()
115 {
116     std::queue<Message *> msgTmpQueue;
117     {
118         std::lock_guard<std::mutex> lock(queueLock_);
119         while (!msgQueue_.empty()) {
120             msgTmpQueue.push(msgQueue_.front());
121             msgQueue_.pop();
122         }
123         isNeedReload_ = false;
124     }
125     UpdateMsgMapInner(msgTmpQueue);
126 }
127 
UpdateMsgMapInner(std::queue<Message * > & msgTmpQueue)128 void SingleVerDataMessageSchedule::UpdateMsgMapInner(std::queue<Message *> &msgTmpQueue)
129 {
130     // update msg map
131     std::lock_guard<std::mutex> lock(lock_);
132     while (!msgTmpQueue.empty()) {
133         Message *msg = msgTmpQueue.front();
134         msgTmpQueue.pop();
135         // insert new msg into map and delete old msg
136         int errCode = UpdateMsgMapIfNeed(msg);
137         if (errCode != E_OK) {
138             delete msg;
139         }
140     }
141 }
142 
GetMsgFromMap(bool & isNeedHandle)143 Message *SingleVerDataMessageSchedule::GetMsgFromMap(bool &isNeedHandle)
144 {
145     isNeedHandle = true;
146     std::lock_guard<std::mutex> lock(lock_);
147     while (!messageMap_.empty()) {
148         auto iter = messageMap_.begin();
149         Message *msg = iter->second;
150         messageMap_.erase(iter);
151         const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
152         if (packet == nullptr) {
153             LOGE("[DataMsgSchedule] expected error");
154             delete msg;
155             continue;
156         }
157         uint32_t sequenceId = msg->GetSequenceId();
158         uint64_t packetId = packet->GetPacketId();
159         if (sequenceId < expectedSequenceId_) {
160             uint64_t revisePacketId = finishedPacketId_ - (expectedSequenceId_ - 1 - sequenceId);
161             LOGI("[DataMsgSchedule] drop msg because seqId less than exSeqId");
162             if (packetId < revisePacketId) {
163                 delete msg;
164                 continue;
165             }
166             // means already handle the msg, and just send E_OK ack in dataSync
167             isNeedHandle = false;
168             return msg;
169         }
170         if (sequenceId == expectedSequenceId_) {
171             if (packetId < finishedPacketId_) {
172                 LOGI("[DataMsgSchedule] drop msg because packetId less than finishedPacketId");
173                 delete msg;
174                 continue;
175             }
176             // if packetId == finishedPacketId_ need handle
177             // it will happened while watermark/need_abilitySync when last ack is missing
178             return msg;
179         }
180         // sequenceId > expectedSequenceId_, not need handle, put into map again
181         messageMap_[sequenceId] = msg;
182         return nullptr;
183     }
184     return nullptr;
185 }
186 
GetLastMsgFromQueue()187 Message *SingleVerDataMessageSchedule::GetLastMsgFromQueue()
188 {
189     std::lock_guard<std::mutex> lock(queueLock_);
190     isNeedReload_ = false;
191     while (!msgQueue_.empty()) {
192         Message *msg = msgQueue_.front();
193         msgQueue_.pop();
194         if (msgQueue_.empty()) { // means last msg
195             return msg;
196         }
197         delete msg;
198     }
199     return nullptr;
200 }
201 
ClearMsgMap()202 void SingleVerDataMessageSchedule::ClearMsgMap()
203 {
204     std::lock_guard<std::mutex> lock(lock_);
205     ClearMsgMapWithNoLock();
206 }
207 
ClearMsgMapWithNoLock()208 void SingleVerDataMessageSchedule::ClearMsgMapWithNoLock()
209 {
210     LOGD("[DataMsgSchedule] begin to ClearMsgMapWithNoLock");
211     for (auto &iter : messageMap_) {
212         delete iter.second;
213         iter.second = nullptr;
214     }
215     messageMap_.clear();
216 }
217 
ClearMsgQueue()218 void SingleVerDataMessageSchedule::ClearMsgQueue()
219 {
220     std::lock_guard<std::mutex> lock(queueLock_);
221     while (!msgQueue_.empty()) {
222         Message *msg = msgQueue_.front();
223         msgQueue_.pop();
224         delete msg;
225     }
226 }
227 
StartTimer(SingleVerSyncTaskContext * context)228 void SingleVerDataMessageSchedule::StartTimer(SingleVerSyncTaskContext *context)
229 {
230     std::lock_guard<std::mutex> lock(lock_);
231     TimerId timerId = 0;
232     RefObject::IncObjRef(context);
233     TimerAction timeOutCallback = std::bind(&SingleVerDataMessageSchedule::TimeOut, this, std::placeholders::_1);
234     int errCode = RuntimeContext::GetInstance()->SetTimer(IDLE_TIME_OUT, timeOutCallback,
235         [context]() {
236             int errCode = RuntimeContext::GetInstance()->ScheduleTask([context]() {
237                 RefObject::DecObjRef(context);
238             });
239             if (errCode != E_OK) {
240                 LOGE("[DataMsgSchedule] timer finalizer ScheduleTask,errCode=%d", errCode);
241             }
242         }, timerId);
243     if (errCode != E_OK) {
244         RefObject::DecObjRef(context);
245         LOGE("[DataMsgSchedule] timer ScheduleTask, errCode=%d", errCode);
246         return;
247     }
248     timerId_ = timerId;
249     LOGD("[DataMsgSchedule] StartTimer,TimerId=%" PRIu64, timerId_);
250 }
251 
StopTimer()252 void SingleVerDataMessageSchedule::StopTimer()
253 {
254     TimerId timerId;
255     {
256         std::lock_guard<std::mutex> lock(lock_);
257         LOGD("[DataMsgSchedule] StopTimer,remove TimerId=%" PRIu64, timerId_);
258         if (timerId_ == 0) {
259             return;
260         }
261         timerId = timerId_;
262         timerId_ = 0;
263     }
264     RuntimeContext::GetInstance()->RemoveTimer(timerId);
265 }
266 
ResetTimer(SingleVerSyncTaskContext * context)267 void SingleVerDataMessageSchedule::ResetTimer(SingleVerSyncTaskContext *context)
268 {
269     StopTimer();
270     StartTimer(context);
271 }
272 
TimeOut(TimerId timerId)273 int SingleVerDataMessageSchedule::TimeOut(TimerId timerId)
274 {
275     if (IsNeedReloadQueue()) {
276         LOGI("[DataMsgSchedule] new msg exists, no need to timeout handle");
277         return E_OK;
278     }
279     {
280         std::lock_guard<std::mutex> lock(workingLock_);
281         if (isWorking_) {
282             LOGI("[DataMsgSchedule] other thread is handle msg, no need to timeout handle");
283             return E_OK;
284         }
285     }
286     {
287         std::lock_guard<std::mutex> lock(lock_);
288         LOGI("[DataMsgSchedule] timeout handling, stop timerId_[%" PRIu64 "]", timerId);
289         if (timerId == timerId_) {
290             ClearMsgMapWithNoLock();
291             timerId_ = 0;
292         }
293     }
294     RuntimeContext::GetInstance()->RemoveTimer(timerId);
295     return E_OK;
296 }
297 
UpdateMsgMapIfNeed(Message * msg)298 int SingleVerDataMessageSchedule::UpdateMsgMapIfNeed(Message *msg)
299 {
300     if (msg == nullptr) {
301         return -E_INVALID_ARGS;
302     }
303     const DataRequestPacket *packet = msg->GetObject<DataRequestPacket>();
304     if (packet == nullptr) {
305         return -E_INVALID_ARGS;
306     }
307     uint32_t sessionId = msg->GetSessionId();
308     uint32_t sequenceId = msg->GetSequenceId();
309     uint64_t packetId = packet->GetPacketId();
310     if (prevSessionId_ != 0 && sessionId == prevSessionId_) {
311         LOGD("[DataMsgSchedule] recv prev sessionId msg, drop msg, label=%s, dev=%s", label_.c_str(),
312             STR_MASK(deviceId_));
313         return -E_INVALID_ARGS;
314     }
315     if (sessionId != currentSessionId_) {
316         // make sure all msg sessionId is same in msgMap
317         ClearMsgMapWithNoLock();
318         prevSessionId_ = currentSessionId_;
319         currentSessionId_ = sessionId;
320         finishedPacketId_ = 0;
321         expectedSequenceId_ = 1;
322     }
323     if (messageMap_.count(sequenceId) > 0) {
324         const auto *cachePacket = messageMap_[sequenceId]->GetObject<DataRequestPacket>();
325         if (cachePacket != nullptr) {
326             if (packetId != 0 && packetId < cachePacket->GetPacketId()) {
327                 LOGD("[DataMsgSchedule] drop msg packetId=%" PRIu64 ", cachePacketId=%" PRIu64 ", label=%s, dev=%s",
328                     packetId, cachePacket->GetPacketId(), label_.c_str(), STR_MASK(deviceId_));
329                 return -E_INVALID_ARGS;
330             }
331         }
332         delete messageMap_[sequenceId];
333         messageMap_[sequenceId] = nullptr;
334     }
335     messageMap_[sequenceId] = msg;
336     LOGD("[DataMsgSchedule] put into msgMap seqId=%" PRIu32 ", packetId=%" PRIu64 ", label=%s, dev=%s", sequenceId,
337         packetId, label_.c_str(), STR_MASK(deviceId_));
338     return E_OK;
339 }
340 }