• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_sync_state_machine.h"
18 
19 #include <cmath>
20 #include <climits>
21 #include <algorithm>
22 
23 #include "message_transform.h"
24 #include "log_print.h"
25 #include "sync_types.h"
26 #include "db_common.h"
27 #include "ref_object.h"
28 #include "performance_analysis.h"
29 
30 namespace DistributedDB {
31 namespace {
ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32 void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset)
33 {
34     for (MultiVerKvEntry *entry : entries) {
35         if (entry == nullptr) {
36             continue;
37         }
38         Timestamp timestamp;
39         entry->GetTimestamp(timestamp);
40         timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset);
41         entry->SetTimestamp(timestamp);
42     }
43 }
44 }
45 std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_;
MultiVerSyncStateMachine()46 MultiVerSyncStateMachine::MultiVerSyncStateMachine()
47     : context_(nullptr),
48       multiVerStorage_(nullptr),
49       timeSync_(nullptr),
50       commitHistorySync_(nullptr),
51       multiVerDataSync_(nullptr),
52       valueSliceSync_(nullptr)
53 {
54 }
55 
~MultiVerSyncStateMachine()56 MultiVerSyncStateMachine::~MultiVerSyncStateMachine()
57 {
58     Clear();
59 }
60 
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61 int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
62     std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
63 {
64     if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) {
65         return -E_INVALID_ARGS;
66     }
67     int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator);
68     if (errCode != E_OK) {
69         return errCode;
70     }
71 
72     timeSync_ = std::make_unique<TimeSync>();
73     commitHistorySync_ = std::make_unique<CommitHistorySync>();
74     multiVerDataSync_ = std::make_unique<MultiVerDataSync>();
75     valueSliceSync_ = std::make_unique<ValueSliceSync>();
76 
77     errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId());
78     if (errCode != E_OK) {
79         LOGE("timeSync_->Initialize failed err %d", errCode);
80         goto ERROR_OUT;
81     }
82     LOGD("timeSync_->Initialize OK");
83 
84     // init functions below will never fail
85     multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface);
86     commitHistorySync_->Initialize(multiVerStorage_, communicator);
87     multiVerDataSync_->Initialize(multiVerStorage_, communicator);
88     valueSliceSync_->Initialize(multiVerStorage_, communicator);
89 
90     context_ = static_cast<MultiVerSyncTaskContext *>(context);
91     currentState_ = IDLE;
92     (void)timeSync_->SyncStart();
93     return E_OK;
94 
95 ERROR_OUT:
96     Clear();
97     return errCode;
98 }
99 
SyncStep()100 void MultiVerSyncStateMachine::SyncStep()
101 {
102     RefObject::IncObjRef(context_);
103     RefObject::IncObjRef(communicator_);
104     int errCode = RuntimeContext::GetInstance()->ScheduleTask(
105         std::bind(&MultiVerSyncStateMachine::SyncStepInnerLocked, this));
106     if (errCode != E_OK) {
107         LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed");
108         RefObject::DecObjRef(communicator_);
109         RefObject::DecObjRef(context_);
110     }
111 }
112 
StepToIdle()113 void MultiVerSyncStateMachine::StepToIdle()
114 {
115     currentState_ = IDLE;
116     StopWatchDog();
117     context_->Clear();
118     PerformanceAnalysis::GetInstance()->TimeRecordEnd();
119     LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId()));
120 }
121 
MessageCallbackCheck(const Message * inMsg)122 int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg)
123 {
124     RefObject::AutoLock lock(context_);
125     if (context_->IsKilled()) {
126         return -E_OBJ_IS_KILLED;
127     }
128     if (!IsPacketValid(inMsg)) {
129         return -E_INVALID_ARGS;
130     }
131     if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
132         context_->IncSequenceId();
133         int errCode = ResetWatchDog();
134         if (errCode != E_OK) {
135             LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode);
136         }
137     }
138     return E_OK;
139 }
140 
ReceiveMessageCallback(Message * inMsg)141 int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
142 {
143     if (inMsg == nullptr) {
144         return -E_INVALID_ARGS;
145     }
146     if (inMsg->IsFeedbackError()) {
147         LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
148         return -static_cast<int>(inMsg->GetErrorNo());
149     }
150     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
151         return TimeSyncPacketRecvCallback(context_, inMsg);
152     }
153     std::lock_guard<std::mutex> lock(stateMachineLock_);
154     int errCode = MessageCallbackCheck(inMsg);
155     if (errCode != E_OK) {
156         return errCode;
157     }
158     switch (inMsg->GetMessageId()) {
159         case COMMIT_HISTORY_SYNC_MESSAGE:
160             errCode = CommitHistorySyncPktRecvCallback(context_, inMsg);
161             if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) {
162                 SyncResponseBegin(inMsg->GetSessionId());
163             }
164             break;
165         case MULTI_VER_DATA_SYNC_MESSAGE:
166             errCode = MultiVerDataPktRecvCallback(context_, inMsg);
167             break;
168         case VALUE_SLICE_SYNC_MESSAGE:
169             errCode = ValueSlicePktRecvCallback(context_, inMsg);
170             break;
171         default:
172             errCode = -E_NOT_SUPPORT;
173             break;
174     }
175     if (errCode == -E_LAST_SYNC_FRAME) {
176         SyncResponseEnd(inMsg->GetSessionId());
177         return errCode;
178     }
179     if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) {
180         Abort();
181     }
182     return errCode;
183 }
184 
StepToTimeout(TimerId timerId)185 void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId)
186 {
187     {
188         std::lock_guard<std::mutex> lock(stateMachineLock_);
189         TimerId timer = syncContext_->GetTimerId();
190         if (timer != timerId) {
191             return;
192         }
193         currentState_ = SYNC_TIME_OUT;
194     }
195     Abort();
196 }
197 
CommitHistorySyncStepInner(void)198 int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void)
199 {
200     int errCode = commitHistorySync_->SyncStart(context_);
201     if (errCode != E_OK) {
202         LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode);
203     }
204     return errCode;
205 }
206 
MultiVerDataSyncStepInner(void)207 int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void)
208 {
209     return multiVerDataSync_->SyncStart(context_);
210 }
211 
ValueSliceSyncStepInner(void)212 int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void)
213 {
214     return valueSliceSync_->SyncStart(context_);
215 }
216 
SyncStepInnerLocked()217 void MultiVerSyncStateMachine::SyncStepInnerLocked()
218 {
219     if (context_->IncUsedCount() != E_OK) {
220         goto SYNC_STEP_OUT;
221     }
222 
223     LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_);
224     int errCode;
225     {
226         std::lock_guard<std::mutex> lock(stateMachineLock_);
227         switch (currentState_) {
228             case COMMIT_HISTORY_SYNC:
229                 errCode = CommitHistorySyncStepInner();
230                 if (errCode != E_OK) {
231                     Abort();
232                 }
233                 break;
234             case MULTI_VER_DATA_ENTRY_SYNC:
235                 errCode = MultiVerDataSyncStepInner();
236                 if (errCode == -E_NOT_FOUND) {
237                     Finish();
238                     goto SYNC_STEP_SAFE_OUT;
239                 }
240                 break;
241             case MULTI_VER_VALUE_SLICE_SYNC:
242                 errCode = ValueSliceSyncStepInner();
243                 if (errCode == -E_NOT_FOUND) {
244                     int err = OneCommitSyncFinish();
245                     if (err != E_OK) {
246                         valueSliceSync_->SendFinishedRequest(context_);
247                         Abort();
248                         goto SYNC_STEP_SAFE_OUT;
249                     }
250                     currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
251                     SyncStep();
252                     goto SYNC_STEP_SAFE_OUT;
253                 }
254                 break;
255             default:
256                 break;
257         }
258     }
259 
260 SYNC_STEP_SAFE_OUT:
261     context_->SafeExit();
262 
263 SYNC_STEP_OUT:
264     RefObject::DecObjRef(communicator_);
265     RefObject::DecObjRef(context_);
266 }
267 
SyncStepInner()268 void MultiVerSyncStateMachine::SyncStepInner()
269 {
270 }
271 
StartSyncInner()272 int MultiVerSyncStateMachine::StartSyncInner()
273 {
274     LOGI("[MultiVerSyncStateMachine] StartSync");
275     currentState_ = COMMIT_HISTORY_SYNC;
276     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
277     if (performance != nullptr) {
278         performance->TimeRecordStart();
279     }
280     int errCode = StartWatchDog();
281     if (errCode != E_OK) {
282         LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode);
283         return errCode;
284     }
285     SyncStep();
286     return E_OK;
287 }
288 
AbortInner()289 void MultiVerSyncStateMachine::AbortInner()
290 {
291     context_->Clear();
292     StepToIdle();
293     ExecNextTask();
294 }
295 
GetStateSwitchTables() const296 const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const
297 {
298     return stateSwitchTables_;
299 }
300 
PrepareNextSyncTask()301 int MultiVerSyncStateMachine::PrepareNextSyncTask()
302 {
303     return StartSyncInner();
304 }
305 
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)306 void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
307 {
308     (void)sessionId;
309     (void)sequenceId;
310     (void)inMsgId;
311 }
312 
CommErrAbort(uint32_t sessionId)313 void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
314 {
315     std::lock_guard<std::mutex> lock(stateMachineLock_);
316     Abort();
317     RefObject::DecObjRef(context_);
318 }
319 
TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)320 int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg)
321 {
322     int errCode;
323     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
324         return -E_INVALID_ARGS;
325     }
326     switch (inMsg->GetMessageType()) {
327         case TYPE_REQUEST:
328             errCode = timeSync_->RequestRecv(inMsg);
329             return errCode;
330         case TYPE_RESPONSE:
331             errCode = timeSync_->AckRecv(inMsg);
332             if (errCode != E_OK) {
333                 LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode);
334             }
335             return errCode;
336         default:
337             return -E_INVALID_ARGS;
338     }
339 }
340 
CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)341 int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
342 {
343     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
344         return -E_INVALID_ARGS;
345     }
346     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
347     int errCode;
348     switch (inMsg->GetMessageType()) {
349         case TYPE_REQUEST:
350             if (performance != nullptr) {
351                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
352             }
353             return commitHistorySync_->RequestRecvCallback(context, inMsg);
354         case TYPE_RESPONSE:
355             if (performance != nullptr) {
356                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
357             }
358             errCode = commitHistorySync_->AckRecvCallback(context, inMsg);
359             if (errCode != E_OK) {
360                 return errCode;
361             }
362             currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
363             SyncStep();
364             return errCode;
365         default:
366             return -E_INVALID_ARGS;
367     }
368 }
369 
MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)370 int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
371 {
372     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
373         return -E_INVALID_ARGS;
374     }
375     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
376     int errCode;
377     switch (inMsg->GetMessageType()) {
378         case TYPE_REQUEST:
379             return multiVerDataSync_->RequestRecvCallback(context, inMsg);
380         case TYPE_RESPONSE:
381             if (performance != nullptr) {
382                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
383             }
384             errCode = multiVerDataSync_->AckRecvCallback(context, inMsg);
385             if (errCode != E_OK) {
386                 multiVerDataSync_->SendFinishedRequest(context);
387                 return errCode;
388             }
389             currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
390             SyncStep();
391             return errCode;
392         default:
393             return -E_INVALID_ARGS;
394     }
395 }
396 
ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)397 int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
398 {
399     if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
400         return -E_INVALID_ARGS;
401     }
402     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
403     int errCode;
404     switch (inMsg->GetMessageType()) {
405         case TYPE_REQUEST:
406             return valueSliceSync_->RequestRecvCallback(context, inMsg);
407         case TYPE_RESPONSE:
408             if (performance != nullptr) {
409                 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
410             }
411             errCode = valueSliceSync_->AckRecvCallback(context, inMsg);
412             if (errCode != E_OK) {
413                 valueSliceSync_->SendFinishedRequest(context);
414                 return errCode;
415             }
416             currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
417             SyncStep();
418             return errCode;
419         default:
420             return -E_INVALID_ARGS;
421     }
422 }
423 
Finish()424 void MultiVerSyncStateMachine::Finish()
425 {
426     MultiVerCommitNode commit;
427     std::vector<MultiVerCommitNode> commits;
428     int commitsSize = context_->GetCommitsSize();
429     if (commitsSize > 0) {
430         context_->GetCommit(commitsSize - 1, commit);
431         context_->GetCommits(commits);
432         LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId()));
433         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
434         if (performance != nullptr) {
435             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE);
436         }
437         int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits);
438         LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d",
439             STR_MASK(context_->GetDeviceId()), errCode);
440         if (performance != nullptr) {
441             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE);
442         }
443     }
444     RefObject::AutoLock lock(context_);
445     context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
446     StepToIdle();
447     ExecNextTask();
448 }
449 
OneCommitSyncFinish()450 int MultiVerSyncStateMachine::OneCommitSyncFinish()
451 {
452     MultiVerCommitNode commit;
453     std::vector<MultiVerKvEntry *> entries;
454     std::string deviceName;
455     TimeOffset outOffset = 0;
456     int errCode = E_OK;
457     int commitIndex = context_->GetCommitIndex();
458 
459     LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish  src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()),
460         commitIndex);
461     if (commitIndex > 0) {
462         context_->GetCommit(commitIndex - 1, commit);
463         deviceName = context_->GetDeviceId();
464         context_->GetEntries(entries);
465         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu",
466             STR_MASK(context_->GetDeviceId()), entries.size());
467         errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME);
468         if (errCode != E_OK) {
469             LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode);
470             return errCode;
471         }
472         Timestamp currentLocalTime = context_->GetCurrentLocalTime();
473         commit.timestamp -= static_cast<Timestamp>(outOffset);
474 
475         // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp
476         TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp -
477             static_cast<Timestamp>(currentLocalTime));
478         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64,
479             STR_MASK(context_->GetDeviceId()), timefixOffset);
480         commit.timestamp -= static_cast<Timestamp>(timefixOffset);
481         ChangeEntriesTimestamp(entries, outOffset, timefixOffset);
482         PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
483         if (performance != nullptr) {
484             performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
485         }
486         errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName);
487         LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d",
488             STR_MASK(context_->GetDeviceId()), errCode);
489         if (performance != nullptr) {
490             performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
491         }
492         if (errCode == E_OK) {
493             context_->ReleaseEntries();
494         }
495     }
496     DBCommon::PrintHexVector(commit.commitId, __LINE__);
497     return errCode;
498 }
499 
IsPacketValid(const Message * inMsg) const500 bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
501 {
502     if (inMsg == nullptr) {
503         return false;
504     }
505 
506     if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) ||
507         (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) {
508         LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId());
509         return false;
510     }
511     if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
512         return true;
513     }
514     if (inMsg->GetMessageType() == TYPE_RESPONSE) {
515         if ((inMsg->GetSequenceId() != context_->GetSequenceId()) ||
516             (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
517             LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d,"
518                 "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(),
519                 inMsg->GetSessionId(), context_->GetRequestSessionId());
520             return false;
521         }
522     }
523     return true;
524 }
525 
Clear()526 void MultiVerSyncStateMachine::Clear()
527 {
528     commitHistorySync_ = nullptr;
529     multiVerDataSync_ = nullptr;
530     timeSync_ = nullptr;
531     valueSliceSync_ = nullptr;
532     multiVerStorage_ = nullptr;
533     context_ = nullptr;
534 }
535 
SyncResponseBegin(uint32_t sessionId)536 void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId)
537 {
538     {
539         std::lock_guard<std::mutex> lock(responseInfosLock_);
540         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
541             return info.sessionId == sessionId;
542         });
543         if (iter != responseInfos_.end()) {
544             LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit.");
545             return;
546         }
547         TimerAction timeOutCallback =
548             std::bind(&MultiVerSyncStateMachine::SyncResponseTimeout, this, std::placeholders::_1);
549         // To make sure context_ alive in timeout callback, we should IncObjRef for the context_.
550         RefObject::IncObjRef(context_);
551         TimerId timerId = 0;
552         int errCode = RuntimeContext::GetInstance()->SetTimer(
553             RESPONSE_TIME_OUT, timeOutCallback,
554             [this]() {
555                 int ret = RuntimeContext::GetInstance()->ScheduleTask([this](){ RefObject::DecObjRef(context_); });
556                 if (ret != E_OK) {
557                     LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret);
558                 }
559             },
560             timerId);
561         if (errCode != E_OK) {
562             LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode);
563             RefObject::DecObjRef(context_);
564             return;
565         }
566         ResponseInfo info{sessionId, timerId};
567         responseInfos_.push_back(info);
568         LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin");
569     }
570     multiVerStorage_->NotifyStartSyncOperation();
571 }
572 
SyncResponseEnd(uint32_t sessionId)573 void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId)
574 {
575     {
576         std::lock_guard<std::mutex> lock(responseInfosLock_);
577         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
578             return info.sessionId == sessionId;
579         });
580         if (iter == responseInfos_.end()) {
581             LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId);
582             return;
583         }
584         RuntimeContext::GetInstance()->RemoveTimer(iter->timerId);
585         responseInfos_.erase(iter);
586         LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response");
587     }
588     multiVerStorage_->NotifyFinishSyncOperation();
589 }
590 
SyncResponseTimeout(TimerId timerId)591 int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId)
592 {
593     uint32_t sessionId;
594     {
595         std::lock_guard<std::mutex> lock(responseInfosLock_);
596         auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) {
597             return info.timerId == timerId;
598         });
599         if (iter == responseInfos_.end()) {
600             LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId);
601             return E_OK;
602         }
603         sessionId = iter->sessionId;
604     }
605     SyncResponseEnd(sessionId);
606     return E_OK;
607 }
608 
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)609 bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
610 {
611     (void) inMsg;
612     (void) query;
613     return false;
614 }
615 }
616 #endif