• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "time_sync.h"
17 
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24 
25 namespace DistributedDB {
26 std::mutex TimeSync::timeSyncSetLock_;
27 std::set<TimeSync *> TimeSync::timeSyncSet_;
namespace {
    // Period passed to RuntimeContext::SetTimer for the periodic sync driver (24 hours).
    constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
    // Divisor used to halve the round trip and to average the two offset estimates.
    constexpr int TRIP_DIV_HALF = 2;
    // Largest tolerated drift between the stored offset and a fresh estimate:
    // one second expressed in 100 ns ticks.
    constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1s for 100ns
}
33 
34 // Class TimeSyncPacket
TimeSyncPacket()35 TimeSyncPacket::TimeSyncPacket()
36     : sourceTimeBegin_(0),
37       sourceTimeEnd_(0),
38       targetTimeBegin_(0),
39       targetTimeEnd_(0),
40       version_(TIME_SYNC_VERSION_V1)
41 {
42 }
43 
~TimeSyncPacket()44 TimeSyncPacket::~TimeSyncPacket()
45 {
46 }
47 
SetSourceTimeBegin(Timestamp sourceTimeBegin)48 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
49 {
50     sourceTimeBegin_ = sourceTimeBegin;
51 }
52 
GetSourceTimeBegin() const53 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
54 {
55     return sourceTimeBegin_;
56 }
57 
SetSourceTimeEnd(Timestamp sourceTimeEnd)58 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
59 {
60     sourceTimeEnd_ = sourceTimeEnd;
61 }
62 
GetSourceTimeEnd() const63 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
64 {
65     return sourceTimeEnd_;
66 }
67 
SetTargetTimeBegin(Timestamp targetTimeBegin)68 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
69 {
70     targetTimeBegin_ = targetTimeBegin;
71 }
72 
GetTargetTimeBegin() const73 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
74 {
75     return targetTimeBegin_;
76 }
77 
SetTargetTimeEnd(Timestamp targetTimeEnd)78 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
79 {
80     targetTimeEnd_ = targetTimeEnd;
81 }
82 
GetTargetTimeEnd() const83 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
84 {
85     return targetTimeEnd_;
86 }
87 
SetVersion(uint32_t version)88 void TimeSyncPacket::SetVersion(uint32_t version)
89 {
90     version_ = version;
91 }
92 
GetVersion() const93 uint32_t TimeSyncPacket::GetVersion() const
94 {
95     return version_;
96 }
97 
CalculateLen()98 uint32_t TimeSyncPacket::CalculateLen()
99 {
100     uint32_t len = Parcel::GetUInt32Len();
101     len += Parcel::GetUInt64Len();
102     len += Parcel::GetUInt64Len();
103     len += Parcel::GetUInt64Len();
104     len += Parcel::GetUInt64Len();
105     len = Parcel::GetEightByteAlign(len);
106     return len;
107 }
108 
109 // Class TimeSync
TimeSync()110 TimeSync::TimeSync()
111     : communicateHandle_(nullptr),
112       metadata_(nullptr),
113       timeHelper_(nullptr),
114       retryTime_(0),
115       driverTimerId_(0),
116       isSynced_(false),
117       isAckReceived_(false),
118       timeChangedListener_(nullptr),
119       timeDriverLockCount_(0),
120       isOnline_(true),
121       closed_(false)
122 {
123 }
124 
~TimeSync()125 TimeSync::~TimeSync()
126 {
127     Finalize();
128     driverTimerId_ = 0;
129 
130     if (timeChangedListener_ != nullptr) {
131         timeChangedListener_->Drop(true);
132         timeChangedListener_ = nullptr;
133     }
134     timeHelper_ = nullptr;
135     communicateHandle_ = nullptr;
136     metadata_ = nullptr;
137 
138     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
139     timeSyncSet_.erase(this);
140 }
141 
RegisterTransformFunc()142 int TimeSync::RegisterTransformFunc()
143 {
144     TransformFunc func;
145     func.computeFunc = std::bind(&TimeSync::CalculateLen, std::placeholders::_1);
146     func.serializeFunc = std::bind(&TimeSync::Serialization, std::placeholders::_1,
147                                    std::placeholders::_2, std::placeholders::_3);
148     func.deserializeFunc = std::bind(&TimeSync::DeSerialization, std::placeholders::_1,
149                                      std::placeholders::_2, std::placeholders::_3);
150     return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
151 }
152 
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId)153 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
154     const ISyncInterface *storage, const DeviceID &deviceId)
155 {
156     if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
157         return -E_INVALID_ARGS;
158     }
159     {
160         std::lock_guard<std::mutex> lock(timeSyncSetLock_);
161         timeSyncSet_.insert(this);
162     }
163     communicateHandle_ = communicator;
164     metadata_ = metadata;
165     deviceId_ = deviceId;
166     timeHelper_ = std::make_unique<TimeHelper>();
167 
168     int errCode = timeHelper_->Initialize(storage, metadata_);
169     if (errCode != E_OK) {
170         timeHelper_ = nullptr;
171         LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
172         return errCode;
173     }
174 
175     driverCallback_ = std::bind(&TimeSync::TimeSyncDriver, this, std::placeholders::_1);
176     errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
177     if (errCode != E_OK) {
178         return errCode;
179     }
180     return errCode;
181 }
182 
Finalize()183 void TimeSync::Finalize()
184 {
185     // Stop the timer
186     LOGD("[TimeSync] Finalize enter!");
187     RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
188     TimerId timerId;
189     {
190         std::unique_lock<std::mutex> lock(timeDriverLock_);
191         timerId = driverTimerId_;
192     }
193     runtimeContext->RemoveTimer(timerId, true);
194     std::unique_lock<std::mutex> lock(timeDriverLock_);
195     timeDriverCond_.wait(lock, [this](){ return this->timeDriverLockCount_ == 0; });
196     LOGD("[TimeSync] Finalized!");
197 }
198 
SyncStart(const CommErrHandler & handler,uint32_t sessionId)199 int TimeSync::SyncStart(const CommErrHandler &handler,  uint32_t sessionId)
200 {
201     isOnline_ = true;
202     TimeSyncPacket packet;
203     Timestamp startTime = timeHelper_->GetTime();
204     packet.SetSourceTimeBegin(startTime);
205     // send timeSync request
206     LOGD("[TimeSync] startTime = %" PRIu64 ", dev = %s{private}", startTime, deviceId_.c_str());
207 
208     Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
209     if (message == nullptr) {
210         return -E_OUT_OF_MEMORY;
211     }
212     message->SetSessionId(sessionId);
213     message->SetMessageType(TYPE_REQUEST);
214     message->SetPriority(Priority::NORMAL);
215     int errCode = message->SetCopiedObject<>(packet);
216     if (errCode != E_OK) {
217         delete message;
218         message = nullptr;
219         return errCode;
220     }
221 
222     errCode = SendPacket(deviceId_, message, handler);
223     if (errCode != E_OK) {
224         delete message;
225         message = nullptr;
226     }
227     return errCode;
228 }
229 
CalculateLen(const Message * inMsg)230 uint32_t TimeSync::CalculateLen(const Message *inMsg)
231 {
232     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
233         return 0;
234     }
235 
236     const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
237     if (packet == nullptr) {
238         return 0;
239     }
240 
241     return TimeSyncPacket::CalculateLen();
242 }
243 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)244 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
245 {
246     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
247         return -E_INVALID_ARGS;
248     }
249     const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
250     if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
251         return -E_INVALID_ARGS;
252     }
253 
254     Parcel parcel(buffer, length);
255     Timestamp srcBegin = packet->GetSourceTimeBegin();
256     Timestamp srcEnd = packet->GetSourceTimeEnd();
257     Timestamp targetBegin = packet->GetTargetTimeBegin();
258     Timestamp targetEnd = packet->GetTargetTimeEnd();
259 
260     int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
261     if (errCode != E_OK) {
262         return -E_SECUREC_ERROR;
263     }
264     errCode = parcel.WriteUInt64(srcBegin);
265     if (errCode != E_OK) {
266         return -E_SECUREC_ERROR;
267     }
268     errCode = parcel.WriteUInt64(srcEnd);
269     if (errCode != E_OK) {
270         return -E_SECUREC_ERROR;
271     }
272     errCode = parcel.WriteUInt64(targetBegin);
273     if (errCode != E_OK) {
274         return -E_SECUREC_ERROR;
275     }
276     errCode = parcel.WriteUInt64(targetEnd);
277     if (errCode != E_OK) {
278         return -E_SECUREC_ERROR;
279     }
280     parcel.EightByteAlign();
281     if (parcel.IsError()) {
282         return -E_PARSE_FAIL;
283     }
284     return errCode;
285 }
286 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)287 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
288 {
289     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
290         return -E_INVALID_ARGS;
291     }
292     TimeSyncPacket packet;
293     Parcel parcel(const_cast<uint8_t *>(buffer), length);
294     Timestamp srcBegin;
295     Timestamp srcEnd;
296     Timestamp targetBegin;
297     Timestamp targetEnd;
298 
299     uint32_t version = 0;
300     parcel.ReadUInt32(version);
301     if (parcel.IsError()) {
302         return -E_INVALID_ARGS;
303     }
304     if (version > TIME_SYNC_VERSION_V1) {
305         packet.SetVersion(version);
306         return inMsg->SetCopiedObject<>(packet);
307     }
308     parcel.ReadUInt64(srcBegin);
309     parcel.ReadUInt64(srcEnd);
310     parcel.ReadUInt64(targetBegin);
311     parcel.ReadUInt64(targetEnd);
312     if (parcel.IsError()) {
313         return -E_INVALID_ARGS;
314     }
315     packet.SetSourceTimeBegin(srcBegin);
316     packet.SetSourceTimeEnd(srcEnd);
317     packet.SetTargetTimeBegin(targetBegin);
318     packet.SetTargetTimeEnd(targetEnd);
319 
320     return inMsg->SetCopiedObject<>(packet);
321 }
322 
AckRecv(const Message * message,uint32_t targetSessionId)323 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
324 {
325     // only check when sessionId is not 0, because old version timesync sessionId is 0.
326     if (message != nullptr && message->GetSessionId() != 0 &&
327         message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) {
328         LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
329         return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
330     }
331     if (!IsPacketValid(message, TYPE_RESPONSE)) {
332         return -E_INVALID_ARGS;
333     }
334     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
335     if (packet == nullptr) {
336         LOGE("[TimeSync] AckRecv packet is null");
337         return -E_INVALID_ARGS;
338     }
339 
340     TimeSyncPacket packetData = TimeSyncPacket(*packet);
341     Timestamp sourceTimeEnd = timeHelper_->GetTime();
342     packetData.SetSourceTimeEnd(sourceTimeEnd);
343     if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
344         packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
345         packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
346         packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
347         LOGD("[TimeSync][AckRecv] Time valid check failed.");
348         return -E_INVALID_TIME;
349     }
350     // calculate timeoffset of two devices
351     TimeOffset offset = CalculateTimeOffset(packetData);
352     LOGD("TimeSync::AckRecv, dev = %s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
353         ", tBegin = %" PRIu64 ", offset = %" PRId64,
354         deviceId_.c_str(),
355         packetData.GetSourceTimeEnd(),
356         packetData.GetTargetTimeEnd(),
357         packetData.GetSourceTimeBegin(),
358         packetData.GetTargetTimeBegin(),
359         offset);
360 
361     // save timeoffset into metadata, maybe a block action
362     int errCode = SaveTimeOffset(deviceId_, offset);
363     isSynced_ = true;
364     {
365         std::lock_guard<std::mutex> lock(cvLock_);
366         isAckReceived_ = true;
367     }
368     conditionVar_.notify_all();
369     ResetTimer();
370     return errCode;
371 }
372 
RequestRecv(const Message * message)373 int TimeSync::RequestRecv(const Message *message)
374 {
375     if (!IsPacketValid(message, TYPE_REQUEST)) {
376         return -E_INVALID_ARGS;
377     }
378     Timestamp targetTimeBegin = timeHelper_->GetTime();
379 
380     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
381     if (packet == nullptr) {
382         return -E_INVALID_ARGS;
383     }
384 
385     // build timeSync ack packet
386     TimeSyncPacket ackPacket = TimeSyncPacket(*packet);
387     ackPacket.SetTargetTimeBegin(targetTimeBegin);
388     Timestamp targetTimeEnd = timeHelper_->GetTime();
389     ackPacket.SetTargetTimeEnd(targetTimeEnd);
390     LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
391         ", tbegin = %" PRIu64, deviceId_.c_str(), ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(),
392         ackPacket.GetSourceTimeBegin(), ackPacket.GetTargetTimeBegin());
393     if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
394         LOGD("[TimeSync][RequestRecv] Time valid check failed.");
395         return -E_INVALID_TIME;
396     }
397 
398     TimeOffset timeoffsetIgnoreRtt = static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - targetTimeBegin);
399     TimeOffset metadataTimeoffset;
400     metadata_->GetTimeOffset(deviceId_, metadataTimeoffset);
401 
402     // 2 is half of INT64_MAX
403     if ((std::abs(metadataTimeoffset) >= INT64_MAX / 2) || (std::abs(timeoffsetIgnoreRtt) >= INT64_MAX / 2) ||
404         (std::abs(metadataTimeoffset - timeoffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE)) {
405         LOGI("[TimeSync][RequestRecv] timeoffSet invalid, should do time sync");
406         isSynced_ = false;
407     }
408 
409     Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
410     if (ackMessage == nullptr) {
411         return -E_OUT_OF_MEMORY;
412     }
413     ackMessage->SetSessionId(message->GetSessionId());
414     ackMessage->SetPriority(Priority::NORMAL);
415     ackMessage->SetMessageType(TYPE_RESPONSE);
416     ackMessage->SetTarget(deviceId_);
417     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
418     if (errCode != E_OK) {
419         delete ackMessage;
420         ackMessage = nullptr;
421         return errCode;
422     }
423 
424     errCode = SendPacket(deviceId_, ackMessage);
425     if (errCode != E_OK) {
426         delete ackMessage;
427         ackMessage = nullptr;
428     }
429     return errCode;
430 }
431 
SaveTimeOffset(const DeviceID & deviceID,TimeOffset timeOffset)432 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, TimeOffset timeOffset)
433 {
434     return metadata_->SaveTimeOffset(deviceID, timeOffset);
435 }
436 
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)437 TimeOffset TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
438 {
439     TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
440         timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
441     TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
442         timeSyncInfo.GetSourceTimeBegin() - (roundTrip / TRIP_DIV_HALF));
443     TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() + (roundTrip / TRIP_DIV_HALF) -
444         timeSyncInfo.GetSourceTimeEnd());
445     TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
446     LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
447         ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
448     return offset;
449 }
450 
IsPacketValid(const Message * inMsg,uint16_t messageType)451 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
452 {
453     if (inMsg == nullptr) {
454         return false;
455     }
456     if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
457         LOGD("message Id = %d", inMsg->GetMessageId());
458         return false;
459     }
460     if (messageType != inMsg->GetMessageType()) {
461         LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
462         return false;
463     }
464     return true;
465 }
466 
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler)467 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler)
468 {
469     SendConfig conf;
470     timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
471     int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
472     if (errCode != E_OK) {
473         LOGE("[TimeSync] SendPacket failed, err %d", errCode);
474     }
475     return errCode;
476 }
477 
TimeSyncDriver(TimerId timerId)478 int TimeSync::TimeSyncDriver(TimerId timerId)
479 {
480     if (timerId != driverTimerId_) {
481         return -E_INTERNAL_ERROR;
482     }
483     if (!isOnline_) {
484         return E_OK;
485     }
486     std::lock_guard<std::mutex> lock(timeDriverLock_);
487     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
488         CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
489         (void)this->SyncStart(handler);
490         std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
491         this->timeDriverLockCount_--;
492         this->timeDriverCond_.notify_all();
493     });
494     if (errCode != E_OK) {
495         LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
496         return errCode;
497     }
498     timeDriverLockCount_++;
499     return E_OK;
500 }
501 
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)502 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
503 {
504     if (!isSynced_) {
505         {
506             std::lock_guard<std::mutex> lock(cvLock_);
507             isAckReceived_ = false;
508         }
509         CommErrHandler handler = std::bind(&TimeSync::CommErrHandlerFunc, std::placeholders::_1, this);
510         int errCode = SyncStart(handler, sessionId);
511         LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
512             TimeHelper::GetSysCurrentTime(), errCode, timeout);
513         std::unique_lock<std::mutex> lock(cvLock_);
514         if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout),
515             [this](){ return this->isAckReceived_ || this->closed_; })) {
516             LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
517             retryTime_++;
518             if (retryTime_ < MAX_RETRY_TIME) {
519                 lock.unlock();
520                 LOGI("TimeSync::GetTimeOffset timeout, try again");
521                 return GetTimeOffset(outOffset, timeout);
522             }
523             retryTime_ = 0;
524             return -E_TIMEOUT;
525         }
526     }
527     if (IsClosed()) {
528         return -E_BUSY;
529     }
530     retryTime_ = 0;
531     metadata_->GetTimeOffset(deviceId_, outOffset);
532     return E_OK;
533 }
534 
IsNeedSync() const535 bool TimeSync::IsNeedSync() const
536 {
537     return !isSynced_;
538 }
539 
SetOnline(bool isOnline)540 void TimeSync::SetOnline(bool isOnline)
541 {
542     isOnline_ = isOnline;
543 }
544 
CommErrHandlerFunc(int errCode,TimeSync * timeSync)545 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
546 {
547     LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
548     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
549     if (timeSyncSet_.count(timeSync) == 0) {
550         LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
551         return;
552     }
553     if (timeSync == nullptr) {
554         LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
555         return;
556     }
557     if (errCode != E_OK) {
558         timeSync->SetOnline(false);
559     } else {
560         timeSync->SetOnline(true);
561     }
562 }
563 
ResetTimer()564 void TimeSync::ResetTimer()
565 {
566     TimerId timerId;
567     {
568         std::lock_guard<std::mutex> lock(timeDriverLock_);
569         timerId = driverTimerId_;
570         driverTimerId_ = 0u;
571     }
572     if (timerId == 0u) {
573         return;
574     }
575     RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
576     int errCode = RuntimeContext::GetInstance()->SetTimer(
577         TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
578     if (errCode != E_OK) {
579         LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
580     } else {
581         std::lock_guard<std::mutex> lock(timeDriverLock_);
582         driverTimerId_ = timerId;
583     }
584 }
585 
Close()586 void TimeSync::Close()
587 {
588     Finalize();
589     {
590         std::lock_guard<std::mutex> lock(cvLock_);
591         closed_ = true;
592     }
593     conditionVar_.notify_all();
594 }
595 
IsClosed() const596 bool TimeSync::IsClosed() const
597 {
598     std::lock_guard<std::mutex> lock(cvLock_);
599     return closed_ ;
600 }
601 } // namespace DistributedDB