1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_sync_state_machine.h"
18
19 #include <cmath>
20 #include <climits>
21 #include <algorithm>
22
23 #include "message_transform.h"
24 #include "log_print.h"
25 #include "sync_types.h"
26 #include "db_common.h"
27 #include "ref_object.h"
28 #include "performance_analysis.h"
29
30 namespace DistributedDB {
31 namespace {
ChangeEntriesTimestamp(std::vector<MultiVerKvEntry * > & entries,TimeOffset outOffset,TimeOffset timefixOffset)32 void ChangeEntriesTimestamp(std::vector<MultiVerKvEntry *> &entries, TimeOffset outOffset, TimeOffset timefixOffset)
33 {
34 for (MultiVerKvEntry *entry : entries) {
35 if (entry == nullptr) {
36 continue;
37 }
38 Timestamp timestamp;
39 entry->GetTimestamp(timestamp);
40 timestamp = timestamp - static_cast<Timestamp>(outOffset + timefixOffset);
41 entry->SetTimestamp(timestamp);
42 }
43 }
44 }
45 std::vector<StateSwitchTable> MultiVerSyncStateMachine::stateSwitchTables_;
MultiVerSyncStateMachine()46 MultiVerSyncStateMachine::MultiVerSyncStateMachine()
47 : context_(nullptr),
48 multiVerStorage_(nullptr),
49 timeSync_(nullptr),
50 commitHistorySync_(nullptr),
51 multiVerDataSync_(nullptr),
52 valueSliceSync_(nullptr)
53 {
54 }
55
~MultiVerSyncStateMachine()56 MultiVerSyncStateMachine::~MultiVerSyncStateMachine()
57 {
58 Clear();
59 }
60
Initialize(ISyncTaskContext * context,ISyncInterface * syncInterface,const std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)61 int MultiVerSyncStateMachine::Initialize(ISyncTaskContext *context, ISyncInterface *syncInterface,
62 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
63 {
64 if (context == nullptr || syncInterface == nullptr || metadata == nullptr || communicator == nullptr) {
65 return -E_INVALID_ARGS;
66 }
67 int errCode = SyncStateMachine::Initialize(context, syncInterface, metadata, communicator);
68 if (errCode != E_OK) {
69 return errCode;
70 }
71
72 timeSync_ = std::make_unique<TimeSync>();
73 commitHistorySync_ = std::make_unique<CommitHistorySync>();
74 multiVerDataSync_ = std::make_unique<MultiVerDataSync>();
75 valueSliceSync_ = std::make_unique<ValueSliceSync>();
76
77 errCode = timeSync_->Initialize(communicator, metadata, syncInterface, context->GetDeviceId(),
78 context->GetTargetUserId());
79 if (errCode != E_OK) {
80 LOGE("timeSync_->Initialize failed err %d", errCode);
81 goto ERROR_OUT;
82 }
83 LOGD("timeSync_->Initialize OK");
84
85 // init functions below will never fail
86 multiVerStorage_ = static_cast<MultiVerKvDBSyncInterface *>(syncInterface);
87 commitHistorySync_->Initialize(multiVerStorage_, communicator);
88 multiVerDataSync_->Initialize(multiVerStorage_, communicator);
89 valueSliceSync_->Initialize(multiVerStorage_, communicator);
90
91 context_ = static_cast<MultiVerSyncTaskContext *>(context);
92 currentState_ = IDLE;
93 (void)timeSync_->SyncStart();
94 return E_OK;
95
96 ERROR_OUT:
97 Clear();
98 return errCode;
99 }
100
SyncStep()101 void MultiVerSyncStateMachine::SyncStep()
102 {
103 RefObject::IncObjRef(context_);
104 RefObject::IncObjRef(communicator_);
105 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] { SyncStepInnerLocked(); });
106 if (errCode != E_OK) {
107 LOGE("[MultiVerSyncStateMachine] Schedule SyncStep failed");
108 RefObject::DecObjRef(communicator_);
109 RefObject::DecObjRef(context_);
110 }
111 }
112
StepToIdle()113 void MultiVerSyncStateMachine::StepToIdle()
114 {
115 currentState_ = IDLE;
116 StopWatchDog();
117 context_->Clear();
118 PerformanceAnalysis::GetInstance()->TimeRecordEnd();
119 LOGD("[MultiVerSyncStateMachine][%s] step to idle", STR_MASK(context_->GetDeviceId()));
120 }
121
MessageCallbackCheck(const Message * inMsg)122 int MultiVerSyncStateMachine::MessageCallbackCheck(const Message *inMsg)
123 {
124 RefObject::AutoLock lock(context_);
125 if (context_->IsKilled()) {
126 return -E_OBJ_IS_KILLED;
127 }
128 if (!IsPacketValid(inMsg)) {
129 return -E_INVALID_ARGS;
130 }
131 if ((inMsg->GetMessageType() == TYPE_RESPONSE) && (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
132 context_->IncSequenceId();
133 int errCode = ResetWatchDog();
134 if (errCode != E_OK) {
135 LOGW("[MultiVerSyncStateMachine][MessageCallback] ResetWatchDog failed , err %d", errCode);
136 }
137 }
138 return E_OK;
139 }
140
ReceiveMessageCallback(Message * inMsg)141 int MultiVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg)
142 {
143 if (inMsg == nullptr) {
144 return -E_INVALID_ARGS;
145 }
146 if (inMsg->IsFeedbackError()) {
147 LOGE("[MultiVerSyncStateMachine] Feedback Message with errorNo=%u.", inMsg->GetErrorNo());
148 return -static_cast<int>(inMsg->GetErrorNo());
149 }
150 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
151 return TimeSyncPacketRecvCallback(context_, inMsg);
152 }
153 std::lock_guard<std::mutex> lock(stateMachineLock_);
154 int errCode = MessageCallbackCheck(inMsg);
155 if (errCode != E_OK) {
156 return errCode;
157 }
158 switch (inMsg->GetMessageId()) {
159 case COMMIT_HISTORY_SYNC_MESSAGE:
160 errCode = CommitHistorySyncPktRecvCallback(context_, inMsg);
161 if ((errCode != -E_NOT_FOUND) && (inMsg->GetMessageType() == TYPE_REQUEST) && (errCode != -E_NOT_PERMIT)) {
162 SyncResponseBegin(inMsg->GetSessionId());
163 }
164 break;
165 case MULTI_VER_DATA_SYNC_MESSAGE:
166 errCode = MultiVerDataPktRecvCallback(context_, inMsg);
167 break;
168 case VALUE_SLICE_SYNC_MESSAGE:
169 errCode = ValueSlicePktRecvCallback(context_, inMsg);
170 break;
171 default:
172 errCode = -E_NOT_SUPPORT;
173 break;
174 }
175 if (errCode == -E_LAST_SYNC_FRAME) {
176 SyncResponseEnd(inMsg->GetSessionId());
177 return errCode;
178 }
179 if (errCode != E_OK && inMsg->GetMessageType() == TYPE_RESPONSE) {
180 Abort();
181 }
182 return errCode;
183 }
184
StepToTimeout(TimerId timerId)185 void MultiVerSyncStateMachine::StepToTimeout(TimerId timerId)
186 {
187 {
188 std::lock_guard<std::mutex> lock(stateMachineLock_);
189 TimerId timer = syncContext_->GetTimerId();
190 if (timer != timerId) {
191 return;
192 }
193 currentState_ = SYNC_TIME_OUT;
194 }
195 Abort();
196 }
197
CommitHistorySyncStepInner(void)198 int MultiVerSyncStateMachine::CommitHistorySyncStepInner(void)
199 {
200 int errCode = commitHistorySync_->SyncStart(context_);
201 if (errCode != E_OK) {
202 LOGE("[MultiVerSyncStateMachine][CommitHistorySyncStep] failed, errCode %d", errCode);
203 }
204 return errCode;
205 }
206
MultiVerDataSyncStepInner(void)207 int MultiVerSyncStateMachine::MultiVerDataSyncStepInner(void)
208 {
209 return multiVerDataSync_->SyncStart(context_);
210 }
211
ValueSliceSyncStepInner(void)212 int MultiVerSyncStateMachine::ValueSliceSyncStepInner(void)
213 {
214 return valueSliceSync_->SyncStart(context_);
215 }
216
SyncStepInnerLocked()217 void MultiVerSyncStateMachine::SyncStepInnerLocked()
218 {
219 if (context_->IncUsedCount() != E_OK) {
220 goto SYNC_STEP_OUT;
221 }
222
223 LOGD("[MultiVerSyncStateMachine] SyncStep dst=%s, state = %d", STR_MASK(context_->GetDeviceId()), currentState_);
224 int errCode;
225 {
226 std::lock_guard<std::mutex> lock(stateMachineLock_);
227 switch (currentState_) {
228 case COMMIT_HISTORY_SYNC:
229 errCode = CommitHistorySyncStepInner();
230 if (errCode != E_OK) {
231 Abort();
232 }
233 break;
234 case MULTI_VER_DATA_ENTRY_SYNC:
235 errCode = MultiVerDataSyncStepInner();
236 if (errCode == -E_NOT_FOUND) {
237 Finish();
238 goto SYNC_STEP_SAFE_OUT;
239 }
240 break;
241 case MULTI_VER_VALUE_SLICE_SYNC:
242 errCode = ValueSliceSyncStepInner();
243 if (errCode == -E_NOT_FOUND) {
244 int err = OneCommitSyncFinish();
245 if (err != E_OK) {
246 valueSliceSync_->SendFinishedRequest(context_);
247 Abort();
248 goto SYNC_STEP_SAFE_OUT;
249 }
250 currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
251 SyncStep();
252 goto SYNC_STEP_SAFE_OUT;
253 }
254 break;
255 default:
256 break;
257 }
258 }
259
260 SYNC_STEP_SAFE_OUT:
261 context_->SafeExit();
262
263 SYNC_STEP_OUT:
264 RefObject::DecObjRef(communicator_);
265 RefObject::DecObjRef(context_);
266 }
267
SyncStepInner()268 void MultiVerSyncStateMachine::SyncStepInner()
269 {
270 }
271
StartSyncInner()272 int MultiVerSyncStateMachine::StartSyncInner()
273 {
274 LOGI("[MultiVerSyncStateMachine] StartSync");
275 currentState_ = COMMIT_HISTORY_SYNC;
276 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
277 if (performance != nullptr) {
278 performance->TimeRecordStart();
279 }
280 int errCode = StartWatchDog();
281 if (errCode != E_OK) {
282 LOGE("[MultiVerSyncStateMachine][StartSync] WatchDog start failed! err:%d", errCode);
283 return errCode;
284 }
285 SyncStep();
286 return E_OK;
287 }
288
AbortInner()289 void MultiVerSyncStateMachine::AbortInner()
290 {
291 context_->Clear();
292 StepToIdle();
293 ExecNextTask();
294 }
295
GetStateSwitchTables() const296 const std::vector<StateSwitchTable> &MultiVerSyncStateMachine::GetStateSwitchTables() const
297 {
298 return stateSwitchTables_;
299 }
300
PrepareNextSyncTask()301 int MultiVerSyncStateMachine::PrepareNextSyncTask()
302 {
303 return StartSyncInner();
304 }
305
SendNotifyPacket(uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)306 void MultiVerSyncStateMachine::SendNotifyPacket(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
307 {
308 (void)sessionId;
309 (void)sequenceId;
310 (void)inMsgId;
311 }
312
CommErrAbort(uint32_t sessionId)313 void MultiVerSyncStateMachine::CommErrAbort(uint32_t sessionId)
314 {
315 (void)sessionId;
316 std::lock_guard<std::mutex> lock(stateMachineLock_);
317 Abort();
318 RefObject::DecObjRef(context_);
319 }
320
TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext * context,const Message * inMsg)321 int MultiVerSyncStateMachine::TimeSyncPacketRecvCallback(const MultiVerSyncTaskContext *context, const Message *inMsg)
322 {
323 int errCode;
324 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != TIME_SYNC_MESSAGE)) {
325 return -E_INVALID_ARGS;
326 }
327 switch (inMsg->GetMessageType()) {
328 case TYPE_REQUEST:
329 errCode = timeSync_->RequestRecv(inMsg);
330 return errCode;
331 case TYPE_RESPONSE:
332 errCode = timeSync_->AckRecv(inMsg);
333 if (errCode != E_OK) {
334 LOGE("[MultiVerSyncStateMachine] TimeSyncPacketRecvCallback AckRecv failed err %d", errCode);
335 }
336 return errCode;
337 default:
338 return -E_INVALID_ARGS;
339 }
340 }
341
CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)342 int MultiVerSyncStateMachine::CommitHistorySyncPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
343 {
344 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != COMMIT_HISTORY_SYNC_MESSAGE)) {
345 return -E_INVALID_ARGS;
346 }
347 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
348 int errCode;
349 switch (inMsg->GetMessageType()) {
350 case TYPE_REQUEST:
351 if (performance != nullptr) {
352 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_SEND_LOCAL_DATA_CHANGED_TO_COMMIT_REQUEST_RECV);
353 }
354 return commitHistorySync_->RequestRecvCallback(context, inMsg);
355 case TYPE_RESPONSE:
356 if (performance != nullptr) {
357 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_COMMIT_SEND_REQUEST_TO_ACK_RECV);
358 }
359 errCode = commitHistorySync_->AckRecvCallback(context, inMsg);
360 if (errCode != E_OK) {
361 return errCode;
362 }
363 currentState_ = MULTI_VER_DATA_ENTRY_SYNC;
364 SyncStep();
365 return errCode;
366 default:
367 return -E_INVALID_ARGS;
368 }
369 }
370
MultiVerDataPktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)371 int MultiVerSyncStateMachine::MultiVerDataPktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
372 {
373 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
374 return -E_INVALID_ARGS;
375 }
376 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
377 int errCode;
378 switch (inMsg->GetMessageType()) {
379 case TYPE_REQUEST:
380 return multiVerDataSync_->RequestRecvCallback(context, inMsg);
381 case TYPE_RESPONSE:
382 if (performance != nullptr) {
383 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
384 }
385 errCode = multiVerDataSync_->AckRecvCallback(context, inMsg);
386 if (errCode != E_OK) {
387 multiVerDataSync_->SendFinishedRequest(context);
388 return errCode;
389 }
390 currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
391 SyncStep();
392 return errCode;
393 default:
394 return -E_INVALID_ARGS;
395 }
396 }
397
ValueSlicePktRecvCallback(MultiVerSyncTaskContext * context,const Message * inMsg)398 int MultiVerSyncStateMachine::ValueSlicePktRecvCallback(MultiVerSyncTaskContext *context, const Message *inMsg)
399 {
400 if ((context == nullptr) || (inMsg == nullptr) || (inMsg->GetMessageId() != VALUE_SLICE_SYNC_MESSAGE)) {
401 return -E_INVALID_ARGS;
402 }
403 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
404 int errCode;
405 switch (inMsg->GetMessageType()) {
406 case TYPE_REQUEST:
407 return valueSliceSync_->RequestRecvCallback(context, inMsg);
408 case TYPE_RESPONSE:
409 if (performance != nullptr) {
410 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_VALUE_SLICE_SEND_REQUEST_TO_ACK_RECV);
411 }
412 errCode = valueSliceSync_->AckRecvCallback(context, inMsg);
413 if (errCode != E_OK) {
414 valueSliceSync_->SendFinishedRequest(context);
415 return errCode;
416 }
417 currentState_ = MULTI_VER_VALUE_SLICE_SYNC;
418 SyncStep();
419 return errCode;
420 default:
421 return -E_INVALID_ARGS;
422 }
423 }
424
Finish()425 void MultiVerSyncStateMachine::Finish()
426 {
427 MultiVerCommitNode commit;
428 int commitsSize = context_->GetCommitsSize();
429 if (commitsSize > 0) {
430 context_->GetCommit(commitsSize - 1, commit);
431 std::vector<MultiVerCommitNode> commits;
432 context_->GetCommits(commits);
433 LOGD("MultiVerSyncStateMachine::Finish merge src=%s", STR_MASK(context_->GetDeviceId()));
434 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
435 if (performance != nullptr) {
436 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_MERGE);
437 }
438 int errCode = multiVerDataSync_->MergeSyncCommit(commit, commits);
439 LOGD("MultiVerSyncStateMachine::Finish merge src=%s, MergeSyncCommit errCode:%d",
440 STR_MASK(context_->GetDeviceId()), errCode);
441 if (performance != nullptr) {
442 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_MERGE);
443 }
444 }
445 RefObject::AutoLock lock(context_);
446 context_->SetOperationStatus(SyncOperation::OP_FINISHED_ALL);
447 StepToIdle();
448 ExecNextTask();
449 }
450
OneCommitSyncFinish()451 int MultiVerSyncStateMachine::OneCommitSyncFinish()
452 {
453 MultiVerCommitNode commit;
454 TimeOffset outOffset = 0;
455 int errCode = E_OK;
456 int commitIndex = context_->GetCommitIndex();
457
458 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, commitIndex = %d,", STR_MASK(context_->GetDeviceId()),
459 commitIndex);
460 if (commitIndex > 0) {
461 context_->GetCommit(commitIndex - 1, commit);
462 std::string deviceName = context_->GetDeviceId();
463 std::vector<MultiVerKvEntry *> entries;
464 context_->GetEntries(entries);
465 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, entries size = %lu",
466 STR_MASK(context_->GetDeviceId()), entries.size());
467 errCode = timeSync_->GetTimeOffset(outOffset, TIME_SYNC_WAIT_TIME);
468 if (errCode != E_OK) {
469 LOGI("MultiVerSyncStateMachine::OneCommitSyncFinish GetTimeOffset fail errCode:%d", errCode);
470 return errCode;
471 }
472 Timestamp currentLocalTime = context_->GetCurrentLocalTime();
473 commit.timestamp -= static_cast<Timestamp>(outOffset);
474
475 // Due to time sync error, commit timestamp may bigger than currentLocalTime, we need to fix the timestamp
476 TimeOffset timefixOffset = (commit.timestamp < currentLocalTime) ? 0 : (commit.timestamp -
477 static_cast<Timestamp>(currentLocalTime));
478 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish src=%s, timefixOffset = %" PRId64,
479 STR_MASK(context_->GetDeviceId()), timefixOffset);
480 commit.timestamp -= static_cast<Timestamp>(timefixOffset);
481 ChangeEntriesTimestamp(entries, outOffset, timefixOffset);
482 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
483 if (performance != nullptr) {
484 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
485 }
486 errCode = multiVerDataSync_->PutCommitData(commit, entries, deviceName);
487 LOGD("MultiVerSyncStateMachine::OneCommitSyncFinish PutCommitData src=%s, errCode = %d",
488 STR_MASK(context_->GetDeviceId()), errCode);
489 if (performance != nullptr) {
490 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_PUT_COMMIT_DATA);
491 }
492 if (errCode == E_OK) {
493 context_->ReleaseEntries();
494 }
495 }
496 DBCommon::PrintHexVector(commit.commitId, __LINE__);
497 return errCode;
498 }
499
IsPacketValid(const Message * inMsg) const500 bool MultiVerSyncStateMachine::IsPacketValid(const Message *inMsg) const
501 {
502 if (inMsg == nullptr) {
503 return false;
504 }
505
506 if ((inMsg->GetMessageId() < TIME_SYNC_MESSAGE) || (inMsg->GetMessageId() > VALUE_SLICE_SYNC_MESSAGE) ||
507 (inMsg->GetMessageId() == DATA_SYNC_MESSAGE)) {
508 LOGE("[MultiVerSyncStateMachine] Message is invalid, id = %d", inMsg->GetMessageId());
509 return false;
510 }
511 if (inMsg->GetMessageId() == TIME_SYNC_MESSAGE) {
512 return true;
513 }
514 if (inMsg->GetMessageType() == TYPE_RESPONSE) {
515 if ((inMsg->GetSequenceId() != context_->GetSequenceId()) ||
516 (inMsg->GetSessionId() != context_->GetRequestSessionId())) {
517 LOGE("[MultiVerSyncStateMachine] Message is invalid, inMsg SequenceId = %d, context seq = %d,"
518 "msg session id = %d, context session = %d", inMsg->GetSequenceId(), context_->GetSequenceId(),
519 inMsg->GetSessionId(), context_->GetRequestSessionId());
520 return false;
521 }
522 }
523 return true;
524 }
525
Clear()526 void MultiVerSyncStateMachine::Clear()
527 {
528 commitHistorySync_ = nullptr;
529 multiVerDataSync_ = nullptr;
530 timeSync_ = nullptr;
531 valueSliceSync_ = nullptr;
532 multiVerStorage_ = nullptr;
533 context_ = nullptr;
534 }
535
SyncResponseBegin(uint32_t sessionId)536 void MultiVerSyncStateMachine::SyncResponseBegin(uint32_t sessionId)
537 {
538 {
539 std::lock_guard<std::mutex> lock(responseInfosLock_);
540 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
541 return info.sessionId == sessionId;
542 });
543 if (iter != responseInfos_.end()) {
544 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] sessionId existed! exit.");
545 return;
546 }
547 TimerAction timeOutCallback = [this](TimerId timerId) -> int { return SyncResponseTimeout(timerId); };
548 // To make sure context_ alive in timeout callback, we should IncObjRef for the context_.
549 RefObject::IncObjRef(context_);
550 TimerId timerId = 0;
551 int errCode = RuntimeContext::GetInstance()->SetTimer(
552 RESPONSE_TIME_OUT, timeOutCallback,
553 [this]() {
554 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(context_); });
555 if (ret != E_OK) {
556 LOGE("[MultiVerSyncStateMachine][SyncResponseEnd] timer finalizer ScheduleTask, errCode %d", ret);
557 }
558 },
559 timerId);
560 if (errCode != E_OK) {
561 LOGE("[MultiVerSyncStateMachine][ResponseSessionBegin] SetTimer failed err %d", errCode);
562 RefObject::DecObjRef(context_);
563 return;
564 }
565 ResponseInfo info{sessionId, timerId};
566 responseInfos_.push_back(info);
567 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] begin");
568 }
569 multiVerStorage_->NotifyStartSyncOperation();
570 }
571
SyncResponseEnd(uint32_t sessionId)572 void MultiVerSyncStateMachine::SyncResponseEnd(uint32_t sessionId)
573 {
574 {
575 std::lock_guard<std::mutex> lock(responseInfosLock_);
576 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [sessionId](const ResponseInfo &info) {
577 return info.sessionId == sessionId;
578 });
579 if (iter == responseInfos_.end()) {
580 LOGW("[MultiVerSyncStateMachine][SyncResponseEnd] Can't find sync response %d", sessionId);
581 return;
582 }
583 RuntimeContext::GetInstance()->RemoveTimer(iter->timerId);
584 responseInfos_.erase(iter);
585 LOGI("[MultiVerSyncStateMachine][SyncResponseBegin] end response");
586 }
587 multiVerStorage_->NotifyFinishSyncOperation();
588 }
589
SyncResponseTimeout(TimerId timerId)590 int MultiVerSyncStateMachine::SyncResponseTimeout(TimerId timerId)
591 {
592 uint32_t sessionId;
593 {
594 std::lock_guard<std::mutex> lock(responseInfosLock_);
595 auto iter = std::find_if(responseInfos_.begin(), responseInfos_.end(), [timerId](const ResponseInfo &info) {
596 return info.timerId == timerId;
597 });
598 if (iter == responseInfos_.end()) {
599 LOGW("[MultiVerSyncStateMachine][SyncResponseTimeout] Can't find sync response timerId %" PRIu64, timerId);
600 return E_OK;
601 }
602 sessionId = iter->sessionId;
603 }
604 SyncResponseEnd(sessionId);
605 return E_OK;
606 }
607
IsNeedTriggerQueryAutoSync(Message * inMsg,QuerySyncObject & query)608 bool MultiVerSyncStateMachine::IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query)
609 {
610 (void) inMsg;
611 (void) query;
612 return false;
613 }
614 }
615 #endif