• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "time_sync.h"
17 
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24 
25 namespace DistributedDB {
// Definitions of the static registry shared by every TimeSync instance.
// Initialize() inserts `this` into timeSyncSet_, the destructor erases it, and CommErrHandlerFunc()
// looks a pointer up in it before using it; timeSyncSetLock_ is held around each of those accesses.
std::mutex TimeSync::timeSyncSetLock_;
std::set<TimeSync *> TimeSync::timeSyncSet_;
namespace {
    // Period handed to RuntimeContext::SetTimer for the periodic time-sync driver.
    constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
    // Divisor used to halve the round trip and to average the two offset estimates.
    constexpr int TRIP_DIV_HALF = 2;
    constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
    constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
    // The product is 18'000'000'000, which does not fit in 32 bits. The first operand is therefore 64-bit so
    // the whole expression is evaluated in 64-bit arithmetic; the previous `... * 10000u` form was computed
    // as `unsigned int` and silently wrapped to 820'130'816 (about 82 seconds).
    constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30ULL * 60 * 1000 * 10000; // 30 minute in 100-nanosecond units
}
35 
36 // Class TimeSyncPacket
TimeSyncPacket()37 TimeSyncPacket::TimeSyncPacket()
38     : sourceTimeBegin_(0),
39       sourceTimeEnd_(0),
40       targetTimeBegin_(0),
41       targetTimeEnd_(0),
42       version_(TIME_SYNC_VERSION_V1),
43       requestLocalOffset_(0),
44       responseLocalOffset_(0)
45 {
46 }
47 
~TimeSyncPacket()48 TimeSyncPacket::~TimeSyncPacket()
49 {
50 }
51 
SetSourceTimeBegin(Timestamp sourceTimeBegin)52 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
53 {
54     sourceTimeBegin_ = sourceTimeBegin;
55 }
56 
GetSourceTimeBegin() const57 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
58 {
59     return sourceTimeBegin_;
60 }
61 
SetSourceTimeEnd(Timestamp sourceTimeEnd)62 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
63 {
64     sourceTimeEnd_ = sourceTimeEnd;
65 }
66 
GetSourceTimeEnd() const67 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
68 {
69     return sourceTimeEnd_;
70 }
71 
SetTargetTimeBegin(Timestamp targetTimeBegin)72 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
73 {
74     targetTimeBegin_ = targetTimeBegin;
75 }
76 
GetTargetTimeBegin() const77 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
78 {
79     return targetTimeBegin_;
80 }
81 
SetTargetTimeEnd(Timestamp targetTimeEnd)82 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
83 {
84     targetTimeEnd_ = targetTimeEnd;
85 }
86 
GetTargetTimeEnd() const87 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
88 {
89     return targetTimeEnd_;
90 }
91 
SetVersion(uint32_t version)92 void TimeSyncPacket::SetVersion(uint32_t version)
93 {
94     version_ = version;
95 }
96 
GetVersion() const97 uint32_t TimeSyncPacket::GetVersion() const
98 {
99     return version_;
100 }
101 
GetRequestLocalOffset() const102 TimeOffset TimeSyncPacket::GetRequestLocalOffset() const
103 {
104     return requestLocalOffset_;
105 }
106 
SetRequestLocalOffset(TimeOffset offset)107 void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset)
108 {
109     requestLocalOffset_ = offset;
110 }
111 
GetResponseLocalOffset() const112 TimeOffset TimeSyncPacket::GetResponseLocalOffset() const
113 {
114     return responseLocalOffset_;
115 }
116 
SetResponseLocalOffset(TimeOffset offset)117 void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset)
118 {
119     responseLocalOffset_ = offset;
120 }
121 
CalculateLen()122 uint32_t TimeSyncPacket::CalculateLen()
123 {
124     uint32_t len = Parcel::GetUInt32Len(); // version_
125     len += Parcel::GetUInt64Len(); // sourceTimeBegin_
126     len += Parcel::GetUInt64Len(); // sourceTimeEnd_
127     len += Parcel::GetUInt64Len(); // targetTimeBegin_
128     len += Parcel::GetUInt64Len(); // targetTimeEnd_
129     len = Parcel::GetEightByteAlign(len);
130     len += Parcel::GetInt64Len(); // requestLocalOffset_
131     len += Parcel::GetInt64Len(); // responseLocalOffset_
132     return len;
133 }
134 
135 // Class TimeSync
TimeSync()136 TimeSync::TimeSync()
137     : communicateHandle_(nullptr),
138       metadata_(nullptr),
139       timeHelper_(nullptr),
140       retryTime_(0),
141       driverTimerId_(0),
142       isSynced_(false),
143       isAckReceived_(false),
144       timeChangedListener_(nullptr),
145       timeDriverLockCount_(0),
146       isOnline_(true),
147       closed_(false)
148 {
149 }
150 
~TimeSync()151 TimeSync::~TimeSync()
152 {
153     Finalize();
154     driverTimerId_ = 0;
155 
156     if (timeChangedListener_ != nullptr) {
157         timeChangedListener_->Drop(true);
158         timeChangedListener_ = nullptr;
159     }
160     timeHelper_ = nullptr;
161     communicateHandle_ = nullptr;
162     metadata_ = nullptr;
163 
164     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
165     timeSyncSet_.erase(this);
166 }
167 
RegisterTransformFunc()168 int TimeSync::RegisterTransformFunc()
169 {
170     TransformFunc func;
171     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
172     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
173         return Serialization(buffer, length, inMsg);
174     };
175     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
176         return DeSerialization(buffer, length, inMsg);
177     };
178     return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
179 }
180 
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId,const DeviceID & userId)181 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
182     const ISyncInterface *storage, const DeviceID &deviceId, const DeviceID &userId)
183 {
184     if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
185         return -E_INVALID_ARGS;
186     }
187     {
188         std::lock_guard<std::mutex> lock(timeSyncSetLock_);
189         timeSyncSet_.insert(this);
190     }
191     communicateHandle_ = communicator;
192     metadata_ = metadata;
193     deviceId_ = deviceId;
194     userId_ = userId;
195     timeHelper_ = std::make_unique<TimeHelper>();
196 
197     int errCode = timeHelper_->Initialize(storage, metadata_);
198     if (errCode != E_OK) {
199         timeHelper_ = nullptr;
200         LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
201         return errCode;
202     }
203     dbId_ = storage->GetIdentifier();
204     driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); };
205     errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
206     if (errCode != E_OK) {
207         return errCode;
208     }
209     isSynced_ = metadata_->IsTimeSyncFinish(deviceId_, userId_);
210     return errCode;
211 }
212 
Finalize()213 void TimeSync::Finalize()
214 {
215     // Stop the timer
216     LOGD("[TimeSync] Finalize enter!");
217     RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
218     TimerId timerId;
219     {
220         std::unique_lock<std::mutex> lock(timeDriverLock_);
221         timerId = driverTimerId_;
222     }
223     runtimeContext->RemoveTimer(timerId, true);
224     std::unique_lock<std::mutex> lock(timeDriverLock_);
225     timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; });
226     LOGD("[TimeSync] Finalized!");
227 }
228 
SyncStart(const CommErrHandler & handler,uint32_t sessionId,bool isRetryTask)229 int TimeSync::SyncStart(const CommErrHandler &handler, uint32_t sessionId, bool isRetryTask)
230 {
231     isOnline_ = true;
232     TimeSyncPacket packet;
233     Timestamp startTime = timeHelper_->GetTime();
234     packet.SetSourceTimeBegin(startTime);
235     TimeOffset timeOffset = metadata_->GetLocalTimeOffset();
236     packet.SetRequestLocalOffset(timeOffset);
237     // send timeSync request
238     LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset,
239         deviceId_.c_str());
240 
241     Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
242     if (message == nullptr) {
243         return -E_OUT_OF_MEMORY;
244     }
245     message->SetSessionId(sessionId);
246     message->SetMessageType(TYPE_REQUEST);
247     message->SetPriority(Priority::NORMAL);
248     int errCode = message->SetCopiedObject<>(packet);
249     if (errCode != E_OK) {
250         delete message;
251         message = nullptr;
252         return errCode;
253     }
254     errCode = SendMessageWithSendEnd(message, handler, isRetryTask);
255     if (errCode != E_OK) {
256         delete message;
257         message = nullptr;
258     }
259     return errCode;
260 }
261 
CalculateLen(const Message * inMsg)262 uint32_t TimeSync::CalculateLen(const Message *inMsg)
263 {
264     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
265         return 0;
266     }
267 
268     const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
269     if (packet == nullptr) {
270         return 0;
271     }
272 
273     return TimeSyncPacket::CalculateLen();
274 }
275 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)276 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
277 {
278     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
279         return -E_INVALID_ARGS;
280     }
281     const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
282     if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
283         return -E_INVALID_ARGS;
284     }
285 
286     Parcel parcel(buffer, length);
287     Timestamp srcBegin = packet->GetSourceTimeBegin();
288     Timestamp srcEnd = packet->GetSourceTimeEnd();
289     Timestamp targetBegin = packet->GetTargetTimeBegin();
290     Timestamp targetEnd = packet->GetTargetTimeEnd();
291 
292     int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
293     if (errCode != E_OK) {
294         return -E_SECUREC_ERROR;
295     }
296     errCode = parcel.WriteUInt64(srcBegin);
297     if (errCode != E_OK) {
298         return -E_SECUREC_ERROR;
299     }
300     errCode = parcel.WriteUInt64(srcEnd);
301     if (errCode != E_OK) {
302         return -E_SECUREC_ERROR;
303     }
304     errCode = parcel.WriteUInt64(targetBegin);
305     if (errCode != E_OK) {
306         return -E_SECUREC_ERROR;
307     }
308     errCode = parcel.WriteUInt64(targetEnd);
309     if (errCode != E_OK) {
310         return -E_SECUREC_ERROR;
311     }
312     parcel.EightByteAlign();
313     parcel.WriteInt64(packet->GetRequestLocalOffset());
314     parcel.WriteInt64(packet->GetResponseLocalOffset());
315     if (parcel.IsError()) {
316         return -E_PARSE_FAIL;
317     }
318     return errCode;
319 }
320 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)321 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
322 {
323     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
324         return -E_INVALID_ARGS;
325     }
326     TimeSyncPacket packet;
327     Parcel parcel(const_cast<uint8_t *>(buffer), length);
328     Timestamp srcBegin;
329     Timestamp srcEnd;
330     Timestamp targetBegin;
331     Timestamp targetEnd;
332 
333     uint32_t version = 0;
334     parcel.ReadUInt32(version);
335     if (parcel.IsError()) {
336         return -E_INVALID_ARGS;
337     }
338     if (version > TIME_SYNC_VERSION_V1) {
339         packet.SetVersion(version);
340         return inMsg->SetCopiedObject<>(packet);
341     }
342     parcel.ReadUInt64(srcBegin);
343     parcel.ReadUInt64(srcEnd);
344     parcel.ReadUInt64(targetBegin);
345     parcel.ReadUInt64(targetEnd);
346     if (parcel.IsError()) {
347         return -E_INVALID_ARGS;
348     }
349     packet.SetSourceTimeBegin(srcBegin);
350     packet.SetSourceTimeEnd(srcEnd);
351     packet.SetTargetTimeBegin(targetBegin);
352     packet.SetTargetTimeEnd(targetEnd);
353     parcel.EightByteAlign();
354     if (parcel.IsContinueRead()) {
355         TimeOffset requestLocalOffset;
356         TimeOffset responseLocalOffset;
357         parcel.ReadInt64(requestLocalOffset);
358         parcel.ReadInt64(responseLocalOffset);
359         if (parcel.IsError()) {
360             LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType());
361             return -E_PARSE_FAIL;
362         }
363         packet.SetRequestLocalOffset(requestLocalOffset);
364         packet.SetResponseLocalOffset(responseLocalOffset);
365     }
366 
367     return inMsg->SetCopiedObject<>(packet);
368 }
369 
AckRecv(const Message * message,uint32_t targetSessionId)370 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
371 {
372     // only check when sessionId is not 0, because old version timesync sessionId is 0.
373     if (message != nullptr && message->GetSessionId() != 0 && message->GetSessionId() == targetSessionId) {
374         if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
375             LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
376             return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
377         }
378     }
379     if (!IsPacketValid(message, TYPE_RESPONSE)) {
380         return -E_INVALID_ARGS;
381     }
382     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
383     if (packet == nullptr) {
384         LOGE("[TimeSync] AckRecv packet is null");
385         return -E_INVALID_ARGS;
386     }
387 
388     TimeSyncPacket packetData = TimeSyncPacket(*packet);
389     Timestamp sourceTimeEnd = timeHelper_->GetTime();
390     packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId));
391     packetData.SetSourceTimeEnd(sourceTimeEnd);
392     if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
393         packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
394         packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
395         packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
396         LOGD("[TimeSync][AckRecv] Time valid check failed.");
397         return -E_INVALID_TIME;
398     }
399     int errCode = SaveOffsetWithAck(packetData);
400     {
401         std::lock_guard<std::mutex> lock(cvLock_);
402         isAckReceived_ = true;
403     }
404     conditionVar_.notify_all();
405     ResetTimer();
406     return errCode;
407 }
408 
RequestRecv(const Message * message)409 int TimeSync::RequestRecv(const Message *message)
410 {
411     if (!IsPacketValid(message, TYPE_REQUEST)) {
412         return -E_INVALID_ARGS;
413     }
414 
415     const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
416     if (packet == nullptr) {
417         return -E_INVALID_ARGS;
418     }
419 
420     // build timeSync ack packet
421     TimeSyncPacket ackPacket = BuildAckPacket(*packet);
422     if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
423         LOGD("[TimeSync][RequestRecv] Time valid check failed.");
424         return -E_INVALID_TIME;
425     }
426     ReTimeSyncIfNeed(ackPacket);
427 
428     Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
429     if (ackMessage == nullptr) {
430         return -E_OUT_OF_MEMORY;
431     }
432     ackMessage->SetSessionId(message->GetSessionId());
433     ackMessage->SetPriority(Priority::NORMAL);
434     ackMessage->SetMessageType(TYPE_RESPONSE);
435     ackMessage->SetTarget(deviceId_);
436     int errCode = ackMessage->SetCopiedObject<>(ackPacket);
437     if (errCode != E_OK) {
438         delete ackMessage;
439         ackMessage = nullptr;
440         return errCode;
441     }
442 
443     errCode = SendPacket(deviceId_, ackMessage);
444     if (errCode != E_OK) {
445         delete ackMessage;
446         ackMessage = nullptr;
447     }
448     return errCode;
449 }
450 
SaveTimeOffset(const DeviceID & deviceID,const DeviceID & userId,TimeOffset timeOffset)451 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, const DeviceID &userId, TimeOffset timeOffset)
452 {
453     return metadata_->SaveTimeOffset(deviceID, userId, timeOffset);
454 }
455 
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)456 std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
457 {
458     TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
459         timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
460     TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
461         timeSyncInfo.GetSourceTimeBegin()) - (roundTrip / TRIP_DIV_HALF);
462     TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetSourceTimeEnd()) +
463         (roundTrip / TRIP_DIV_HALF);
464     TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
465     LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
466         ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
467     return {offset, roundTrip};
468 }
469 
IsPacketValid(const Message * inMsg,uint16_t messageType)470 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
471 {
472     if (inMsg == nullptr) {
473         return false;
474     }
475     if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
476         LOGD("message Id = %d", inMsg->GetMessageId());
477         return false;
478     }
479     if (messageType != inMsg->GetMessageType()) {
480         LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
481         return false;
482     }
483     return true;
484 }
485 
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler,bool isRetryTask)486 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler,
487     bool isRetryTask)
488 {
489     SendConfig conf;
490     timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
491     conf.isRetryTask = isRetryTask;
492     int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
493     if (errCode != E_OK) {
494         LOGE("[TimeSync] SendPacket failed, err %d", errCode);
495     }
496     return errCode;
497 }
498 
TimeSyncDriver(TimerId timerId)499 int TimeSync::TimeSyncDriver(TimerId timerId)
500 {
501     if (timerId != driverTimerId_) {
502         return -E_INTERNAL_ERROR;
503     }
504     if (!isOnline_) {
505         return E_OK;
506     }
507     std::lock_guard<std::mutex> lock(timeDriverLock_);
508     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
509         CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
510         (void)this->SyncStart(handler);
511         std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
512         this->timeDriverLockCount_--;
513         this->timeDriverCond_.notify_all();
514     });
515     if (errCode != E_OK) {
516         LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
517         return errCode;
518     }
519     timeDriverLockCount_++;
520     return E_OK;
521 }
522 
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)523 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
524 {
525     if (!isSynced_) {
526         {
527             std::lock_guard<std::mutex> lock(cvLock_);
528             isAckReceived_ = false;
529         }
530         CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
531         int errCode = SyncStart(handler, sessionId);
532         LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
533             TimeHelper::GetSysCurrentTime(), errCode, timeout);
534         std::unique_lock<std::mutex> lock(cvLock_);
535         if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() {
536             return this->isAckReceived_ || this->closed_;
537             })) {
538             LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
539             retryTime_++;
540             if (retryTime_ < MAX_RETRY_TIME) {
541                 lock.unlock();
542                 LOGI("TimeSync::GetTimeOffset timeout, try again");
543                 return GetTimeOffset(outOffset, timeout);
544             }
545             retryTime_ = 0;
546             return -E_TIMEOUT;
547         }
548     }
549     if (IsClosed()) {
550         return -E_BUSY;
551     }
552     retryTime_ = 0;
553     metadata_->GetTimeOffset(deviceId_, userId_, outOffset);
554     return E_OK;
555 }
556 
IsNeedSync() const557 bool TimeSync::IsNeedSync() const
558 {
559     return !isSynced_;
560 }
561 
SetOnline(bool isOnline)562 void TimeSync::SetOnline(bool isOnline)
563 {
564     isOnline_ = isOnline;
565 }
566 
CommErrHandlerFunc(int errCode,TimeSync * timeSync)567 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
568 {
569     LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
570     std::lock_guard<std::mutex> lock(timeSyncSetLock_);
571     if (timeSyncSet_.count(timeSync) == 0) {
572         LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
573         return;
574     }
575     if (timeSync == nullptr) {
576         LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
577         return;
578     }
579     if (errCode != E_OK) {
580         timeSync->SetOnline(false);
581     } else {
582         timeSync->SetOnline(true);
583     }
584 }
585 
ResetTimer()586 void TimeSync::ResetTimer()
587 {
588     TimerId timerId;
589     {
590         std::lock_guard<std::mutex> lock(timeDriverLock_);
591         timerId = driverTimerId_;
592         driverTimerId_ = 0u;
593     }
594     if (timerId == 0u) {
595         return;
596     }
597     RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
598     int errCode = RuntimeContext::GetInstance()->SetTimer(
599         TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
600     if (errCode != E_OK) {
601         LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
602     } else {
603         std::lock_guard<std::mutex> lock(timeDriverLock_);
604         driverTimerId_ = timerId;
605     }
606 }
607 
Close()608 void TimeSync::Close()
609 {
610     Finalize();
611     {
612         std::lock_guard<std::mutex> lock(cvLock_);
613         closed_ = true;
614     }
615     conditionVar_.notify_all();
616 }
617 
IsClosed() const618 bool TimeSync::IsClosed() const
619 {
620     std::lock_guard<std::mutex> lock(cvLock_);
621     return closed_ ;
622 }
623 
SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler,bool isRetryTask)624 int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler, bool isRetryTask)
625 {
626     std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
627     auto sessionId = message->GetSessionId();
628     return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
629         if (closed_) {
630             LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
631             return;
632         }
633         {
634             std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
635             sessionBeginTime_.clear();
636             sessionBeginTime_[sessionId] = timeHelper_->GetTime();
637         }
638         if (handler != nullptr) {
639             handler(errCode, isDirectEnd);
640         }
641     }, isRetryTask);
642 }
643 
GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)644 Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId)
645 {
646     std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
647     if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) {
648         LOGW("[TimeSync] Current cache not exist packet send time");
649         return packetBeginTime;
650     }
651     auto sendTime = sessionBeginTime_[sessionId];
652     LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime);
653     return sendTime;
654 }
655 
// Copies the request and fills in the responder's part: target begin/end times from the helper clock, the
// request's own local offset, and this side's local offset from metadata.
TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request)
{
    TimeSyncPacket ackPacket(request);
    // Begin and end are two consecutive clock reads.
    ackPacket.SetTargetTimeBegin(timeHelper_->GetTime());
    ackPacket.SetTargetTimeEnd(timeHelper_->GetTime());

    const TimeOffset requestOffset = request.GetRequestLocalOffset();
    const TimeOffset responseOffset = metadata_->GetLocalTimeOffset();
    ackPacket.SetRequestLocalOffset(requestOffset);
    ackPacket.SetResponseLocalOffset(responseOffset);

    LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
        ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(),
        ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(),
        ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset);
    return ackPacket;
}
673 
IsRemoteLowVersion(uint32_t checkVersion)674 bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion)
675 {
676     uint16_t version = 0;
677     int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version);
678     return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST);
679 }
680 
ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)681 void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket)
682 {
683     TimeOffset timeOffsetIgnoreRtt =
684         static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin());
685     bool reTimeSync = false;
686     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
687         reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt);
688     } else {
689         reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket);
690     }
691 
692     if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX
693         LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync");
694         SetTimeSyncFinishInner(false);
695         RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
696     }
697 
698     // reset time change by time sync
699     int errCode = metadata_->SetTimeChangeMark(deviceId_, userId_, false);
700     if (errCode != E_OK) {
701         LOGW("[TimeSync] Mark dev %.3s_%.3s no time change failed %d", deviceId_.c_str(), userId_.c_str(), errCode);
702     }
703 }
704 
CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)705 bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)
706 {
707     TimeOffset metadataTimeOffset;
708     metadata_->GetTimeOffset(deviceId_, userId_, metadataTimeOffset);
709     return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX
710         (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE);
711 }
712 
CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)713 bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket)
714 {
715     TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() +
716         ackPacket.GetResponseLocalOffset();
717     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
718     return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE);
719 }
720 
SaveOffsetWithAck(const TimeSyncPacket & ackPacket)721 int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket)
722 {
723     // calculate timeoffset of two devices
724     auto [offset, rtt] = CalculateTimeOffset(ackPacket);
725     TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset);
726     LOGD("TimeSync::AckRecv, dev = %s{private}_%s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
727         ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64
728         ", responseLocalOffset = %" PRId64,
729         deviceId_.c_str(),
730         userId_.c_str(),
731         ackPacket.GetSourceTimeEnd(),
732         ackPacket.GetTargetTimeEnd(),
733         ackPacket.GetSourceTimeBegin(),
734         ackPacket.GetTargetTimeBegin(),
735         offset,
736         rawOffset,
737         ackPacket.GetRequestLocalOffset(),
738         ackPacket.GetResponseLocalOffset());
739 
740     // save timeoffset into metadata, maybe a block action
741     int errCode = SaveTimeOffset(deviceId_, userId_, offset);
742     if (errCode != E_OK) {
743         return errCode;
744     }
745     errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, rawOffset);
746     if (errCode != E_OK) {
747         return errCode;
748     }
749     DeviceTimeInfo info;
750     info.systemTimeOffset = rawOffset;
751     info.recordTime = timeHelper_->GetSysCurrentTime();
752     info.rtt = rtt;
753     RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
754     SetTimeSyncFinishInner(true);
755     // save finish next time after save failed
756     return E_OK;
757 }
758 
CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)759 TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime)
760 {
761     // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2
762     // rawTimeOffset =  request - response + (t1' - t1 + t2' - t2)/2
763     // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset
764     return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset();
765 }
766 
CheckSkipTimeSync(const DeviceTimeInfo & info)767 bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info)
768 {
769     uint64_t currentRawTime = timeHelper_->GetSysCurrentTime();
770     if (currentRawTime < info.recordTime) {
771         LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime);
772         return false;
773     }
774     uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime;
775     if (info.rtt < MAX_TIME_RTT_NOISE) {
776         return true;
777     }
778     if (interval > RTT_NOISE_CHECK_INTERVAL) {
779         LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt,
780             interval);
781         return false;
782     }
783 #ifdef DE_DEBUG_ENV
784 #ifdef TEST_RTT_NOISE_CHECK_INTERVAL
785     if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) {
786         LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64,
787             info.rtt, interval);
788         return false;
789     }
790 #endif
791 #endif
792     return true;
793 }
794 
SetTimeSyncFinishIfNeed()795 void TimeSync::SetTimeSyncFinishIfNeed()
796 {
797     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
798     if (errCode != E_OK) {
799         return;
800     }
801     int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_, userId_);
802     LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset);
803     if (!CheckSkipTimeSync(info) || (IsNeedSync() &&
804         std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) {
805         SetTimeSyncFinishInner(false);
806         return;
807     }
808     if (IsNeedSync()) {
809         SetTimeSyncFinishInner(true);
810     }
811     if (systemTimeOffset != info.systemTimeOffset) {
812         errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, info.systemTimeOffset);
813         if (errCode != E_OK) {
814             return;
815         }
816     }
817     LOGI("[TimeSync] Mark time sync finish success");
818 }
819 
ClearTimeSyncFinish()820 void TimeSync::ClearTimeSyncFinish()
821 {
822     RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
823     SetTimeSyncFinishInner(false);
824 }
825 
GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)826 int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset)
827 {
828     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
829         return E_OK;
830     }
831     auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
832     bool syncFinish = !IsNeedSync();
833     bool timeChange = metadata_->IsTimeChange(deviceId_, userId_);
834     // avoid local time change but remote record time sync finish
835     // should return re time sync, after receive time sync request, reset time change mark
836     // we think offset is ok when local time sync to remote
837     if ((timeChange && !syncFinish) ||
838         (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) {
839         LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64
840             " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange),
841             static_cast<int>(syncFinish));
842         ClearTimeSyncFinish();
843         RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
844         return -E_NEED_TIME_SYNC;
845     }
846     // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset
847     // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset
848     // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset
849     TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset();
850     errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, -systemOffset);
851     if (errCode != E_OK) {
852         return errCode;
853     }
854     errCode = metadata_->SaveTimeOffset(deviceId_, userId_, offset);
855     if (errCode != E_OK) {
856         return errCode;
857     }
858     SetTimeSyncFinishInner(true);
859     info.systemTimeOffset = -systemOffset;
860     info.recordTime = timeHelper_->GetSysCurrentTime();
861     RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
862     return E_OK;
863 }
864 
SetTimeSyncFinishInner(bool finish)865 void TimeSync::SetTimeSyncFinishInner(bool finish)
866 {
867     isSynced_ = finish;
868     if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
869         return;
870     }
871     int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, userId_, finish);
872     if (errCode != E_OK) {
873         LOGW("[TimeSync] Set %.3s_%.3s time sync finish %d mark failed %d", deviceId_.c_str(), userId_.c_str(),
874             static_cast<int>(finish), errCode);
875     }
876 }
877 } // namespace DistributedDB