• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "time_sync.h"
17 
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24 
25 namespace DistributedDB {
// Guards every access to timeSyncSet_.
std::mutex TimeSync::timeSyncSetLock_;
// Registry of live TimeSync objects: Initialize() inserts `this`, the destructor erases it, and
// CommErrHandlerFunc() consults it before touching the instance it was handed.
std::set<TimeSync *> TimeSync::timeSyncSet_;
namespace {
    // Period handed to the runtime timer that drives periodic time sync: 24h
    // (24 * 60 * 60 * 1000; presumably milliseconds - confirm against RuntimeContext::SetTimer).
    constexpr uint64_t TIME_SYNC_INTERVAL = 24ULL * 60ULL * 60ULL * 1000ULL;

    // Divisor used to halve the round trip and to average the two offset estimates.
    constexpr int TRIP_DIV_HALF = 2;

    // Largest tolerated drift between the stored offset and a fresh estimate: 1s for 100ns units.
    constexpr int64_t MAX_TIME_OFFSET_NOISE = 1LL * 1000LL * 10000LL;
}
33 
34 // Class TimeSyncPacket
TimeSyncPacket()35 TimeSyncPacket::TimeSyncPacket()
36     : sourceTimeBegin_(0),
37       sourceTimeEnd_(0),
38       targetTimeBegin_(0),
39       targetTimeEnd_(0),
40       version_(TIME_SYNC_VERSION_V1)
41 {
42 }
43 
~TimeSyncPacket()44 TimeSyncPacket::~TimeSyncPacket()
45 {
46 }
47 
SetSourceTimeBegin(Timestamp sourceTimeBegin)48 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
49 {
50     sourceTimeBegin_ = sourceTimeBegin;
51 }
52 
GetSourceTimeBegin() const53 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
54 {
55     return sourceTimeBegin_;
56 }
57 
SetSourceTimeEnd(Timestamp sourceTimeEnd)58 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
59 {
60     sourceTimeEnd_ = sourceTimeEnd;
61 }
62 
GetSourceTimeEnd() const63 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
64 {
65     return sourceTimeEnd_;
66 }
67 
SetTargetTimeBegin(Timestamp targetTimeBegin)68 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
69 {
70     targetTimeBegin_ = targetTimeBegin;
71 }
72 
GetTargetTimeBegin() const73 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
74 {
75     return targetTimeBegin_;
76 }
77 
SetTargetTimeEnd(Timestamp targetTimeEnd)78 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
79 {
80     targetTimeEnd_ = targetTimeEnd;
81 }
82 
GetTargetTimeEnd() const83 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
84 {
85     return targetTimeEnd_;
86 }
87 
SetVersion(uint32_t version)88 void TimeSyncPacket::SetVersion(uint32_t version)
89 {
90     version_ = version;
91 }
92 
GetVersion() const93 uint32_t TimeSyncPacket::GetVersion() const
94 {
95     return version_;
96 }
97 
CalculateLen()98 uint32_t TimeSyncPacket::CalculateLen()
99 {
100     uint32_t len = Parcel::GetUInt32Len();
101     len += Parcel::GetUInt64Len();
102     len += Parcel::GetUInt64Len();
103     len += Parcel::GetUInt64Len();
104     len += Parcel::GetUInt64Len();
105     len = Parcel::GetEightByteAlign(len);
106     return len;
107 }
108 
109 // Class TimeSync
TimeSync()110 TimeSync::TimeSync()
111     : communicateHandle_(nullptr),
112       metadata_(nullptr),
113       timeHelper_(nullptr),
114       retryTime_(0),
115       driverTimerId_(0),
116       isSynced_(false),
117       isAckReceived_(false),
118       timeChangedListener_(nullptr),
119       timeDriverLockCount_(0),
120       isOnline_(true),
121       closed_(false)
122 {
123 }
124 
~TimeSync()125 TimeSync::~TimeSync()
126 {
127     Finalize();
128     driverTimerId_ = 0;
129 
130     if (timeChangedListener_ != nullptr) {
131         timeChangedListener_->Drop(true);
132         timeChangedListener_ = nullptr;
133     }
134     timeHelper_ = nullptr;
135     communicateHandle_ = nullptr;
136     metadata_ = nullptr;
137 
138     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
139     timeSyncSet_.erase(this);
140 }
141 
RegisterTransformFunc()142 int TimeSync::RegisterTransformFunc()
143 {
144     TransformFunc func;
145     func.computeFunc = std::bind(&TimeSync::CalculateLen, std::placeholders::_1);
146     func.serializeFunc = std::bind(&TimeSync::Serialization, std::placeholders::_1,
147                                    std::placeholders::_2, std::placeholders::_3);
148     func.deserializeFunc = std::bind(&TimeSync::DeSerialization, std::placeholders::_1,
149                                      std::placeholders::_2, std::placeholders::_3);
150     return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
151 }
152 
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)153 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
154     const ISyncInterface *storage, const DeviceID &deviceId)
155 {
156     if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
157         return -E_INVALID_ARGS;
158     }
159     {
160         std::lock_guard<std::mutex> lock(timeSyncSetLock_);
161         timeSyncSet_.insert(this);
162     }
163     communicateHandle_ = communicator;
164     metadata_ = metadata;
165     deviceId_ = deviceId;
166     timeHelper_ = std::make_unique<TimeHelper>();
167 
168     int errCode = timeHelper_->Initialize(storage, metadata_);
169     if (errCode != E_OK) {
170         timeHelper_ = nullptr;
171         LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
172         return errCode;
173     }
174 
175     driverCallback_ = std::bind(&TimeSync::TimeSyncDriver, this, std::placeholders::_1);
176     errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
177     if (errCode != E_OK) {
178         return errCode;
179     }
180     return errCode;
181 }
182 
Finalize()183 void TimeSync::Finalize()
184 {
185     // Stop the timer
186     LOGD("[TimeSync] Finalize enter!");
187     RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
188     TimerId timerId;
189     {
190         std::unique_lock<std::mutex> lock(timeDriverLock_);
191         timerId = driverTimerId_;
192     }
193     runtimeContext->RemoveTimer(timerId, true);
194     std::unique_lock<std::mutex> lock(timeDriverLock_);
195     timeDriverCond_.wait(lock, [this](){ return this->timeDriverLockCount_ == 0; });
196     LOGD("[TimeSync] Finalized!");
197 }
198 
SyncStart(const CommErrHandler & handler,uint32_t sessionId)199 int TimeSync::SyncStart(const CommErrHandler &handler,  uint32_t sessionId)
200 {
201     isOnline_ = true;
202     TimeSyncPacket packet;
203     Timestamp startTime = timeHelper_->GetTime();
204     packet.SetSourceTimeBegin(startTime);
205     // send timeSync request
206     LOGD("[TimeSync] startTime = %" PRIu64 ", dev = %s{private}", startTime, deviceId_.c_str());
207 
208     Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
209     if (message == nullptr) {
210         return -E_OUT_OF_MEMORY;
211     }
212     message->SetSessionId(sessionId);
213     message->SetMessageType(TYPE_REQUEST);
214     message->SetPriority(Priority::NORMAL);
215     int errCode = message->SetCopiedObject<>(packet);
216     if (errCode != E_OK) {
217         delete message;
218         message = nullptr;
219         return errCode;
220     }
221 
222     errCode = SendPacket(deviceId_, message, handler);
223     if (errCode != E_OK) {
224         delete message;
225         message = nullptr;
226     }
227     return errCode;
228 }
229 
CalculateLen(const Message * inMsg)230 uint32_t TimeSync::CalculateLen(const Message *inMsg)
231 {
232     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
233         return 0;
234     }
235 
236     const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
237     if (packet == nullptr) {
238         return 0;
239     }
240 
241     return TimeSyncPacket::CalculateLen();
242 }
243 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)244 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
245 {
246     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
247         return -E_INVALID_ARGS;
248     }
249     const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
250     if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
251         return -E_INVALID_ARGS;
252     }
253 
254     Parcel parcel(buffer, length);
255     Timestamp srcBegin = packet->GetSourceTimeBegin();
256     Timestamp srcEnd = packet->GetSourceTimeEnd();
257     Timestamp targetBegin = packet->GetTargetTimeBegin();
258     Timestamp targetEnd = packet->GetTargetTimeEnd();
259 
260     int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
261     if (errCode != E_OK) {
262         return -E_SECUREC_ERROR;
263     }
264     errCode = parcel.WriteUInt64(srcBegin);
265     if (errCode != E_OK) {
266         return -E_SECUREC_ERROR;
267     }
268     errCode = parcel.WriteUInt64(srcEnd);
269     if (errCode != E_OK) {
270         return -E_SECUREC_ERROR;
271     }
272     errCode = parcel.WriteUInt64(targetBegin);
273     if (errCode != E_OK) {
274         return -E_SECUREC_ERROR;
275     }
276     errCode = parcel.WriteUInt64(targetEnd);
277     if (errCode != E_OK) {
278         return -E_SECUREC_ERROR;
279     }
280     parcel.EightByteAlign();
281 
282     return errCode;
283 }
284 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)285 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
286 {
287     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
288         return -E_INVALID_ARGS;
289     }
290     TimeSyncPacket packet;
291     Parcel parcel(const_cast<uint8_t *>(buffer), length);
292     Timestamp srcBegin;
293     Timestamp srcEnd;
294     Timestamp targetBegin;
295     Timestamp targetEnd;
296 
297     uint32_t version = 0;
298     parcel.ReadUInt32(version);
299     if (parcel.IsError()) {
300         return -E_INVALID_ARGS;
301     }
302     if (version > TIME_SYNC_VERSION_V1) {
303         packet.SetVersion(version);
304         return inMsg->SetCopiedObject<>(packet);
305     }
306     parcel.ReadUInt64(srcBegin);
307     parcel.ReadUInt64(srcEnd);
308     parcel.ReadUInt64(targetBegin);
309     parcel.ReadUInt64(targetEnd);
310     if (parcel.IsError()) {
311         return -E_INVALID_ARGS;
312     }
313     packet.SetSourceTimeBegin(srcBegin);
314     packet.SetSourceTimeEnd(srcEnd);
315     packet.SetTargetTimeBegin(targetBegin);
316     packet.SetTargetTimeEnd(targetEnd);
317 
318     return inMsg->SetCopiedObject<>(packet);
319 }
320 
AckRecv(const Message * message,uint32_t targetSessionId)321 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
322 {
323     // only check when sessionId is not 0, because old version timesync sessionId is 0.
324     if (message != nullptr && message->GetSessionId() != 0 &&
325         message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
326         LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
327         return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
328     }
329     if (!IsPacketValid(message, TYPE_RESPONSE)) {
330         return -E_INVALID_ARGS;
331     }
332     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
333     if (packet == nullptr) {
334         LOGE("[TimeSync] AckRecv packet is null");
335         return -E_INVALID_ARGS;
336     }
337 
338     TimeSyncPacket packetData = TimeSyncPacket(*packet);
339     Timestamp sourceTimeEnd = timeHelper_->GetTime();
340     packetData.SetSourceTimeEnd(sourceTimeEnd);
341     if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
342         packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
343         packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
344         packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
345         LOGD("[TimeSync][AckRecv] Time valid check failed.");
346         return -E_INVALID_TIME;
347     }
348     // calculate timeoffset of two devices
349     TimeOffset offset = CalculateTimeOffset(packetData);
350     LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
351         ", tBegin = %" PRIu64 ", offset = %" PRId64,
352         deviceId_.c_str(),
353         packetData.GetSourceTimeEnd(),
354         packetData.GetTargetTimeEnd(),
355         packetData.GetSourceTimeBegin(),
356         packetData.GetTargetTimeBegin(),
357         offset);
358 
359     // save timeoffset into metadata, maybe a block action
360     int errCode = SaveTimeOffset(deviceId_, offset);
361     isSynced_ = true;
362     {
363         std::lock_guard<std::mutex> lock(cvLock_);
364         isAckReceived_ = true;
365     }
366     conditionVar_.notify_all();
367     ResetTimer();
368     return errCode;
369 }
370 
RequestRecv(const Message * message)371 int TimeSync::RequestRecv(const Message *message)
372 {
373     if (!IsPacketValid(message, TYPE_REQUEST)) {
374         return -E_INVALID_ARGS;
375     }
376     Timestamp targetTimeBegin = timeHelper_->GetTime();
377 
378     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
379     if (packet == nullptr) {
380         return -E_INVALID_ARGS;
381     }
382 
383     // build timeSync ack packet
384     TimeSyncPacket ackPacket = TimeSyncPacket(*packet);
385     ackPacket.SetTargetTimeBegin(targetTimeBegin);
386     Timestamp targetTimeEnd = timeHelper_->GetTime();
387     ackPacket.SetTargetTimeEnd(targetTimeEnd);
388     LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
389         ", tbegin = %" PRIu64, deviceId_.c_str(), ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(),
390         ackPacket.GetSourceTimeBegin(), ackPacket.GetTargetTimeBegin());
391     if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
392         LOGD("[TimeSync][RequestRecv] Time valid check failed.");
393         return -E_INVALID_TIME;
394     }
395 
396     TimeOffset timeoffsetIgnoreRtt = static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - targetTimeBegin);
397     TimeOffset metadataTimeoffset;
398     metadata_->GetTimeOffset(deviceId_, metadataTimeoffset);
399 
400     // 2 is half of INT64_MAX
401     if ((std::abs(metadataTimeoffset) >= INT64_MAX / 2) || (std::abs(timeoffsetIgnoreRtt) >= INT64_MAX / 2) ||
402         (std::abs(metadataTimeoffset - timeoffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE)) {
403         LOGI("[TimeSync][RequestRecv] timeoffSet invalid, should do time sync");
404         isSynced_ = false;
405     }
406 
407     Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
408     if (ackMessage == nullptr) {
409         return -E_OUT_OF_MEMORY;
410     }
411     ackMessage->SetSessionId(message->GetSessionId());
412     ackMessage->SetPriority(Priority::NORMAL);
413     ackMessage->SetMessageType(TYPE_RESPONSE);
414     ackMessage->SetTarget(deviceId_);
415     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
416     if (errCode != E_OK) {
417         delete ackMessage;
418         ackMessage = nullptr;
419         return errCode;
420     }
421 
422     errCode = SendPacket(deviceId_, ackMessage);
423     if (errCode != E_OK) {
424         delete ackMessage;
425         ackMessage = nullptr;
426     }
427     return errCode;
428 }
429 
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)430 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
431 {
432     return metadata_->SaveTimeOffset(deviceID, timeOffset);
433 }
434 
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)435 TimeOffset TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
436 {
437     TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
438         timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
439     TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
440         timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
441     TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
442         timeSyncInfo.GetSourceTimeEnd());
443     TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
444     LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
445         ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
446     return offset;
447 }
448 
IsPacketValid(const Message * inMsg,uint16_t messageType)449 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
450 {
451     if (inMsg == nullptr) {
452         return false;
453     }
454     if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
455         LOGD("message Id = %d", inMsg->GetMessageId());
456         return false;
457     }
458     if (messageType != inMsg->GetMessageType()) {
459         LOGD("input Type = %" PRId16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
460         return false;
461     }
462     return true;
463 }
464 
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)465 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
466 {
467     SendConfig conf;
468     timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
469     int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
470     if (errCode != E_OK) {
471         LOGE("[TimeSync] SendPacket failed, err %d", errCode);
472     }
473     return errCode;
474 }
475 
TimeSyncDriver(TimerId timerId)476 int TimeSync::TimeSyncDriver(TimerId timerId)
477 {
478     if (timerId != driverTimerId_) {
479         return -E_INTERNAL_ERROR;
480     }
481     if (!isOnline_) {
482         return E_OK;
483     }
484     std::lock_guard<std::mutex> lock(timeDriverLock_);
485     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
486         CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
487         (void)this->SyncStart(handler);
488         std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
489         this->timeDriverLockCount_--;
490         this->timeDriverCond_.notify_all();
491     });
492     if (errCode != E_OK) {
493         LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
494         return errCode;
495     }
496     timeDriverLockCount_++;
497     return E_OK;
498 }
499 
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)500 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
501 {
502     if (!isSynced_) {
503         {
504             std::lock_guard<std::mutex> lock(cvLock_);
505             isAckReceived_ = false;
506         }
507         CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
508         int errCode = SyncStart(handler, sessionId);
509         LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
510             TimeHelper::GetSysCurrentTime(), errCode, timeout);
511         std::unique_lock<std::mutex> lock(cvLock_);
512         if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout),
513             [this](){ return this->isAckReceived_ || this->closed_; })) {
514             LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
515             retryTime_++;
516             if (retryTime_ < MAX_RETRY_TIME) {
517                 lock.unlock();
518                 LOGI("TimeSync::GetTimeOffset timeout, try again");
519                 return GetTimeOffset(outOffset, timeout);
520             }
521             retryTime_ = 0;
522             return -E_TIMEOUT;
523         }
524     }
525     if (IsClosed()) {
526         return -E_BUSY;
527     }
528     retryTime_ = 0;
529     metadata_->GetTimeOffset(deviceId_, outOffset);
530     return E_OK;
531 }
532 
IsNeedSync() const533 bool TimeSync::IsNeedSync() const
534 {
535     return !isSynced_;
536 }
537 
SetOnline(bool isOnline)538 void TimeSync::SetOnline(bool isOnline)
539 {
540     isOnline_ = isOnline;
541 }
542 
CommErrHandlerFunc(int errCode,TimeSync * timeSync)543 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
544 {
545     LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
546     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
547     if (timeSyncSet_.count(timeSync) == 0) {
548         LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
549         return;
550     }
551     if (timeSync == nullptr) {
552         LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
553         return;
554     }
555     if (errCode != E_OK) {
556         timeSync->SetOnline(false);
557     } else {
558         timeSync->SetOnline(true);
559     }
560 }
561 
ResetTimer()562 void TimeSync::ResetTimer()
563 {
564     std::lock_guard<std::mutex> lock(timeDriverLock_);
565     RuntimeContext::GetInstance()->RemoveTimer(driverTimerId_, true);
566     int errCode = RuntimeContext::GetInstance()->SetTimer(
567         TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
568     if (errCode != E_OK) {
569         LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
570     }
571 }
572 
Close()573 void TimeSync::Close()
574 {
575     Finalize();
576     {
577         std::lock_guard<std::mutex> lock(cvLock_);
578         closed_ = true;
579     }
580     conditionVar_.notify_all();
581 }
582 
IsClosed()583 bool TimeSync::IsClosed()
584 {
585     std::lock_guard<std::mutex> lock(cvLock_);
586     return closed_ ;
587 }
588 } // namespace DistributedDB