1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "time_sync.h"
17
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24
25 namespace DistributedDB {
26 std::mutex TimeSync::timeSyncSetLock_;
27 std::set<TimeSync *> TimeSync::timeSyncSet_;
28 namespace {
29 constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
30 constexpr int TRIP_DIV_HALF = 2;
31 constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1s for 100ns
32 }
33
34 // Class TimeSyncPacket
TimeSyncPacket()35 TimeSyncPacket::TimeSyncPacket()
36 : sourceTimeBegin_(0),
37 sourceTimeEnd_(0),
38 targetTimeBegin_(0),
39 targetTimeEnd_(0),
40 version_(TIME_SYNC_VERSION_V1)
41 {
42 }
43
~TimeSyncPacket()44 TimeSyncPacket::~TimeSyncPacket()
45 {
46 }
47
SetSourceTimeBegin(Timestamp sourceTimeBegin)48 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
49 {
50 sourceTimeBegin_ = sourceTimeBegin;
51 }
52
GetSourceTimeBegin() const53 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
54 {
55 return sourceTimeBegin_;
56 }
57
SetSourceTimeEnd(Timestamp sourceTimeEnd)58 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
59 {
60 sourceTimeEnd_ = sourceTimeEnd;
61 }
62
GetSourceTimeEnd() const63 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
64 {
65 return sourceTimeEnd_;
66 }
67
SetTargetTimeBegin(Timestamp targetTimeBegin)68 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
69 {
70 targetTimeBegin_ = targetTimeBegin;
71 }
72
GetTargetTimeBegin() const73 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
74 {
75 return targetTimeBegin_;
76 }
77
SetTargetTimeEnd(Timestamp targetTimeEnd)78 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
79 {
80 targetTimeEnd_ = targetTimeEnd;
81 }
82
GetTargetTimeEnd() const83 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
84 {
85 return targetTimeEnd_;
86 }
87
SetVersion(uint32_t version)88 void TimeSyncPacket::SetVersion(uint32_t version)
89 {
90 version_ = version;
91 }
92
GetVersion() const93 uint32_t TimeSyncPacket::GetVersion() const
94 {
95 return version_;
96 }
97
CalculateLen()98 uint32_t TimeSyncPacket::CalculateLen()
99 {
100 uint32_t len = Parcel::GetUInt32Len();
101 len += Parcel::GetUInt64Len();
102 len += Parcel::GetUInt64Len();
103 len += Parcel::GetUInt64Len();
104 len += Parcel::GetUInt64Len();
105 len = Parcel::GetEightByteAlign(len);
106 return len;
107 }
108
109 // Class TimeSync
TimeSync()110 TimeSync::TimeSync()
111 : communicateHandle_(nullptr),
112 metadata_(nullptr),
113 timeHelper_(nullptr),
114 retryTime_(0),
115 driverTimerId_(0),
116 isSynced_(false),
117 isAckReceived_(false),
118 timeChangedListener_(nullptr),
119 timeDriverLockCount_(0),
120 isOnline_(true),
121 closed_(false)
122 {
123 }
124
~TimeSync()125 TimeSync::~TimeSync()
126 {
127 Finalize();
128 driverTimerId_ = 0;
129
130 if (timeChangedListener_ != nullptr) {
131 timeChangedListener_->Drop(true);
132 timeChangedListener_ = nullptr;
133 }
134 timeHelper_ = nullptr;
135 communicateHandle_ = nullptr;
136 metadata_ = nullptr;
137
138 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
139 timeSyncSet_.erase(this);
140 }
141
RegisterTransformFunc()142 int TimeSync::RegisterTransformFunc()
143 {
144 TransformFunc func;
145 func.computeFunc = std::bind(&TimeSync::CalculateLen, std::placeholders::_1);
146 func.serializeFunc = std::bind(&TimeSync::Serialization, std::placeholders::_1,
147 std::placeholders::_2, std::placeholders::_3);
148 func.deserializeFunc = std::bind(&TimeSync::DeSerialization, std::placeholders::_1,
149 std::placeholders::_2, std::placeholders::_3);
150 return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
151 }
152
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)153 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
154 const ISyncInterface *storage, const DeviceID &deviceId)
155 {
156 if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
157 return -E_INVALID_ARGS;
158 }
159 {
160 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
161 timeSyncSet_.insert(this);
162 }
163 communicateHandle_ = communicator;
164 metadata_ = metadata;
165 deviceId_ = deviceId;
166 timeHelper_ = std::make_unique<TimeHelper>();
167
168 int errCode = timeHelper_->Initialize(storage, metadata_);
169 if (errCode != E_OK) {
170 timeHelper_ = nullptr;
171 LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
172 return errCode;
173 }
174
175 driverCallback_ = std::bind(&TimeSync::TimeSyncDriver, this, std::placeholders::_1);
176 errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
177 if (errCode != E_OK) {
178 return errCode;
179 }
180 return errCode;
181 }
182
Finalize()183 void TimeSync::Finalize()
184 {
185 // Stop the timer
186 LOGD("[TimeSync] Finalize enter!");
187 RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
188 TimerId timerId;
189 {
190 std::unique_lock<std::mutex> lock(timeDriverLock_);
191 timerId = driverTimerId_;
192 }
193 runtimeContext->RemoveTimer(timerId, true);
194 std::unique_lock<std::mutex> lock(timeDriverLock_);
195 timeDriverCond_.wait(lock, [this](){ return this->timeDriverLockCount_ == 0; });
196 LOGD("[TimeSync] Finalized!");
197 }
198
SyncStart(const CommErrHandler & handler,uint32_t sessionId)199 int TimeSync::SyncStart(const CommErrHandler &handler, uint32_t sessionId)
200 {
201 isOnline_ = true;
202 TimeSyncPacket packet;
203 Timestamp startTime = timeHelper_->GetTime();
204 packet.SetSourceTimeBegin(startTime);
205 // send timeSync request
206 LOGD("[TimeSync] startTime = %" PRIu64 ", dev = %s{private}", startTime, deviceId_.c_str());
207
208 Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
209 if (message == nullptr) {
210 return -E_OUT_OF_MEMORY;
211 }
212 message->SetSessionId(sessionId);
213 message->SetMessageType(TYPE_REQUEST);
214 message->SetPriority(Priority::NORMAL);
215 int errCode = message->SetCopiedObject<>(packet);
216 if (errCode != E_OK) {
217 delete message;
218 message = nullptr;
219 return errCode;
220 }
221
222 errCode = SendPacket(deviceId_, message, handler);
223 if (errCode != E_OK) {
224 delete message;
225 message = nullptr;
226 }
227 return errCode;
228 }
229
CalculateLen(const Message * inMsg)230 uint32_t TimeSync::CalculateLen(const Message *inMsg)
231 {
232 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
233 return 0;
234 }
235
236 const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
237 if (packet == nullptr) {
238 return 0;
239 }
240
241 return TimeSyncPacket::CalculateLen();
242 }
243
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)244 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
245 {
246 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
247 return -E_INVALID_ARGS;
248 }
249 const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
250 if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
251 return -E_INVALID_ARGS;
252 }
253
254 Parcel parcel(buffer, length);
255 Timestamp srcBegin = packet->GetSourceTimeBegin();
256 Timestamp srcEnd = packet->GetSourceTimeEnd();
257 Timestamp targetBegin = packet->GetTargetTimeBegin();
258 Timestamp targetEnd = packet->GetTargetTimeEnd();
259
260 int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
261 if (errCode != E_OK) {
262 return -E_SECUREC_ERROR;
263 }
264 errCode = parcel.WriteUInt64(srcBegin);
265 if (errCode != E_OK) {
266 return -E_SECUREC_ERROR;
267 }
268 errCode = parcel.WriteUInt64(srcEnd);
269 if (errCode != E_OK) {
270 return -E_SECUREC_ERROR;
271 }
272 errCode = parcel.WriteUInt64(targetBegin);
273 if (errCode != E_OK) {
274 return -E_SECUREC_ERROR;
275 }
276 errCode = parcel.WriteUInt64(targetEnd);
277 if (errCode != E_OK) {
278 return -E_SECUREC_ERROR;
279 }
280 parcel.EightByteAlign();
281 if (parcel.IsError()) {
282 return -E_PARSE_FAIL;
283 }
284 return errCode;
285 }
286
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)287 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
288 {
289 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
290 return -E_INVALID_ARGS;
291 }
292 TimeSyncPacket packet;
293 Parcel parcel(const_cast<uint8_t *>(buffer), length);
294 Timestamp srcBegin;
295 Timestamp srcEnd;
296 Timestamp targetBegin;
297 Timestamp targetEnd;
298
299 uint32_t version = 0;
300 parcel.ReadUInt32(version);
301 if (parcel.IsError()) {
302 return -E_INVALID_ARGS;
303 }
304 if (version > TIME_SYNC_VERSION_V1) {
305 packet.SetVersion(version);
306 return inMsg->SetCopiedObject<>(packet);
307 }
308 parcel.ReadUInt64(srcBegin);
309 parcel.ReadUInt64(srcEnd);
310 parcel.ReadUInt64(targetBegin);
311 parcel.ReadUInt64(targetEnd);
312 if (parcel.IsError()) {
313 return -E_INVALID_ARGS;
314 }
315 packet.SetSourceTimeBegin(srcBegin);
316 packet.SetSourceTimeEnd(srcEnd);
317 packet.SetTargetTimeBegin(targetBegin);
318 packet.SetTargetTimeEnd(targetEnd);
319
320 return inMsg->SetCopiedObject<>(packet);
321 }
322
AckRecv(const Message * message,uint32_t targetSessionId)323 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
324 {
325 // only check when sessionId is not 0, because old version timesync sessionId is 0.
326 if (message != nullptr && message->GetSessionId() != 0 &&
327 message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
328 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
329 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
330 }
331 if (!IsPacketValid(message, TYPE_RESPONSE)) {
332 return -E_INVALID_ARGS;
333 }
334 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
335 if (packet == nullptr) {
336 LOGE("[TimeSync] AckRecv packet is null");
337 return -E_INVALID_ARGS;
338 }
339
340 TimeSyncPacket packetData = TimeSyncPacket(*packet);
341 Timestamp sourceTimeEnd = timeHelper_->GetTime();
342 packetData.SetSourceTimeEnd(sourceTimeEnd);
343 if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
344 packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
345 packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
346 packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
347 LOGD("[TimeSync][AckRecv] Time valid check failed.");
348 return -E_INVALID_TIME;
349 }
350 // calculate timeoffset of two devices
351 TimeOffset offset = CalculateTimeOffset(packetData);
352 LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
353 ", tBegin = %" PRIu64 ", offset = %" PRId64,
354 deviceId_.c_str(),
355 packetData.GetSourceTimeEnd(),
356 packetData.GetTargetTimeEnd(),
357 packetData.GetSourceTimeBegin(),
358 packetData.GetTargetTimeBegin(),
359 offset);
360
361 // save timeoffset into metadata, maybe a block action
362 int errCode = SaveTimeOffset(deviceId_, offset);
363 isSynced_ = true;
364 {
365 std::lock_guard<std::mutex> lock(cvLock_);
366 isAckReceived_ = true;
367 }
368 conditionVar_.notify_all();
369 ResetTimer();
370 return errCode;
371 }
372
RequestRecv(const Message * message)373 int TimeSync::RequestRecv(const Message *message)
374 {
375 if (!IsPacketValid(message, TYPE_REQUEST)) {
376 return -E_INVALID_ARGS;
377 }
378 Timestamp targetTimeBegin = timeHelper_->GetTime();
379
380 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
381 if (packet == nullptr) {
382 return -E_INVALID_ARGS;
383 }
384
385 // build timeSync ack packet
386 TimeSyncPacket ackPacket = TimeSyncPacket(*packet);
387 ackPacket.SetTargetTimeBegin(targetTimeBegin);
388 Timestamp targetTimeEnd = timeHelper_->GetTime();
389 ackPacket.SetTargetTimeEnd(targetTimeEnd);
390 LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
391 ", tbegin = %" PRIu64, deviceId_.c_str(), ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(),
392 ackPacket.GetSourceTimeBegin(), ackPacket.GetTargetTimeBegin());
393 if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
394 LOGD("[TimeSync][RequestRecv] Time valid check failed.");
395 return -E_INVALID_TIME;
396 }
397
398 TimeOffset timeoffsetIgnoreRtt = static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - targetTimeBegin);
399 TimeOffset metadataTimeoffset;
400 metadata_->GetTimeOffset(deviceId_, metadataTimeoffset);
401
402 // 2 is half of INT64_MAX
403 if ((std::abs(metadataTimeoffset) >= INT64_MAX / 2) || (std::abs(timeoffsetIgnoreRtt) >= INT64_MAX / 2) ||
404 (std::abs(metadataTimeoffset - timeoffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE)) {
405 LOGI("[TimeSync][RequestRecv] timeoffSet invalid, should do time sync");
406 isSynced_ = false;
407 }
408
409 Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
410 if (ackMessage == nullptr) {
411 return -E_OUT_OF_MEMORY;
412 }
413 ackMessage->SetSessionId(message->GetSessionId());
414 ackMessage->SetPriority(Priority::NORMAL);
415 ackMessage->SetMessageType(TYPE_RESPONSE);
416 ackMessage->SetTarget(deviceId_);
417 int errCode = ackMessage->SetCopiedObject<>(ackPacket);
418 if (errCode != E_OK) {
419 delete ackMessage;
420 ackMessage = nullptr;
421 return errCode;
422 }
423
424 errCode = SendPacket(deviceId_, ackMessage);
425 if (errCode != E_OK) {
426 delete ackMessage;
427 ackMessage = nullptr;
428 }
429 return errCode;
430 }
431
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)432 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
433 {
434 return metadata_->SaveTimeOffset(deviceID, timeOffset);
435 }
436
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)437 TimeOffset TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
438 {
439 TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
440 timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
441 TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
442 timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
443 TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
444 timeSyncInfo.GetSourceTimeEnd());
445 TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
446 LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
447 ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
448 return offset;
449 }
450
IsPacketValid(const Message * inMsg,uint16_t messageType)451 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
452 {
453 if (inMsg == nullptr) {
454 return false;
455 }
456 if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
457 LOGD("message Id = %d", inMsg->GetMessageId());
458 return false;
459 }
460 if (messageType != inMsg->GetMessageType()) {
461 LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
462 return false;
463 }
464 return true;
465 }
466
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)467 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
468 {
469 SendConfig conf;
470 timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
471 int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
472 if (errCode != E_OK) {
473 LOGE("[TimeSync] SendPacket failed, err %d", errCode);
474 }
475 return errCode;
476 }
477
TimeSyncDriver(TimerId timerId)478 int TimeSync::TimeSyncDriver(TimerId timerId)
479 {
480 if (timerId != driverTimerId_) {
481 return -E_INTERNAL_ERROR;
482 }
483 if (!isOnline_) {
484 return E_OK;
485 }
486 std::lock_guard<std::mutex> lock(timeDriverLock_);
487 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
488 CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
489 (void)this->SyncStart(handler);
490 std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
491 this->timeDriverLockCount_--;
492 this->timeDriverCond_.notify_all();
493 });
494 if (errCode != E_OK) {
495 LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
496 return errCode;
497 }
498 timeDriverLockCount_++;
499 return E_OK;
500 }
501
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)502 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
503 {
504 if (!isSynced_) {
505 {
506 std::lock_guard<std::mutex> lock(cvLock_);
507 isAckReceived_ = false;
508 }
509 CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
510 int errCode = SyncStart(handler, sessionId);
511 LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
512 TimeHelper::GetSysCurrentTime(), errCode, timeout);
513 std::unique_lock<std::mutex> lock(cvLock_);
514 if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout),
515 [this](){ return this->isAckReceived_ || this->closed_; })) {
516 LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
517 retryTime_++;
518 if (retryTime_ < MAX_RETRY_TIME) {
519 lock.unlock();
520 LOGI("TimeSync::GetTimeOffset timeout, try again");
521 return GetTimeOffset(outOffset, timeout);
522 }
523 retryTime_ = 0;
524 return -E_TIMEOUT;
525 }
526 }
527 if (IsClosed()) {
528 return -E_BUSY;
529 }
530 retryTime_ = 0;
531 metadata_->GetTimeOffset(deviceId_, outOffset);
532 return E_OK;
533 }
534
IsNeedSync() const535 bool TimeSync::IsNeedSync() const
536 {
537 return !isSynced_;
538 }
539
SetOnline(bool isOnline)540 void TimeSync::SetOnline(bool isOnline)
541 {
542 isOnline_ = isOnline;
543 }
544
CommErrHandlerFunc(int errCode,TimeSync * timeSync)545 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
546 {
547 LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
548 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
549 if (timeSyncSet_.count(timeSync) == 0) {
550 LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
551 return;
552 }
553 if (timeSync == nullptr) {
554 LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
555 return;
556 }
557 if (errCode != E_OK) {
558 timeSync->SetOnline(false);
559 } else {
560 timeSync->SetOnline(true);
561 }
562 }
563
ResetTimer()564 void TimeSync::ResetTimer()
565 {
566 TimerId timerId;
567 {
568 std::lock_guard<std::mutex> lock(timeDriverLock_);
569 timerId = driverTimerId_;
570 driverTimerId_ = 0u;
571 }
572 if (timerId == 0u) {
573 return;
574 }
575 RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
576 int errCode = RuntimeContext::GetInstance()->SetTimer(
577 TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
578 if (errCode != E_OK) {
579 LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
580 } else {
581 std::lock_guard<std::mutex> lock(timeDriverLock_);
582 driverTimerId_ = timerId;
583 }
584 }
585
Close()586 void TimeSync::Close()
587 {
588 Finalize();
589 {
590 std::lock_guard<std::mutex> lock(cvLock_);
591 closed_ = true;
592 }
593 conditionVar_.notify_all();
594 }
595
IsClosed() const596 bool TimeSync::IsClosed() const
597 {
598 std::lock_guard<std::mutex> lock(cvLock_);
599 return closed_ ;
600 }
601 } // namespace DistributedDB