1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "time_sync.h"
17
18 #include "parcel.h"
19 #include "log_print.h"
20 #include "sync_types.h"
21 #include "message_transform.h"
22 #include "version.h"
23 #include "isync_task_context.h"
24
25 namespace DistributedDB {
26 std::mutex TimeSync::timeSyncSetLock_;
27 std::set<TimeSync *> TimeSync::timeSyncSet_;
28 namespace {
29 constexpr uint64_t TIME_SYNC_INTERVAL = 24 * 60 * 60 * 1000; // 24h
30 constexpr int TRIP_DIV_HALF = 2;
31 constexpr int64_t MAX_TIME_OFFSET_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
32 constexpr int64_t MAX_TIME_RTT_NOISE = 1 * 1000 * 10000; // 1 second in 100-nanosecond units
33 constexpr uint64_t RTT_NOISE_CHECK_INTERVAL = 30 * 60 * 1000 * 10000u; // 30 minute in 100-nanosecond units
34 }
35
36 // Class TimeSyncPacket
TimeSyncPacket()37 TimeSyncPacket::TimeSyncPacket()
38 : sourceTimeBegin_(0),
39 sourceTimeEnd_(0),
40 targetTimeBegin_(0),
41 targetTimeEnd_(0),
42 version_(TIME_SYNC_VERSION_V1),
43 requestLocalOffset_(0),
44 responseLocalOffset_(0)
45 {
46 }
47
~TimeSyncPacket()48 TimeSyncPacket::~TimeSyncPacket()
49 {
50 }
51
SetSourceTimeBegin(Timestamp sourceTimeBegin)52 void TimeSyncPacket::SetSourceTimeBegin(Timestamp sourceTimeBegin)
53 {
54 sourceTimeBegin_ = sourceTimeBegin;
55 }
56
GetSourceTimeBegin() const57 Timestamp TimeSyncPacket::GetSourceTimeBegin() const
58 {
59 return sourceTimeBegin_;
60 }
61
SetSourceTimeEnd(Timestamp sourceTimeEnd)62 void TimeSyncPacket::SetSourceTimeEnd(Timestamp sourceTimeEnd)
63 {
64 sourceTimeEnd_ = sourceTimeEnd;
65 }
66
GetSourceTimeEnd() const67 Timestamp TimeSyncPacket::GetSourceTimeEnd() const
68 {
69 return sourceTimeEnd_;
70 }
71
SetTargetTimeBegin(Timestamp targetTimeBegin)72 void TimeSyncPacket::SetTargetTimeBegin(Timestamp targetTimeBegin)
73 {
74 targetTimeBegin_ = targetTimeBegin;
75 }
76
GetTargetTimeBegin() const77 Timestamp TimeSyncPacket::GetTargetTimeBegin() const
78 {
79 return targetTimeBegin_;
80 }
81
SetTargetTimeEnd(Timestamp targetTimeEnd)82 void TimeSyncPacket::SetTargetTimeEnd(Timestamp targetTimeEnd)
83 {
84 targetTimeEnd_ = targetTimeEnd;
85 }
86
GetTargetTimeEnd() const87 Timestamp TimeSyncPacket::GetTargetTimeEnd() const
88 {
89 return targetTimeEnd_;
90 }
91
SetVersion(uint32_t version)92 void TimeSyncPacket::SetVersion(uint32_t version)
93 {
94 version_ = version;
95 }
96
GetVersion() const97 uint32_t TimeSyncPacket::GetVersion() const
98 {
99 return version_;
100 }
101
GetRequestLocalOffset() const102 TimeOffset TimeSyncPacket::GetRequestLocalOffset() const
103 {
104 return requestLocalOffset_;
105 }
106
SetRequestLocalOffset(TimeOffset offset)107 void TimeSyncPacket::SetRequestLocalOffset(TimeOffset offset)
108 {
109 requestLocalOffset_ = offset;
110 }
111
GetResponseLocalOffset() const112 TimeOffset TimeSyncPacket::GetResponseLocalOffset() const
113 {
114 return responseLocalOffset_;
115 }
116
SetResponseLocalOffset(TimeOffset offset)117 void TimeSyncPacket::SetResponseLocalOffset(TimeOffset offset)
118 {
119 responseLocalOffset_ = offset;
120 }
121
CalculateLen()122 uint32_t TimeSyncPacket::CalculateLen()
123 {
124 uint32_t len = Parcel::GetUInt32Len(); // version_
125 len += Parcel::GetUInt64Len(); // sourceTimeBegin_
126 len += Parcel::GetUInt64Len(); // sourceTimeEnd_
127 len += Parcel::GetUInt64Len(); // targetTimeBegin_
128 len += Parcel::GetUInt64Len(); // targetTimeEnd_
129 len = Parcel::GetEightByteAlign(len);
130 len += Parcel::GetInt64Len(); // requestLocalOffset_
131 len += Parcel::GetInt64Len(); // responseLocalOffset_
132 return len;
133 }
134
135 // Class TimeSync
TimeSync()136 TimeSync::TimeSync()
137 : communicateHandle_(nullptr),
138 metadata_(nullptr),
139 timeHelper_(nullptr),
140 retryTime_(0),
141 driverTimerId_(0),
142 isSynced_(false),
143 isAckReceived_(false),
144 timeChangedListener_(nullptr),
145 timeDriverLockCount_(0),
146 isOnline_(true),
147 closed_(false)
148 {
149 }
150
~TimeSync()151 TimeSync::~TimeSync()
152 {
153 Finalize();
154 driverTimerId_ = 0;
155
156 if (timeChangedListener_ != nullptr) {
157 timeChangedListener_->Drop(true);
158 timeChangedListener_ = nullptr;
159 }
160 timeHelper_ = nullptr;
161 communicateHandle_ = nullptr;
162 metadata_ = nullptr;
163
164 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
165 timeSyncSet_.erase(this);
166 }
167
RegisterTransformFunc()168 int TimeSync::RegisterTransformFunc()
169 {
170 TransformFunc func;
171 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
172 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
173 return Serialization(buffer, length, inMsg);
174 };
175 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
176 return DeSerialization(buffer, length, inMsg);
177 };
178 return MessageTransform::RegTransformFunction(TIME_SYNC_MESSAGE, func);
179 }
180
Initialize(ICommunicator * communicator,const std::shared_ptr<Metadata> & metadata,const ISyncInterface * storage,const DeviceID & deviceId,const DeviceID & userId)181 int TimeSync::Initialize(ICommunicator *communicator, const std::shared_ptr<Metadata> &metadata,
182 const ISyncInterface *storage, const DeviceID &deviceId, const DeviceID &userId)
183 {
184 if ((communicator == nullptr) || (storage == nullptr) || (metadata == nullptr)) {
185 return -E_INVALID_ARGS;
186 }
187 {
188 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
189 timeSyncSet_.insert(this);
190 }
191 communicateHandle_ = communicator;
192 metadata_ = metadata;
193 deviceId_ = deviceId;
194 userId_ = userId;
195 timeHelper_ = std::make_unique<TimeHelper>();
196
197 int errCode = timeHelper_->Initialize(storage, metadata_);
198 if (errCode != E_OK) {
199 timeHelper_ = nullptr;
200 LOGE("[TimeSync] timeHelper Init failed, err %d.", errCode);
201 return errCode;
202 }
203 dbId_ = storage->GetIdentifier();
204 driverCallback_ = [this](TimerId timerId) { return TimeSyncDriver(timerId); };
205 errCode = RuntimeContext::GetInstance()->SetTimer(TIME_SYNC_INTERVAL, driverCallback_, nullptr, driverTimerId_);
206 if (errCode != E_OK) {
207 return errCode;
208 }
209 isSynced_ = metadata_->IsTimeSyncFinish(deviceId_, userId_);
210 return errCode;
211 }
212
Finalize()213 void TimeSync::Finalize()
214 {
215 // Stop the timer
216 LOGD("[TimeSync] Finalize enter!");
217 RuntimeContext *runtimeContext = RuntimeContext::GetInstance();
218 TimerId timerId;
219 {
220 std::unique_lock<std::mutex> lock(timeDriverLock_);
221 timerId = driverTimerId_;
222 }
223 runtimeContext->RemoveTimer(timerId, true);
224 std::unique_lock<std::mutex> lock(timeDriverLock_);
225 timeDriverCond_.wait(lock, [this]() { return this->timeDriverLockCount_ == 0; });
226 LOGD("[TimeSync] Finalized!");
227 }
228
SyncStart(const CommErrHandler & handler,uint32_t sessionId,bool isRetryTask)229 int TimeSync::SyncStart(const CommErrHandler &handler, uint32_t sessionId, bool isRetryTask)
230 {
231 isOnline_ = true;
232 TimeSyncPacket packet;
233 Timestamp startTime = timeHelper_->GetTime();
234 packet.SetSourceTimeBegin(startTime);
235 TimeOffset timeOffset = metadata_->GetLocalTimeOffset();
236 packet.SetRequestLocalOffset(timeOffset);
237 // send timeSync request
238 LOGD("[TimeSync] startTime = %" PRIu64 ", offset = % " PRId64 " , dev = %s{private}", startTime, timeOffset,
239 deviceId_.c_str());
240
241 Message *message = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
242 if (message == nullptr) {
243 return -E_OUT_OF_MEMORY;
244 }
245 message->SetSessionId(sessionId);
246 message->SetMessageType(TYPE_REQUEST);
247 message->SetPriority(Priority::NORMAL);
248 int errCode = message->SetCopiedObject<>(packet);
249 if (errCode != E_OK) {
250 delete message;
251 message = nullptr;
252 return errCode;
253 }
254 errCode = SendMessageWithSendEnd(message, handler, isRetryTask);
255 if (errCode != E_OK) {
256 delete message;
257 message = nullptr;
258 }
259 return errCode;
260 }
261
CalculateLen(const Message * inMsg)262 uint32_t TimeSync::CalculateLen(const Message *inMsg)
263 {
264 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
265 return 0;
266 }
267
268 const TimeSyncPacket *packet = const_cast<TimeSyncPacket *>(inMsg->GetObject<TimeSyncPacket>());
269 if (packet == nullptr) {
270 return 0;
271 }
272
273 return TimeSyncPacket::CalculateLen();
274 }
275
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)276 int TimeSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
277 {
278 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
279 return -E_INVALID_ARGS;
280 }
281 const TimeSyncPacket *packet = inMsg->GetObject<TimeSyncPacket>();
282 if ((packet == nullptr) || (length != TimeSyncPacket::CalculateLen())) {
283 return -E_INVALID_ARGS;
284 }
285
286 Parcel parcel(buffer, length);
287 Timestamp srcBegin = packet->GetSourceTimeBegin();
288 Timestamp srcEnd = packet->GetSourceTimeEnd();
289 Timestamp targetBegin = packet->GetTargetTimeBegin();
290 Timestamp targetEnd = packet->GetTargetTimeEnd();
291
292 int errCode = parcel.WriteUInt32(TIME_SYNC_VERSION_V1);
293 if (errCode != E_OK) {
294 return -E_SECUREC_ERROR;
295 }
296 errCode = parcel.WriteUInt64(srcBegin);
297 if (errCode != E_OK) {
298 return -E_SECUREC_ERROR;
299 }
300 errCode = parcel.WriteUInt64(srcEnd);
301 if (errCode != E_OK) {
302 return -E_SECUREC_ERROR;
303 }
304 errCode = parcel.WriteUInt64(targetBegin);
305 if (errCode != E_OK) {
306 return -E_SECUREC_ERROR;
307 }
308 errCode = parcel.WriteUInt64(targetEnd);
309 if (errCode != E_OK) {
310 return -E_SECUREC_ERROR;
311 }
312 parcel.EightByteAlign();
313 parcel.WriteInt64(packet->GetRequestLocalOffset());
314 parcel.WriteInt64(packet->GetResponseLocalOffset());
315 if (parcel.IsError()) {
316 return -E_PARSE_FAIL;
317 }
318 return errCode;
319 }
320
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)321 int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
322 {
323 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
324 return -E_INVALID_ARGS;
325 }
326 TimeSyncPacket packet;
327 Parcel parcel(const_cast<uint8_t *>(buffer), length);
328 Timestamp srcBegin;
329 Timestamp srcEnd;
330 Timestamp targetBegin;
331 Timestamp targetEnd;
332
333 uint32_t version = 0;
334 parcel.ReadUInt32(version);
335 if (parcel.IsError()) {
336 return -E_INVALID_ARGS;
337 }
338 if (version > TIME_SYNC_VERSION_V1) {
339 packet.SetVersion(version);
340 return inMsg->SetCopiedObject<>(packet);
341 }
342 parcel.ReadUInt64(srcBegin);
343 parcel.ReadUInt64(srcEnd);
344 parcel.ReadUInt64(targetBegin);
345 parcel.ReadUInt64(targetEnd);
346 if (parcel.IsError()) {
347 return -E_INVALID_ARGS;
348 }
349 packet.SetSourceTimeBegin(srcBegin);
350 packet.SetSourceTimeEnd(srcEnd);
351 packet.SetTargetTimeBegin(targetBegin);
352 packet.SetTargetTimeEnd(targetEnd);
353 parcel.EightByteAlign();
354 if (parcel.IsContinueRead()) {
355 TimeOffset requestLocalOffset;
356 TimeOffset responseLocalOffset;
357 parcel.ReadInt64(requestLocalOffset);
358 parcel.ReadInt64(responseLocalOffset);
359 if (parcel.IsError()) {
360 LOGE("[TimeSync] Parse packet failed, message type %u", inMsg->GetMessageType());
361 return -E_PARSE_FAIL;
362 }
363 packet.SetRequestLocalOffset(requestLocalOffset);
364 packet.SetResponseLocalOffset(responseLocalOffset);
365 }
366
367 return inMsg->SetCopiedObject<>(packet);
368 }
369
AckRecv(const Message * message,uint32_t targetSessionId)370 int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId)
371 {
372 // only check when sessionId is not 0, because old version timesync sessionId is 0.
373 if (message != nullptr && message->GetSessionId() != 0 && message->GetSessionId() == targetSessionId) {
374 if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) {
375 LOGE("[AbilitySync][AckMsgCheck] Remote db is closed");
376 return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND;
377 }
378 }
379 if (!IsPacketValid(message, TYPE_RESPONSE)) {
380 return -E_INVALID_ARGS;
381 }
382 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
383 if (packet == nullptr) {
384 LOGE("[TimeSync] AckRecv packet is null");
385 return -E_INVALID_ARGS;
386 }
387
388 TimeSyncPacket packetData = TimeSyncPacket(*packet);
389 Timestamp sourceTimeEnd = timeHelper_->GetTime();
390 packetData.SetSourceTimeBegin(GetSourceBeginTime(packetData.GetSourceTimeBegin(), targetSessionId));
391 packetData.SetSourceTimeEnd(sourceTimeEnd);
392 if (packetData.GetSourceTimeBegin() > packetData.GetSourceTimeEnd() ||
393 packetData.GetTargetTimeBegin() > packetData.GetTargetTimeEnd() ||
394 packetData.GetSourceTimeEnd() > TimeHelper::MAX_VALID_TIME ||
395 packetData.GetTargetTimeEnd() > TimeHelper::MAX_VALID_TIME) {
396 LOGD("[TimeSync][AckRecv] Time valid check failed.");
397 return -E_INVALID_TIME;
398 }
399 int errCode = SaveOffsetWithAck(packetData);
400 {
401 std::lock_guard<std::mutex> lock(cvLock_);
402 isAckReceived_ = true;
403 }
404 conditionVar_.notify_all();
405 ResetTimer();
406 return errCode;
407 }
408
RequestRecv(const Message * message)409 int TimeSync::RequestRecv(const Message *message)
410 {
411 if (!IsPacketValid(message, TYPE_REQUEST)) {
412 return -E_INVALID_ARGS;
413 }
414
415 const TimeSyncPacket *packet = message->GetObject<TimeSyncPacket>();
416 if (packet == nullptr) {
417 return -E_INVALID_ARGS;
418 }
419
420 // build timeSync ack packet
421 TimeSyncPacket ackPacket = BuildAckPacket(*packet);
422 if (ackPacket.GetSourceTimeBegin() > TimeHelper::MAX_VALID_TIME) {
423 LOGD("[TimeSync][RequestRecv] Time valid check failed.");
424 return -E_INVALID_TIME;
425 }
426 ReTimeSyncIfNeed(ackPacket);
427
428 Message *ackMessage = new (std::nothrow) Message(TIME_SYNC_MESSAGE);
429 if (ackMessage == nullptr) {
430 return -E_OUT_OF_MEMORY;
431 }
432 ackMessage->SetSessionId(message->GetSessionId());
433 ackMessage->SetPriority(Priority::NORMAL);
434 ackMessage->SetMessageType(TYPE_RESPONSE);
435 ackMessage->SetTarget(deviceId_);
436 int errCode = ackMessage->SetCopiedObject<>(ackPacket);
437 if (errCode != E_OK) {
438 delete ackMessage;
439 ackMessage = nullptr;
440 return errCode;
441 }
442
443 errCode = SendPacket(deviceId_, ackMessage);
444 if (errCode != E_OK) {
445 delete ackMessage;
446 ackMessage = nullptr;
447 }
448 return errCode;
449 }
450
SaveTimeOffset(const DeviceID & deviceID,const DeviceID & userId,TimeOffset timeOffset)451 int TimeSync::SaveTimeOffset(const DeviceID &deviceID, const DeviceID &userId, TimeOffset timeOffset)
452 {
453 return metadata_->SaveTimeOffset(deviceID, userId, timeOffset);
454 }
455
CalculateTimeOffset(const TimeSyncPacket & timeSyncInfo)456 std::pair<TimeOffset, TimeOffset> TimeSync::CalculateTimeOffset(const TimeSyncPacket &timeSyncInfo)
457 {
458 TimeOffset roundTrip = static_cast<TimeOffset>((timeSyncInfo.GetSourceTimeEnd() -
459 timeSyncInfo.GetSourceTimeBegin()) - (timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetTargetTimeBegin()));
460 TimeOffset offset1 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeBegin() -
461 timeSyncInfo.GetSourceTimeBegin()) - (roundTrip / TRIP_DIV_HALF);
462 TimeOffset offset2 = static_cast<TimeOffset>(timeSyncInfo.GetTargetTimeEnd() - timeSyncInfo.GetSourceTimeEnd()) +
463 (roundTrip / TRIP_DIV_HALF);
464 TimeOffset offset = (offset1 / TRIP_DIV_HALF) + (offset2 / TRIP_DIV_HALF);
465 LOGD("TimeSync::CalculateTimeOffset roundTrip= %" PRId64 ", offset1 = %" PRId64 ", offset2 = %" PRId64
466 ", offset = %" PRId64, roundTrip, offset1, offset2, offset);
467 return {offset, roundTrip};
468 }
469
IsPacketValid(const Message * inMsg,uint16_t messageType)470 bool TimeSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
471 {
472 if (inMsg == nullptr) {
473 return false;
474 }
475 if (inMsg->GetMessageId() != TIME_SYNC_MESSAGE) {
476 LOGD("message Id = %d", inMsg->GetMessageId());
477 return false;
478 }
479 if (messageType != inMsg->GetMessageType()) {
480 LOGD("input Type = %" PRIu16 ", inMsg type = %" PRIu16, messageType, inMsg->GetMessageType());
481 return false;
482 }
483 return true;
484 }
485
SendPacket(const DeviceID & deviceId,const Message * message,const CommErrHandler & handler,bool isRetryTask)486 int TimeSync::SendPacket(const DeviceID &deviceId, const Message *message, const CommErrHandler &handler,
487 bool isRetryTask)
488 {
489 SendConfig conf;
490 timeHelper_->SetSendConfig(deviceId, false, SEND_TIME_OUT, conf);
491 conf.isRetryTask = isRetryTask;
492 int errCode = communicateHandle_->SendMessage(deviceId, message, conf, handler);
493 if (errCode != E_OK) {
494 LOGE("[TimeSync] SendPacket failed, err %d", errCode);
495 }
496 return errCode;
497 }
498
TimeSyncDriver(TimerId timerId)499 int TimeSync::TimeSyncDriver(TimerId timerId)
500 {
501 if (timerId != driverTimerId_) {
502 return -E_INTERNAL_ERROR;
503 }
504 if (!isOnline_) {
505 return E_OK;
506 }
507 std::lock_guard<std::mutex> lock(timeDriverLock_);
508 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
509 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
510 (void)this->SyncStart(handler);
511 std::lock_guard<std::mutex> innerLock(this->timeDriverLock_);
512 this->timeDriverLockCount_--;
513 this->timeDriverCond_.notify_all();
514 });
515 if (errCode != E_OK) {
516 LOGE("[TimeSync][TimerSyncDriver] ScheduleTask failed err %d", errCode);
517 return errCode;
518 }
519 timeDriverLockCount_++;
520 return E_OK;
521 }
522
GetTimeOffset(TimeOffset & outOffset,uint32_t timeout,uint32_t sessionId)523 int TimeSync::GetTimeOffset(TimeOffset &outOffset, uint32_t timeout, uint32_t sessionId)
524 {
525 if (!isSynced_) {
526 {
527 std::lock_guard<std::mutex> lock(cvLock_);
528 isAckReceived_ = false;
529 }
530 CommErrHandler handler = [this](int ret, bool isDirectEnd) { CommErrHandlerFunc(ret, this); };
531 int errCode = SyncStart(handler, sessionId);
532 LOGD("TimeSync::GetTimeOffset start, current time = %" PRIu64 ", errCode = %d, timeout = %" PRIu32 " ms",
533 TimeHelper::GetSysCurrentTime(), errCode, timeout);
534 std::unique_lock<std::mutex> lock(cvLock_);
535 if (errCode != E_OK || !conditionVar_.wait_for(lock, std::chrono::milliseconds(timeout), [this]() {
536 return this->isAckReceived_ || this->closed_;
537 })) {
538 LOGD("TimeSync::GetTimeOffset, retryTime_ = %d", retryTime_);
539 retryTime_++;
540 if (retryTime_ < MAX_RETRY_TIME) {
541 lock.unlock();
542 LOGI("TimeSync::GetTimeOffset timeout, try again");
543 return GetTimeOffset(outOffset, timeout);
544 }
545 retryTime_ = 0;
546 return -E_TIMEOUT;
547 }
548 }
549 if (IsClosed()) {
550 return -E_BUSY;
551 }
552 retryTime_ = 0;
553 metadata_->GetTimeOffset(deviceId_, userId_, outOffset);
554 return E_OK;
555 }
556
IsNeedSync() const557 bool TimeSync::IsNeedSync() const
558 {
559 return !isSynced_;
560 }
561
SetOnline(bool isOnline)562 void TimeSync::SetOnline(bool isOnline)
563 {
564 isOnline_ = isOnline;
565 }
566
CommErrHandlerFunc(int errCode,TimeSync * timeSync)567 void TimeSync::CommErrHandlerFunc(int errCode, TimeSync *timeSync)
568 {
569 LOGD("[TimeSync][CommErrHandle] errCode:%d", errCode);
570 std::lock_guard<std::mutex> lock(timeSyncSetLock_);
571 if (timeSyncSet_.count(timeSync) == 0) {
572 LOGI("[TimeSync][CommErrHandle] timeSync has been killed");
573 return;
574 }
575 if (timeSync == nullptr) {
576 LOGI("[TimeSync][CommErrHandle] timeSync is nullptr");
577 return;
578 }
579 if (errCode != E_OK) {
580 timeSync->SetOnline(false);
581 } else {
582 timeSync->SetOnline(true);
583 }
584 }
585
ResetTimer()586 void TimeSync::ResetTimer()
587 {
588 TimerId timerId;
589 {
590 std::lock_guard<std::mutex> lock(timeDriverLock_);
591 timerId = driverTimerId_;
592 driverTimerId_ = 0u;
593 }
594 if (timerId == 0u) {
595 return;
596 }
597 RuntimeContext::GetInstance()->RemoveTimer(timerId, true);
598 int errCode = RuntimeContext::GetInstance()->SetTimer(
599 TIME_SYNC_INTERVAL, driverCallback_, nullptr, timerId);
600 if (errCode != E_OK) {
601 LOGW("[TimeSync] Reset TimeSync timer failed err :%d", errCode);
602 } else {
603 std::lock_guard<std::mutex> lock(timeDriverLock_);
604 driverTimerId_ = timerId;
605 }
606 }
607
Close()608 void TimeSync::Close()
609 {
610 Finalize();
611 {
612 std::lock_guard<std::mutex> lock(cvLock_);
613 closed_ = true;
614 }
615 conditionVar_.notify_all();
616 }
617
IsClosed() const618 bool TimeSync::IsClosed() const
619 {
620 std::lock_guard<std::mutex> lock(cvLock_);
621 return closed_ ;
622 }
623
SendMessageWithSendEnd(const Message * message,const CommErrHandler & handler,bool isRetryTask)624 int TimeSync::SendMessageWithSendEnd(const Message *message, const CommErrHandler &handler, bool isRetryTask)
625 {
626 std::shared_ptr<TimeSync> timeSyncPtr = shared_from_this();
627 auto sessionId = message->GetSessionId();
628 return SendPacket(deviceId_, message, [handler, timeSyncPtr, sessionId, this](int errCode, bool isDirectEnd) {
629 if (closed_) {
630 LOGW("[TimeSync] DB closed, ignore send end! dev=%.3s", deviceId_.c_str());
631 return;
632 }
633 {
634 std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
635 sessionBeginTime_.clear();
636 sessionBeginTime_[sessionId] = timeHelper_->GetTime();
637 }
638 if (handler != nullptr) {
639 handler(errCode, isDirectEnd);
640 }
641 }, isRetryTask);
642 }
643
GetSourceBeginTime(Timestamp packetBeginTime,uint32_t sessionId)644 Timestamp TimeSync::GetSourceBeginTime(Timestamp packetBeginTime, uint32_t sessionId)
645 {
646 std::lock_guard<std::mutex> autoLock(beginTimeMutex_);
647 if (sessionBeginTime_.find(sessionId) == sessionBeginTime_.end()) {
648 LOGW("[TimeSync] Current cache not exist packet send time");
649 return packetBeginTime;
650 }
651 auto sendTime = sessionBeginTime_[sessionId];
652 LOGD("[TimeSync] Use packet send time %" PRIu64 " rather than %" PRIu64, sendTime, packetBeginTime);
653 return sendTime;
654 }
655
BuildAckPacket(const TimeSyncPacket & request)656 TimeSyncPacket TimeSync::BuildAckPacket(const TimeSyncPacket &request)
657 {
658 TimeSyncPacket ackPacket = TimeSyncPacket(request);
659 Timestamp targetTimeBegin = timeHelper_->GetTime();
660 ackPacket.SetTargetTimeBegin(targetTimeBegin);
661 Timestamp targetTimeEnd = timeHelper_->GetTime();
662 ackPacket.SetTargetTimeEnd(targetTimeEnd);
663 TimeOffset requestOffset = request.GetRequestLocalOffset();
664 TimeOffset responseOffset = metadata_->GetLocalTimeOffset();
665 ackPacket.SetRequestLocalOffset(requestOffset);
666 ackPacket.SetResponseLocalOffset(responseOffset);
667 LOGD("TimeSync::RequestRecv, dev = %s{private}, sTimeEnd = %" PRIu64 ", tTimeEnd = %" PRIu64 ", sbegin = %" PRIu64
668 ", tbegin = %" PRIu64 ", request offset = %" PRId64 ", response offset = %" PRId64, deviceId_.c_str(),
669 ackPacket.GetSourceTimeEnd(), ackPacket.GetTargetTimeEnd(), ackPacket.GetSourceTimeBegin(),
670 ackPacket.GetTargetTimeBegin(), requestOffset, responseOffset);
671 return ackPacket;
672 }
673
IsRemoteLowVersion(uint32_t checkVersion)674 bool TimeSync::IsRemoteLowVersion(uint32_t checkVersion)
675 {
676 uint16_t version = 0;
677 int errCode = communicateHandle_->GetRemoteCommunicatorVersion(deviceId_, version);
678 return errCode == -E_NOT_FOUND || (version < checkVersion - SOFTWARE_VERSION_EARLIEST);
679 }
680
ReTimeSyncIfNeed(const TimeSyncPacket & ackPacket)681 void TimeSync::ReTimeSyncIfNeed(const TimeSyncPacket &ackPacket)
682 {
683 TimeOffset timeOffsetIgnoreRtt =
684 static_cast<TimeOffset>(ackPacket.GetSourceTimeBegin() - ackPacket.GetTargetTimeBegin());
685 bool reTimeSync = false;
686 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
687 reTimeSync = CheckReTimeSyncIfNeedWithLowVersion(timeOffsetIgnoreRtt);
688 } else {
689 reTimeSync = CheckReTimeSyncIfNeedWithHighVersion(timeOffsetIgnoreRtt, ackPacket);
690 }
691
692 if ((std::abs(timeOffsetIgnoreRtt) >= INT64_MAX / 2) || reTimeSync) { // 2 is half of INT64_MAX
693 LOGI("[TimeSync][RequestRecv] timeOffSet invalid, should do time sync");
694 SetTimeSyncFinishInner(false);
695 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
696 }
697
698 // reset time change by time sync
699 int errCode = metadata_->SetTimeChangeMark(deviceId_, userId_, false);
700 if (errCode != E_OK) {
701 LOGW("[TimeSync] Mark dev %.3s_%.3s no time change failed %d", deviceId_.c_str(), userId_.c_str(), errCode);
702 }
703 }
704
CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)705 bool TimeSync::CheckReTimeSyncIfNeedWithLowVersion(TimeOffset timeOffsetIgnoreRtt)
706 {
707 TimeOffset metadataTimeOffset;
708 metadata_->GetTimeOffset(deviceId_, userId_, metadataTimeOffset);
709 return (std::abs(metadataTimeOffset) >= INT64_MAX / 2) || // 2 is half of INT64_MAX
710 (std::abs(metadataTimeOffset - timeOffsetIgnoreRtt) > MAX_TIME_OFFSET_NOISE);
711 }
712
CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt,const TimeSyncPacket & ackPacket)713 bool TimeSync::CheckReTimeSyncIfNeedWithHighVersion(TimeOffset timeOffsetIgnoreRtt, const TimeSyncPacket &ackPacket)
714 {
715 TimeOffset rawTimeOffset = timeOffsetIgnoreRtt - ackPacket.GetRequestLocalOffset() +
716 ackPacket.GetResponseLocalOffset();
717 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
718 return errCode == -E_NOT_FOUND || (std::abs(info.systemTimeOffset - rawTimeOffset) > MAX_TIME_OFFSET_NOISE);
719 }
720
SaveOffsetWithAck(const TimeSyncPacket & ackPacket)721 int TimeSync::SaveOffsetWithAck(const TimeSyncPacket &ackPacket)
722 {
723 // calculate timeoffset of two devices
724 auto [offset, rtt] = CalculateTimeOffset(ackPacket);
725 TimeOffset rawOffset = CalculateRawTimeOffset(ackPacket, offset);
726 LOGD("TimeSync::AckRecv, dev = %s{private}_%s{private}, sEnd = %" PRIu64 ", tEnd = %" PRIu64 ", sBegin = %" PRIu64
727 ", tBegin = %" PRIu64 ", offset = %" PRId64 ", rawOffset = %" PRId64 ", requestLocalOffset = %" PRId64
728 ", responseLocalOffset = %" PRId64,
729 deviceId_.c_str(),
730 userId_.c_str(),
731 ackPacket.GetSourceTimeEnd(),
732 ackPacket.GetTargetTimeEnd(),
733 ackPacket.GetSourceTimeBegin(),
734 ackPacket.GetTargetTimeBegin(),
735 offset,
736 rawOffset,
737 ackPacket.GetRequestLocalOffset(),
738 ackPacket.GetResponseLocalOffset());
739
740 // save timeoffset into metadata, maybe a block action
741 int errCode = SaveTimeOffset(deviceId_, userId_, offset);
742 if (errCode != E_OK) {
743 return errCode;
744 }
745 errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, rawOffset);
746 if (errCode != E_OK) {
747 return errCode;
748 }
749 DeviceTimeInfo info;
750 info.systemTimeOffset = rawOffset;
751 info.recordTime = timeHelper_->GetSysCurrentTime();
752 info.rtt = rtt;
753 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
754 SetTimeSyncFinishInner(true);
755 // save finish next time after save failed
756 return E_OK;
757 }
758
CalculateRawTimeOffset(const TimeSyncPacket & timeSyncInfo,TimeOffset deltaTime)759 TimeOffset TimeSync::CalculateRawTimeOffset(const TimeSyncPacket &timeSyncInfo, TimeOffset deltaTime)
760 {
761 // deltaTime = (t1' + response - t1 - request + t2' + response - t2 - request)/2
762 // rawTimeOffset = request - response + (t1' - t1 + t2' - t2)/2
763 // rawTimeOffset = deltaTime + requestLocalOffset - responseLocalOffset
764 return deltaTime + timeSyncInfo.GetRequestLocalOffset() - timeSyncInfo.GetResponseLocalOffset();
765 }
766
CheckSkipTimeSync(const DeviceTimeInfo & info)767 bool TimeSync::CheckSkipTimeSync(const DeviceTimeInfo &info)
768 {
769 uint64_t currentRawTime = timeHelper_->GetSysCurrentTime();
770 if (currentRawTime < info.recordTime) {
771 LOGW("[TimeSync] current time %" PRIu64 " less than record time %" PRIu64, currentRawTime, info.recordTime);
772 return false;
773 }
774 uint64_t interval = timeHelper_->GetSysCurrentTime() - info.recordTime;
775 if (info.rtt < MAX_TIME_RTT_NOISE) {
776 return true;
777 }
778 if (interval > RTT_NOISE_CHECK_INTERVAL) {
779 LOGI("[TimeSync] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64, info.rtt,
780 interval);
781 return false;
782 }
783 #ifdef DE_DEBUG_ENV
784 #ifdef TEST_RTT_NOISE_CHECK_INTERVAL
785 if (interval > TEST_RTT_NOISE_CHECK_INTERVAL) {
786 LOGI("[TimeSync][TEST] rtt %" PRId64 " is greater than noise should re time sync, interval is %" PRIu64,
787 info.rtt, interval);
788 return false;
789 }
790 #endif
791 #endif
792 return true;
793 }
794
SetTimeSyncFinishIfNeed()795 void TimeSync::SetTimeSyncFinishIfNeed()
796 {
797 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
798 if (errCode != E_OK) {
799 return;
800 }
801 int64_t systemTimeOffset = metadata_->GetSystemTimeOffset(deviceId_, userId_);
802 LOGD("[TimeSync] Check db offset %" PRId64 " cache offset %" PRId64, systemTimeOffset, info.systemTimeOffset);
803 if (!CheckSkipTimeSync(info) || (IsNeedSync() &&
804 std::abs(systemTimeOffset - info.systemTimeOffset) >= MAX_TIME_OFFSET_NOISE)) {
805 SetTimeSyncFinishInner(false);
806 return;
807 }
808 if (IsNeedSync()) {
809 SetTimeSyncFinishInner(true);
810 }
811 if (systemTimeOffset != info.systemTimeOffset) {
812 errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, info.systemTimeOffset);
813 if (errCode != E_OK) {
814 return;
815 }
816 }
817 LOGI("[TimeSync] Mark time sync finish success");
818 }
819
ClearTimeSyncFinish()820 void TimeSync::ClearTimeSyncFinish()
821 {
822 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
823 SetTimeSyncFinishInner(false);
824 }
825
GenerateTimeOffsetIfNeed(TimeOffset systemOffset,TimeOffset senderLocalOffset)826 int TimeSync::GenerateTimeOffsetIfNeed(TimeOffset systemOffset, TimeOffset senderLocalOffset)
827 {
828 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
829 return E_OK;
830 }
831 auto [errCode, info] = RuntimeContext::GetInstance()->GetDeviceTimeInfo(deviceId_);
832 bool syncFinish = !IsNeedSync();
833 bool timeChange = metadata_->IsTimeChange(deviceId_, userId_);
834 // avoid local time change but remote record time sync finish
835 // should return re time sync, after receive time sync request, reset time change mark
836 // we think offset is ok when local time sync to remote
837 if ((timeChange && !syncFinish) ||
838 (errCode == E_OK && (std::abs(info.systemTimeOffset + systemOffset) > MAX_TIME_OFFSET_NOISE))) {
839 LOGI("[TimeSync] time offset is invalid should do time sync again! packet %" PRId64 " cache %" PRId64
840 " time change %d sync finish %d", -systemOffset, info.systemTimeOffset, static_cast<int>(timeChange),
841 static_cast<int>(syncFinish));
842 ClearTimeSyncFinish();
843 RuntimeContext::GetInstance()->ClearDeviceTimeInfo(deviceId_);
844 return -E_NEED_TIME_SYNC;
845 }
846 // Sender's systemOffset = Sender's deltaTime + requestLocalOffset - responseLocalOffset
847 // Sender's deltaTime = Sender's systemOffset - requestLocalOffset + responseLocalOffset
848 // Receiver's deltaTime = -Sender's deltaTime = -Sender's systemOffset + requestLocalOffset - responseLocalOffset
849 TimeOffset offset = -systemOffset + senderLocalOffset - metadata_->GetLocalTimeOffset();
850 errCode = metadata_->SetSystemTimeOffset(deviceId_, userId_, -systemOffset);
851 if (errCode != E_OK) {
852 return errCode;
853 }
854 errCode = metadata_->SaveTimeOffset(deviceId_, userId_, offset);
855 if (errCode != E_OK) {
856 return errCode;
857 }
858 SetTimeSyncFinishInner(true);
859 info.systemTimeOffset = -systemOffset;
860 info.recordTime = timeHelper_->GetSysCurrentTime();
861 RuntimeContext::GetInstance()->SetDeviceTimeInfo(deviceId_, info);
862 return E_OK;
863 }
864
SetTimeSyncFinishInner(bool finish)865 void TimeSync::SetTimeSyncFinishInner(bool finish)
866 {
867 isSynced_ = finish;
868 if (IsRemoteLowVersion(SOFTWARE_VERSION_RELEASE_9_0)) {
869 return;
870 }
871 int errCode = metadata_->SetTimeSyncFinishMark(deviceId_, userId_, finish);
872 if (errCode != E_OK) {
873 LOGW("[TimeSync] Set %.3s_%.3s time sync finish %d mark failed %d", deviceId_.c_str(), userId_.c_str(),
874 static_cast<int>(finish), errCode);
875 }
876 }
877 } // namespace DistributedDB