1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 }
44
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)45 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
46 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
47 {
48 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
49 return -E_INVALID_ARGS;
50 }
51 storage_ = static_cast<SyncGenericInterface *>(inStorage);
52 communicateHandle_ = inCommunicateHandle;
53 metadata_ = inMetadata;
54 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
55 std::vector<uint8_t> label = inStorage->GetIdentifier();
56 label.resize(3); // only show 3 Bytes enough
57 label_ = DBCommon::VectorToHexString(label);
58 deviceId_ = deviceId;
59 msgSchedule_.Initialize(label_, deviceId_);
60 return E_OK;
61 }
62
SyncStart(int mode,SingleVerSyncTaskContext * context)63 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
64 {
65 std::lock_guard<std::mutex> lock(lock_);
66 int errCode = CheckPermitSendData(mode, context);
67 if (errCode != E_OK) {
68 return errCode;
69 }
70 if (sessionId_ != 0) { // auto sync timeout resend
71 return ReSendData(context);
72 }
73 ResetSyncStatus(mode, context);
74 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
75 int tmpMode = SyncOperation::TransferSyncMode(mode);
76 if (tmpMode == SyncModeType::PUSH) {
77 errCode = PushStart(context);
78 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79 errCode = PushPullStart(context);
80 } else if (tmpMode == SyncModeType::PULL) {
81 errCode = PullRequestStart(context);
82 } else {
83 SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
84 errCode = PullResponseStart(context);
85 }
86 if (context->IsSkipTimeoutError(errCode)) {
87 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
88 // just return to avoid higher pressure for send.
89 return E_OK;
90 }
91 if (errCode != E_OK) {
92 LOGE("[DataSync] SendStart errCode=%d", errCode);
93 return errCode;
94 }
95 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
96 LOGE("wait for recv finished for push and pull mode");
97 return -E_EKEYREVOKED;
98 }
99 return InnerSyncStart(context);
100 }
101
InnerSyncStart(SingleVerSyncTaskContext * context)102 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
103 {
104 int errCode = CheckPermitSendData(mode_, context);
105 if (errCode != E_OK) {
106 return errCode;
107 }
108 while (true) {
109 if (windowSize_ <= 0 || isAllDataHasSent_) {
110 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
111 label_.c_str(), STR_MASK(deviceId_));
112 return E_OK;
113 }
114 int mode = SyncOperation::TransferSyncMode(mode_);
115 if (mode == SyncModeType::PULL) {
116 LOGE("[DataSync] unexpected error");
117 return -E_INVALID_ARGS;
118 }
119 context->IncSequenceId();
120 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
121 errCode = PushStart(context);
122 } else {
123 errCode = PullResponseStart(context);
124 }
125 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
126 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
127 isAllDataHasSent_ = true;
128 return -E_EKEYREVOKED;
129 }
130 if (context->IsSkipTimeoutError(errCode)) {
131 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
132 // just return to avoid higher pressure for send.
133 return E_OK;
134 }
135 if (errCode != E_OK) {
136 LOGE("[DataSync] InnerSend errCode=%d", errCode);
137 return errCode;
138 }
139 }
140 return E_OK;
141 }
142
InnerClearSyncStatus()143 void SingleVerDataSync::InnerClearSyncStatus()
144 {
145 sessionId_ = 0;
146 reSendMap_.clear();
147 windowSize_ = 0;
148 maxSequenceIdHasSent_ = 0;
149 isAllDataHasSent_ = false;
150 }
151
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)152 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
153 {
154 if (message == nullptr) {
155 LOGE("[DataSync] AckRecv message nullptr");
156 return -E_INVALID_ARGS;
157 }
158 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
159 if (packet == nullptr) {
160 return -E_INVALID_ARGS;
161 }
162 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
163 uint32_t sessionId = message->GetSessionId();
164 uint32_t sequenceId = message->GetSequenceId();
165
166 std::lock_guard<std::mutex> lock(lock_);
167 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
168 windowSize_, label_.c_str(), STR_MASK(deviceId_));
169 if (sessionId != sessionId_) {
170 LOGI("[DataSync] ignore ack,sessionId is different");
171 return E_OK;
172 }
173 Timestamp lastQueryTime = 0;
174 if (reSendMap_.count(sequenceId) != 0) {
175 lastQueryTime = reSendMap_[sequenceId].end;
176 reSendMap_.erase(sequenceId);
177 windowSize_++;
178 } else {
179 LOGI("[DataSync] ack seqId not in map");
180 return E_OK;
181 }
182 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
183 Timestamp dbLastQueryTime = 0;
184 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
185 context->GetTargetUserId(), dbLastQueryTime);
186 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
187 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
188 context->GetTargetUserId(), lastQueryTime);
189 }
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 }
194 if (!isAllDataHasSent_) {
195 return InnerSyncStart(context);
196 } else if (reSendMap_.empty()) {
197 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198 InnerClearSyncStatus();
199 return -E_FINISHED;
200 }
201 return E_OK;
202 }
203
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206 std::lock_guard<std::mutex> lock(lock_);
207 InnerClearSyncStatus();
208 }
209
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212 if (reSendMap_.empty()) {
213 LOGI("[DataSync] ReSend map empty");
214 return -E_INTERNAL_ERROR;
215 }
216 uint32_t sequenceId = reSendMap_.begin()->first;
217 ReSendInfo reSendInfo = reSendMap_.begin()->second;
218 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221 windowSize_, label_.c_str(), STR_MASK(deviceId_));
222 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224 return ReSend(context, dataReSendInfo);
225 }
226
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229 std::string deviceInfo;
230 if (communicateHandle_ != nullptr) {
231 communicateHandle_->GetLocalIdentity(deviceInfo);
232 }
233 return deviceInfo;
234 }
235
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237 uint32_t packetLen)
238 {
239 bool startFeedDogRet = false;
240 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244 }
245 SendConfig sendConfig;
246 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247 sendConfig.isRetryTask = context->IsRetryTask();
248 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
249 if (errCode != E_OK) {
250 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
251 if (startFeedDogRet) {
252 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
253 }
254 }
255 return errCode;
256 }
257
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)258 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
259 std::vector<SendDataItem> &outData, size_t packetSize)
260 {
261 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
262 if (performance != nullptr) {
263 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
264 }
265 // start a watch dog before get data
266 // it will send ack util get data finished
267 context->StartFeedDogForGetData(context->GetResponseSessionId());
268 int errCode = GetData(context, packetSize, outData);
269 context->StopFeedDogForGetData();
270 if (performance != nullptr) {
271 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
272 }
273 if (!outData.empty()) {
274 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
275 }
276 return errCode;
277 }
278
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)279 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
280 {
281 int errCode;
282 UpdateMtuSize();
283 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
284 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
285 LOGI("[DataSync][GetData] resend data");
286 errCode = GetUnsyncData(context, outData, packetSize);
287 } else {
288 ContinueToken token;
289 context->GetContinueToken(token);
290 if (token == nullptr) {
291 errCode = GetUnsyncData(context, outData, packetSize);
292 } else {
293 LOGD("[DataSync][GetData] get data from token");
294 // if there is data to send, read out data, and update local watermark, send data
295 errCode = GetNextUnsyncData(context, outData, packetSize);
296 }
297 }
298 if (errCode == -E_UNFINISHED) {
299 LOGD("[DataSync][GetData] not finished.");
300 }
301 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
303 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
304 }
305 return errCode;
306 }
307
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)308 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
309 {
310 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
311 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
312 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
313 bool needCompressOnSync = false;
314 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
315 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
316 int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
317 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
318 context->SetTaskErrCode(errCode);
319 return errCode;
320 }
321
322 int innerCode = InterceptData(syncOutData);
323 if (innerCode != E_OK) {
324 context->SetTaskErrCode(innerCode);
325 return innerCode;
326 }
327
328 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
329 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
330 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
331 { remoteAlgo, version });
332 if (compressCode != E_OK) {
333 return compressCode;
334 }
335 }
336 return errCode;
337 }
338
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)339 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
340 size_t packetSize)
341 {
342 SyncTimeRange waterRange;
343 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
344 WaterMark startMark = 0;
345 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
346 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
347 if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
348 return E_OK;
349 }
350 waterRange.beginTime = startMark;
351 if (curType == SyncType::QUERY_SYNC_TYPE) {
352 WaterMark deletedStartMark = 0;
353 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
354 Timestamp lastQueryTimestamp = 0;
355 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
356 context->GetTargetUserId(), lastQueryTimestamp);
357 if (errCode != E_OK) {
358 return errCode;
359 }
360 waterRange.deleteBeginTime = deletedStartMark;
361 waterRange.lastQueryTime = lastQueryTimestamp;
362 }
363 return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
364 }
365
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)366 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
367 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
368 {
369 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
370 ContinueToken token = nullptr;
371 context->GetContinueToken(token);
372 if (token != nullptr) {
373 storage_->ReleaseContinueToken(token);
374 }
375 int errCode;
376 if (curType != SyncType::QUERY_SYNC_TYPE) {
377 errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
378 syncDataSizeInfo);
379 } else {
380 QuerySyncObject queryObj = context->GetQuery();
381 errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
382 }
383 context->SetContinueToken(token);
384 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
385 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
386 }
387 return errCode;
388 }
389
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)390 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
391 size_t packetSize)
392 {
393 ContinueToken token;
394 context->GetContinueToken(token);
395 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
396 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
397 context->SetContinueToken(token);
398 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
399 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
400 }
401 return errCode;
402 }
403
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)404 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
405 SyncType curType, const QuerySyncObject &query)
406 {
407 if (inData.empty()) {
408 return E_OK;
409 }
410 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
411 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
412 if (performance != nullptr) {
413 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
414 }
415 const auto localDeviceName = GetLocalDeviceName();
416 const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
417 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
418 std::vector<SendDataItem> copyData = inData;
419 int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
420 if (errCode != E_OK) {
421 LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
422 return errCode;
423 }
424 // query only support prefix key and don't have query in packet in 104 version
425 errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
426 if (performance != nullptr) {
427 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428 }
429 if (errCode != E_OK) {
430 LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
431 }
432 return errCode;
433 }
434
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437 mode_ = inMode;
438 maxSequenceIdHasSent_ = 0;
439 isAllDataHasSent_ = false;
440 context->ReSetSequenceId();
441 reSendMap_.clear();
442 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443 windowSize_ = LOW_VERSION_WINDOW_SIZE;
444 } else {
445 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446 }
447 int mode = SyncOperation::TransferSyncMode(inMode);
448 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449 sessionId_ = context->GetRequestSessionId();
450 } else {
451 sessionId_ = context->GetResponseSessionId();
452 }
453 }
454
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458 WaterMark localMark = 0;
459 WaterMark deleteMark = 0;
460 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461 GetLocalDeleteSyncWaterMark(context, deleteMark);
462 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468 WaterMark localMark = 0;
469 int errCode = E_OK;
470 const std::string &deviceId = context->GetDeviceId();
471 std::string queryId = context->GetQuerySyncId();
472 if (syncType != SyncType::QUERY_SYNC_TYPE) {
473 if (isCheckBeforUpdate) {
474 GetLocalWaterMark(syncType, queryId, context, localMark);
475 if (localMark >= dataTimeRange.endTime) {
476 return E_OK;
477 }
478 }
479 errCode = metadata_->SaveLocalWaterMark(deviceId, context->GetTargetUserId(), dataTimeRange.endTime);
480 } else {
481 bool isNeedUpdateMark = true;
482 bool isNeedUpdateDeleteMark = true;
483 if (isCheckBeforUpdate) {
484 WaterMark deleteDataWaterMark = 0;
485 GetLocalWaterMark(syncType, queryId, context, localMark);
486 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487 if (localMark >= dataTimeRange.endTime) {
488 isNeedUpdateMark = false;
489 }
490 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491 isNeedUpdateDeleteMark = false;
492 }
493 }
494 if (isNeedUpdateMark) {
495 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, context->GetTargetUserId(),
497 dataTimeRange.endTime);
498 if (errCode != E_OK) {
499 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
500 return errCode;
501 }
502 }
503 if (isNeedUpdateDeleteMark) {
504 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
505 dataTimeRange.deleteEndTime);
506 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(),
507 dataTimeRange.deleteEndTime);
508 }
509 }
510 if (errCode != E_OK) {
511 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
512 }
513 return errCode;
514 }
515
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,const DeviceID & userId,WaterMark & waterMark) const516 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
517 const DeviceID &deviceId, const DeviceID &userId, WaterMark &waterMark) const
518 {
519 if (syncType != SyncType::QUERY_SYNC_TYPE) {
520 metadata_->GetPeerWaterMark(deviceId, userId, waterMark);
521 return;
522 }
523 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, userId, waterMark);
524 }
525
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,const DeviceID & userId,WaterMark & waterMark)526 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, const DeviceID &userId,
527 WaterMark &waterMark)
528 {
529 metadata_->GetRecvDeleteSyncWaterMark(deviceId, userId, waterMark);
530 }
531
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const532 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
533 WaterMark &waterMark) const
534 {
535 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), waterMark,
536 context->IsAutoLiftWaterMark());
537 }
538
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const539 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
540 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
541 {
542 if (syncType != SyncType::QUERY_SYNC_TYPE) {
543 metadata_->GetLocalWaterMark(context->GetDeviceId(), context->GetTargetUserId(), waterMark);
544 return;
545 }
546 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(), context->GetTargetUserId(),
547 waterMark, context->IsAutoLiftWaterMark());
548 }
549
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)550 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
551 WaterMark maxSendDataTime)
552 {
553 bool isNeedClearRemoteData = false;
554 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
555 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
556 uint64_t clearDeviceDataMark = 0;
557 metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearDeviceDataMark);
558 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
559 } else {
560 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
561 if (packet == nullptr) {
562 LOGE("[RemoveDeviceDataHandle] get packet object failed");
563 return -E_INVALID_ARGS;
564 }
565 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
566 WaterMark packetLocalMark = packet->GetLocalWaterMark();
567 WaterMark peerMark = 0;
568 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), context->GetTargetUserId(),
569 peerMark);
570 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
571 }
572 if (!isNeedClearRemoteData) {
573 return E_OK;
574 }
575 int errCode = E_OK;
576 if (context->IsNeedClearRemoteStaleData()) {
577 // need to clear remote device history data
578 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
579 if (errCode != E_OK) {
580 (void)SendDataAck(context, message, errCode, maxSendDataTime);
581 return errCode;
582 }
583 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
584 // avoid repeat clear in ack
585 metadata_->SaveLocalWaterMark(context->GetDeviceId(), context->GetTargetUserId(), 0);
586 }
587 }
588 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
589 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
590 if (errCode != E_OK) {
591 (void)SendDataAck(context, message, errCode, maxSendDataTime);
592 return errCode;
593 }
594 }
595 return E_OK;
596 }
597
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)598 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
599 const std::vector<uint64_t> &reserved)
600 {
601 bool isNeedClearRemoteData = false;
602 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
603 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
604 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
605 uint64_t clearDeviceDataMark = 0;
606 metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearDeviceDataMark);
607 isNeedClearRemoteData = (clearDeviceDataMark != 0);
608 } else if (reserved.empty()) {
609 WaterMark localMark = 0;
610 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
611 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
612 } else {
613 WaterMark peerMark = 0;
614 GetPeerWaterMark(curType, context->GetQuerySyncId(),
615 context->GetDeviceId(), context->GetTargetUserId(), peerMark);
616 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
617 }
618 if (!isNeedClearRemoteData) {
619 return E_OK;
620 }
621 // need to clear remote historydata
622 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
623 label_.c_str(), STR_MASK(GetDeviceId()));
624 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
625 if (errCode != E_OK) {
626 return errCode;
627 }
628 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
629 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
630 }
631 return errCode;
632 }
633
SetSessionEndTimestamp(Timestamp end)634 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
635 {
636 sessionEndTimestamp_ = end;
637 }
638
GetSessionEndTimestamp() const639 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
640 {
641 return sessionEndTimestamp_;
642 }
643
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)644 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
645 {
646 ReSendInfo reSendInfo;
647 reSendInfo.start = dataTimeRange.beginTime;
648 reSendInfo.end = dataTimeRange.endTime;
649 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
650 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
651 reSendInfo.packetId = context->GetPacketId();
652 maxSequenceIdHasSent_++;
653 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
654 windowSize_--;
655 ContinueToken token;
656 context->GetContinueToken(token);
657 if (token == nullptr) {
658 isAllDataHasSent_ = true;
659 }
660 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
661 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
662 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
663 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
664 }
665
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)666 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
667 SyncEntry &syncData, int sendCode, int mode)
668 {
669 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
670 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
671 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
672 WaterMark localMark = 0;
673 WaterMark peerMark = 0;
674 WaterMark deleteMark = 0;
675 bool needCompressOnSync = false;
676 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
677 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
678 std::string id = context->GetQuerySyncId();
679 GetLocalWaterMark(curType, id, context, localMark);
680 GetPeerWaterMark(curType, id, context->GetDeviceId(), context->GetTargetUserId(), peerMark);
681 GetLocalDeleteSyncWaterMark(context, deleteMark);
682 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
683 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
684 packet->SetLastSequence();
685 }
686 int tmpMode = mode;
687 if (mode == SyncModeType::RESPONSE_PULL) {
688 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
689 }
690 packet->SetData(syncData.entries);
691 packet->SetCompressData(syncData.compressedEntries);
692 packet->SetBasicInfo(sendCode, version, tmpMode);
693 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
694 packet->SetWaterMark(localMark, peerMark, deleteMark);
695 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
696 packet->SetEndWaterMark(context->GetEndMark());
697 packet->SetSessionId(context->GetRequestSessionId());
698 }
699 packet->SetQuery(context->GetQuery());
700 packet->SetQueryId(context->GetQuerySyncId());
701 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
702 // empty compress data should not mark compress
703 if (!packet->GetCompressData().empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
704 packet->SetCompressDataMark();
705 packet->SetCompressAlgo(curAlgo);
706 }
707 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
708 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
709 context->GetQuery().HasOrderBy())) {
710 packet->SetUpdateWaterMark();
711 }
712 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
713 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
714 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
715 }
716
RequestStart(SingleVerSyncTaskContext * context,int mode)717 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
718 {
719 int errCode = QuerySyncCheck(context);
720 if (errCode != E_OK) {
721 LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
722 return errCode;
723 }
724 errCode = RemoveDeviceDataIfNeed(context);
725 if (errCode != E_OK) {
726 context->SetTaskErrCode(errCode);
727 return errCode;
728 }
729 SyncEntry syncData;
730 // get data
731 errCode = GetMatchData(context, syncData);
732 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
733
734 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
735 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
736 return errCode;
737 }
738
739 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
740 if (packet == nullptr) {
741 LOGE("[DataSync][PushStart] new DataRequestPacket error");
742 return -E_OUT_OF_MEMORY;
743 }
744 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
745 UpdateWaterMark isUpdateWaterMark;
746 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
747 if (errCode == E_OK) {
748 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
749 }
750 FillDataRequestPacket(packet, context, syncData, errCode, mode);
751 errCode = SendDataPacket(curType, packet, context);
752 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
753 if (performance != nullptr) {
754 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
755 }
756 if (errCode == E_OK || errCode == -E_TIMEOUT) {
757 UpdateSendInfo(dataTime, context);
758 }
759 if (errCode == E_OK) {
760 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
761 context->GetQuery().HasOrderBy())) {
762 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
763 return E_OK;
764 }
765 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
766 SaveLocalWaterMark(curType, context, tmpDataTime);
767 }
768 return errCode;
769 }
770
PushStart(SingleVerSyncTaskContext * context)771 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
772 {
773 if (context == nullptr) {
774 return -E_INVALID_ARGS;
775 }
776 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
777 return RequestStart(context,
778 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
779 }
780
PushPullStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
782 {
783 if (context == nullptr) {
784 return -E_INVALID_ARGS;
785 }
786 return RequestStart(context, context->GetMode());
787 }
788
PullRequestStart(SingleVerSyncTaskContext * context)789 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
790 {
791 if (context == nullptr) {
792 return -E_INVALID_ARGS;
793 }
794 int errCode = QuerySyncCheck(context);
795 if (errCode != E_OK) {
796 return errCode;
797 }
798 errCode = RemoveDeviceDataIfNeed(context);
799 if (errCode != E_OK) {
800 context->SetTaskErrCode(errCode);
801 return errCode;
802 }
803 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
804 if (packet == nullptr) {
805 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
806 return -E_OUT_OF_MEMORY;
807 }
808 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
809 WaterMark peerMark = 0;
810 WaterMark localMark = 0;
811 WaterMark deleteMark = 0;
812 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
813 context->GetDeviceId(), context->GetTargetUserId(), peerMark);
814 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
815 GetLocalDeleteSyncWaterMark(context, deleteMark);
816 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
817 WaterMark endMark = context->GetEndMark();
818 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
819 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
820 packet->SetBasicInfo(E_OK, version, context->GetMode());
821 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
822 packet->SetWaterMark(localMark, peerMark, deleteMark);
823 packet->SetEndWaterMark(endMark);
824 packet->SetSessionId(context->GetRequestSessionId());
825 packet->SetQuery(context->GetQuery());
826 packet->SetQueryId(context->GetQuerySyncId());
827 packet->SetLastSequence();
828 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
829 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
830 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
831 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
832 label_.c_str(), STR_MASK(GetDeviceId()));
833 UpdateSendInfo(dataTime, context);
834 return SendDataPacket(syncType, packet, context);
835 }
836
PullResponseStart(SingleVerSyncTaskContext * context)837 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
838 {
839 if (context == nullptr) {
840 return -E_INVALID_ARGS;
841 }
842 SyncEntry syncData;
843 // get data
844 int errCode = GetMatchData(context, syncData);
845 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
846 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
847 (void)SendPullResponseDataPkt(errCode, syncData, context);
848 }
849 return errCode;
850 }
851 // if send finished
852 int ackCode = E_OK;
853 ContinueToken token = nullptr;
854 context->GetContinueToken(token);
855 if (errCode == E_OK && token == nullptr) {
856 LOGD("[DataSync][PullResponse] send last frame end");
857 ackCode = SEND_FINISHED;
858 }
859 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
860 UpdateWaterMark isUpdateWaterMark;
861 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
862 if (errCode == E_OK) {
863 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
864 }
865 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
866 if (errCode == E_OK || errCode == -E_TIMEOUT) {
867 UpdateSendInfo(dataTime, context);
868 }
869 if (errCode == E_OK) {
870 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
871 context->GetQuery().HasOrderBy())) {
872 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
873 return E_OK;
874 }
875 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
876 SaveLocalWaterMark(curType, context, tmpDataTime);
877 }
878 return errCode;
879 }
880
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)881 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
882 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
883 {
884 WaterMark tmpPeerWatermark = dataTime.endTime;
885 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
886 if (isUpdateWaterMark.normalUpdateMark) {
887 tmpPeerWatermark++;
888 }
889 if (isUpdateWaterMark.deleteUpdateMark) {
890 tmpPeerDeletedWatermark++;
891 }
892 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
893 }
894
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)895 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
896 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
897 {
898 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
899 return;
900 }
901 int errCode = E_OK;
902 if (syncType != SyncType::QUERY_SYNC_TYPE) {
903 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), context->GetTargetUserId(), peerWatermark, true);
904 } else {
905 if (peerWatermark != 0) {
906 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
907 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), context->GetTargetUserId(),
908 peerWatermark);
909 if (errCode != E_OK) {
910 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
911 }
912 }
913 if (peerDeletedWatermark != 0) {
914 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
915 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
916 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(),
917 peerDeletedWatermark);
918 }
919 }
920 if (errCode != E_OK) {
921 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
922 }
923 }
924
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)925 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
926 {
927 uint16_t remoteCommunicatorVersion = 0;
928 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
929 -E_NOT_FOUND) {
930 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
931 return -E_VERSION_NOT_SUPPORT;
932 }
933 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
934 if (remoteCommunicatorVersion == 0) {
935 LOGI("[DataSync] set remote version 0");
936 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
937 return E_OK;
938 } else {
939 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
940 if (isControlMsg) {
941 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
942 } else {
943 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
944 }
945 return -E_NEED_ABILITY_SYNC;
946 }
947 }
948
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)949 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
950 {
951 if (context == nullptr || message == nullptr) {
952 return -E_INVALID_ARGS;
953 }
954 auto *packet = message->GetObject<DataRequestPacket>();
955 if (packet == nullptr) {
956 return -E_INVALID_ARGS;
957 }
958 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
959 return DoAbilitySyncIfNeed(context, message);
960 }
961 int32_t sendCode = packet->GetSendCode();
962 if (sendCode == -E_VERSION_NOT_SUPPORT) {
963 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
964 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
965 return -E_WAIT_NEXT_MESSAGE;
966 }
967 // only deal with pull response packet errCode
968 if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
969 message->GetSessionId() == context->GetRequestSessionId()) {
970 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
971 return sendCode;
972 }
973 int errCode = RunPermissionCheck(context, message, packet);
974 if (errCode != E_OK) {
975 return errCode;
976 }
977 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
978 errCode = CheckSchemaStrategy(context, message);
979 }
980 if (errCode == E_OK) {
981 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
982 }
983 if (errCode != E_OK) {
984 (void)SendDataAck(context, message, errCode, 0);
985 return errCode;
986 }
987 errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
988 if (errCode != E_OK) {
989 (void)SendDataAck(context, message, errCode, 0);
990 }
991 return errCode;
992 }
993
DataRequestRecvInner(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)994 int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, const Message *message,
995 WaterMark &pullEndWatermark)
996 {
997 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
998 if (packet == nullptr) {
999 LOGE("[DataSync][DataRequestRecv] get packet object failed");
1000 return -E_INVALID_ARGS;
1001 }
1002 const std::vector<SendDataItem> &data = packet->GetData();
1003 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1004 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
1005 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
1006 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
1007 context->SetReceiveWaterMarkErr(false);
1008 UpdateWaterMark isUpdateWaterMark;
1009 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1010 int errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1011 if (errCode != E_OK) {
1012 return errCode;
1013 }
1014 Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1015 if (WaterMarkErrHandle(curType, context, message)) {
1016 return E_OK;
1017 }
1018 GetPullEndWatermark(context, packet, pullEndWatermark);
1019 // save data first
1020 errCode = SaveData(context, data, curType,
1021 SingleVerDataSyncUtils::GetQueryFromDataRequest(*packet, *context, message->GetSessionId()));
1022 if (errCode != E_OK) {
1023 (void)SendDataAck(context, message, errCode, dataTime.endTime);
1024 return errCode;
1025 }
1026 SingleVerDataSyncUtils::UpdateSyncProcess(context, packet);
1027
1028 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1029 pullEndWatermark = 0;
1030 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1031 } else {
1032 // if data is empty, we don't know the max timestap of this packet.
1033 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1034 }
1035 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1036 context->GetRequestSessionId());
1037 UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1038 context->RefreshSaveTime(packet->IsLastSequence());
1039 if (errCode != E_OK) {
1040 return errCode;
1041 }
1042 if (packet->GetSendCode() == SEND_FINISHED) {
1043 return -E_RECV_FINISHED;
1044 }
1045 return errCode;
1046 }
1047
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)1048 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
1049 WaterMark &pullEndWatermark)
1050 {
1051 int errCode = DataRequestRecvPre(context, message);
1052 if (errCode != E_OK) {
1053 return errCode;
1054 }
1055 return DataRequestRecvInner(context, message, pullEndWatermark);
1056 }
1057
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1058 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1059 SingleVerSyncTaskContext *context)
1060 {
1061 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1062 if (message == nullptr) {
1063 LOGE("[DataSync][SendDataPacket] new message error");
1064 delete packet;
1065 packet = nullptr;
1066 return -E_OUT_OF_MEMORY;
1067 }
1068 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1069 int errCode = message->SetExternalObject(packet);
1070 if (errCode != E_OK) {
1071 delete packet;
1072 packet = nullptr;
1073 delete message;
1074 message = nullptr;
1075 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1076 return errCode;
1077 }
1078 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1079 context->GetSequenceId(), context->GetRequestSessionId());
1080 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1081 if (performance != nullptr) {
1082 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1083 }
1084 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1085 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1086 };
1087 errCode = Send(context, message, handler, packetLen);
1088 if (errCode != E_OK) {
1089 delete message;
1090 message = nullptr;
1091 }
1092
1093 return errCode;
1094 }
1095
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1096 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1097 SingleVerSyncTaskContext *context)
1098 {
1099 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1100 if (packet == nullptr) {
1101 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1102 return -E_OUT_OF_MEMORY;
1103 }
1104 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1105 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1106
1107 if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1108 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1109 uint32_t total = 0u;
1110 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1111 LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1112 packet->SetTotalDataCount(total);
1113 }
1114
1115 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1116 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1117 if (message == nullptr) {
1118 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1119 delete packet;
1120 packet = nullptr;
1121 return -E_OUT_OF_MEMORY;
1122 }
1123 int errCode = message->SetExternalObject(packet);
1124 if (errCode != E_OK) {
1125 delete packet;
1126 packet = nullptr;
1127 delete message;
1128 message = nullptr;
1129 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1130 return errCode;
1131 }
1132 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1133 context->GetSequenceId(), context->GetResponseSessionId());
1134 SendResetWatchDogPacket(context, packetLen);
1135 errCode = Send(context, message, nullptr, packetLen);
1136 if (errCode != E_OK) {
1137 delete message;
1138 message = nullptr;
1139 }
1140 return errCode;
1141 }
1142
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1143 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1144 {
1145 (void)SendDataAck(context, message, E_OK, 0);
1146 }
1147
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1148 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1149 WaterMark maxSendDataTime)
1150 {
1151 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1152 if (packet == nullptr) {
1153 return -E_INVALID_ARGS;
1154 }
1155 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1156 if (ackMessage == nullptr) {
1157 LOGE("[DataSync][SendDataAck] new message error");
1158 return -E_OUT_OF_MEMORY;
1159 }
1160 DataAckPacket ack;
1161 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1162 int errCode = ackMessage->SetCopiedObject(ack);
1163 if (errCode != E_OK) {
1164 delete ackMessage;
1165 ackMessage = nullptr;
1166 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1167 return errCode;
1168 }
1169 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1170 message->GetSequenceId(), message->GetSessionId());
1171
1172 errCode = Send(context, ackMessage, nullptr, 0);
1173 if (errCode != E_OK) {
1174 delete ackMessage;
1175 ackMessage = nullptr;
1176 }
1177 return errCode;
1178 }
1179
AckPacketIdCheck(const Message * message)1180 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1181 {
1182 if (message == nullptr) {
1183 LOGE("[DataSync] AckRecv message nullptr");
1184 return false;
1185 }
1186 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1187 return true;
1188 }
1189 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1190 if (packet == nullptr) {
1191 return false;
1192 }
1193 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1194 std::lock_guard<std::mutex> lock(lock_);
1195 uint32_t sequenceId = message->GetSequenceId();
1196 if (reSendMap_.count(sequenceId) != 0) {
1197 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1198 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1199 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1200 originalPacketId);
1201 return false;
1202 }
1203 }
1204 return true;
1205 }
1206
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1207 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1208 {
1209 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1210 if (errCode != E_OK) {
1211 return errCode;
1212 }
1213 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1214 if (packet == nullptr) {
1215 return -E_INVALID_ARGS;
1216 }
1217 int32_t recvCode = packet->GetRecvCode();
1218 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1219 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1220 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1221 LOGE("[DataSync][AckRecv] Version mismatch");
1222 return -E_VERSION_NOT_SUPPORT;
1223 }
1224
1225 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1226 // we should ReleaseContinueToken, avoid crash
1227 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1228 STR_MASK(GetDeviceId()));
1229 context->ReleaseContinueToken();
1230 return recvCode;
1231 }
1232 uint64_t data = packet->GetData();
1233 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1234 return DealWaterMarkException(context, data, packet->GetReserved());
1235 }
1236
1237 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1238 // data only use low 32bit
1239 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1240 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1241 STR_MASK(GetDeviceId()));
1242 }
1243
1244 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1245 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1246 label_.c_str(), STR_MASK(GetDeviceId()));
1247 return recvCode;
1248 }
1249
1250 // Judge if send finished
1251 ContinueToken token;
1252 context->GetContinueToken(token);
1253 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1254 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1255 return -E_NO_DATA_SEND;
1256 }
1257
1258 // send next data
1259 return -E_SEND_DATA;
1260 }
1261
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1262 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1263 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1264 {
1265 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1266 LOGE("[SingleVerDataSync] messageId not available.");
1267 return;
1268 }
1269 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1270 if (ackMessage == nullptr) {
1271 LOGE("[DataSync][SaveDataNotify] new message failed");
1272 return;
1273 }
1274
1275 DataAckPacket ack;
1276 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1277 ack.SetVersion(pktVersion);
1278 int errCode = ackMessage->SetCopiedObject(ack);
1279 if (errCode != E_OK) {
1280 delete ackMessage;
1281 ackMessage = nullptr;
1282 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1283 return;
1284 }
1285 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1286
1287 errCode = Send(context, ackMessage, nullptr, 0);
1288 if (errCode != E_OK) {
1289 delete ackMessage;
1290 ackMessage = nullptr;
1291 }
1292 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1293 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1294 }
1295
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1296 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1297 WaterMark &pullEndWatermark) const
1298 {
1299 if (packet == nullptr) {
1300 return;
1301 }
1302 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1303 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1304 WaterMark endMark = packet->GetEndWaterMark();
1305 TimeOffset offset;
1306 metadata_->GetTimeOffset(context->GetDeviceId(), context->GetTargetUserId(), offset);
1307 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1308 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1309 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1310 }
1311 }
1312
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1313 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1314 const std::vector<uint64_t> &reserved)
1315 {
1316 WaterMark deletedWaterMark = 0;
1317 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1318 if (curType == SyncType::QUERY_SYNC_TYPE) {
1319 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1320 LOGE("[DataSync] get packet reserve size failed");
1321 return -E_INVALID_ARGS;
1322 }
1323 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1324 }
1325 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1326 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1327 int errCode = SaveLocalWaterMark(curType, context,
1328 {0, 0, ackWaterMark, deletedWaterMark});
1329 if (errCode != E_OK) {
1330 return errCode;
1331 }
1332 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1333 context->IncNegotiationCount();
1334 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1335 if (!context->IsNeedClearRemoteStaleData()) {
1336 return -E_RE_SEND_DATA;
1337 }
1338 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1339 if (errCode != E_OK) {
1340 return errCode;
1341 }
1342 return -E_RE_SEND_DATA;
1343 }
1344
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1345 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1346 const DataRequestPacket *packet)
1347 {
1348 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1349 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1350 if (errCode != E_OK) {
1351 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1352 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1353 }
1354 return -E_NOT_PERMIT;
1355 }
1356 const std::vector<SendDataItem> &data = packet->GetData();
1357 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1358 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1359 auto securityOption = packet->GetSecurityOption();
1360 if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1361 securityOption.securityLabel != SecurityLabel::NOT_SET) {
1362 context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1363 }
1364 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1365 !context->GetReceivcPermitCheck()) {
1366 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1367 if (permitReceive) {
1368 context->SetReceivcPermitCheck(true);
1369 } else {
1370 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1371 return -E_SECURITY_OPTION_CHECK_ERROR;
1372 }
1373 }
1374 return errCode;
1375 }
1376
1377 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1378 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1379 {
1380 // When mtu less than 30k, we send data with bluetooth
1381 // In order not to block the bluetooth channel, we don't send notify packet here
1382 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1383 return;
1384 }
1385 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1386
1387 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1388 if (ackMessage == nullptr) {
1389 LOGE("[DataSync][ResetWatchDog] new message failed");
1390 return;
1391 }
1392
1393 DataAckPacket ack;
1394 ack.SetData(data);
1395 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1396 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1397 int errCode = ackMessage->SetCopiedObject(ack);
1398 if (errCode != E_OK) {
1399 delete ackMessage;
1400 ackMessage = nullptr;
1401 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1402 return;
1403 }
1404 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1405 context->GetSequenceId(), context->GetResponseSessionId());
1406
1407 errCode = Send(context, ackMessage, nullptr, 0);
1408 if (errCode != E_OK) {
1409 delete ackMessage;
1410 ackMessage = nullptr;
1411 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1412 STR_MASK(GetDeviceId()));
1413 } else {
1414 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1415 STR_MASK(GetDeviceId()));
1416 }
1417 }
1418
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1419 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1420 {
1421 if (context == nullptr) {
1422 return -E_INVALID_ARGS;
1423 }
1424 int errCode = RemoveDeviceDataIfNeed(context);
1425 if (errCode != E_OK) {
1426 context->SetTaskErrCode(errCode);
1427 return errCode;
1428 }
1429 SyncEntry syncData;
1430 errCode = GetReSendData(syncData, context, reSendInfo);
1431 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1432 return errCode;
1433 }
1434 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1435 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1436 if (packet == nullptr) {
1437 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1438 return -E_OUT_OF_MEMORY;
1439 }
1440 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1441 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1442 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1443 LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1444 return errCode;
1445 }
1446 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1447 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1448 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1449 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1450 dataTime.deleteEndTime += 1;
1451 }
1452 if (reSendInfo.end > reSendInfo.start) {
1453 dataTime.endTime += 1;
1454 }
1455 errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1456 if (errCode != E_OK) {
1457 LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1458 }
1459 }
1460 return errCode;
1461 }
1462
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1463 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1464 uint32_t sessionId, uint32_t sequenceId)
1465 {
1466 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1467 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1468 if (message == nullptr) {
1469 LOGE("[DataSync][SendDataPacket] new message error");
1470 delete packet;
1471 packet = nullptr;
1472 return -E_OUT_OF_MEMORY;
1473 }
1474 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1475 int errCode = message->SetExternalObject(packet);
1476 if (errCode != E_OK) {
1477 delete packet;
1478 packet = nullptr;
1479 delete message;
1480 message = nullptr;
1481 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1482 return errCode;
1483 }
1484 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1485 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1486 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1487 };
1488 errCode = Send(context, message, handler, packetLen);
1489 if (errCode != E_OK) {
1490 delete message;
1491 message = nullptr;
1492 }
1493 return errCode;
1494 }
1495
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1496 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1497 {
1498 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1499 int mode = SyncOperation::TransferSyncMode(inMode);
1500 // for pull mode it just need to get data, no need to send data.
1501 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1502 return E_OK;
1503 }
1504 if (context->GetSendPermitCheck()) {
1505 return E_OK;
1506 }
1507 bool isPermitSync = true;
1508 std::string deviceId = context->GetDeviceId();
1509 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1510 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1511 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1512 }
1513 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1514 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1515 if (isPermitSync) {
1516 context->SetSendPermitCheck(true);
1517 return E_OK;
1518 }
1519 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1520 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1521 return -E_SECURITY_OPTION_CHECK_ERROR;
1522 }
1523 if (mode == SyncModeType::RESPONSE_PULL) {
1524 SyncEntry syncData;
1525 (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1526 return -E_SECURITY_OPTION_CHECK_ERROR;
1527 }
1528 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1529 return -E_SECURITY_OPTION_CHECK_ERROR;
1530 }
1531 return E_OK;
1532 }
1533
GetLabel() const1534 std::string SingleVerDataSync::GetLabel() const
1535 {
1536 return label_;
1537 }
1538
GetDeviceId() const1539 std::string SingleVerDataSync::GetDeviceId() const
1540 {
1541 return deviceId_;
1542 }
1543
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1544 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1545 {
1546 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1547 if (packet == nullptr) {
1548 LOGE("[WaterMarkErrHandle] get packet object failed");
1549 return -E_INVALID_ARGS;
1550 }
1551 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1552 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1553 WaterMark peerMark = 0;
1554 WaterMark deletedMark = 0;
1555 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), context->GetTargetUserId(), peerMark);
1556 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1557 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), deletedMark);
1558 }
1559 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1560 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1561 context->SetReceiveWaterMarkErr(true);
1562 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1563 return true;
1564 }
1565 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1566 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1567 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1568 peerMark);
1569 context->SetReceiveWaterMarkErr(true);
1570 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1571 return true;
1572 }
1573 return false;
1574 }
1575
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1576 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1577 {
1578 auto *packet = message->GetObject<DataRequestPacket>();
1579 if (packet == nullptr) {
1580 return -E_INVALID_ARGS;
1581 }
1582 if (metadata_->IsAbilitySyncFinish(deviceId_, context->GetTargetUserId())) {
1583 return E_OK;
1584 }
1585 auto query = packet->GetQuery();
1586 std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1587 if (!schemaSyncStatus.second) {
1588 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1589 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1590 return -E_NEED_ABILITY_SYNC;
1591 }
1592 if (!schemaSyncStatus.first) {
1593 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1594 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1595 return -E_SCHEMA_MISMATCH;
1596 }
1597 return E_OK;
1598 }
1599
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1600 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1601 {
1602 int mode = SyncOperation::TransferSyncMode(inMode);
1603 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1604 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1605 return;
1606 }
1607 if (storage_ == nullptr) {
1608 LOGE("RemotePushFinished fail, storage is nullptr.");
1609 return;
1610 }
1611
1612 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1613 storage_->NotifyRemotePushFinished(deviceId_);
1614 }
1615 }
1616
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1617 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1618 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1619 {
1620 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1621 WaterMark localMark = 0;
1622 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1623 ackPacket.SetRecvCode(recvCode);
1624 WaterMark mark = 0;
1625 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), context->GetTargetUserId(), mark);
1626 WaterMark deletedPeerMark = 0;
1627 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), deletedPeerMark);
1628 SyncTimeRange dataTime = {.endTime = mark, .deleteEndTime = deletedPeerMark};
1629 bool isPacketWaterLower = false;
1630 // send ack packet
1631 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1632 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1633 } else if (recvCode != WATER_MARK_INVALID) {
1634 if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetLocalWaterMark() < mark) {
1635 LOGI("[DataSync][SetAckPacket] packetLocalMark=%" PRIu64 ",lockMark=%" PRIu64, packet->GetLocalWaterMark(),
1636 mark);
1637 isPacketWaterLower = true;
1638 dataTime.endTime = packet->GetLocalWaterMark();
1639 mark = packet->GetLocalWaterMark();
1640 }
1641 ackPacket.SetData(mark);
1642 }
1643 std::vector<uint64_t> reserved {localMark};
1644 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1645 uint64_t packetId = 0;
1646 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1647 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1648 }
1649 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1650 reserved.push_back(packetId);
1651 }
1652 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1653 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1654 if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetDeletedWaterMark() < deletedPeerMark) {
1655 LOGI("[DataSync][SetAckPacket] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64,
1656 packet->GetDeletedWaterMark(), deletedPeerMark);
1657 isPacketWaterLower = true;
1658 dataTime.deleteEndTime = packet->GetDeletedWaterMark();
1659 deletedPeerMark = packet->GetDeletedWaterMark();
1660 }
1661 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1662 }
1663 ackPacket.SetReserved(reserved);
1664 ackPacket.SetVersion(version);
1665 UpdateWaterMark isUpdateWaterMark = {true, true};
1666 if (isPacketWaterLower) {
1667 UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1668 }
1669 }
1670
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1671 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1672 DataSyncReSendInfo reSendInfo)
1673 {
1674 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1675 if (mode == SyncModeType::PULL) {
1676 return E_OK;
1677 }
1678 ContinueToken token = nullptr;
1679 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1680 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1681 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1682 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1683 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1684 int errCode;
1685 if (curType != SyncType::QUERY_SYNC_TYPE) {
1686 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1687 reSendDataSizeInfo);
1688 } else {
1689 QuerySyncObject queryObj = context->GetQuery();
1690 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1691 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1692 }
1693 if (token != nullptr) {
1694 storage_->ReleaseContinueToken(token);
1695 }
1696 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1697 context->SetTaskErrCode(errCode);
1698 return errCode;
1699 }
1700 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1701 return errCode;
1702 }
1703 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1704 DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1705
1706 int innerCode = InterceptData(syncData);
1707 if (innerCode != E_OK) {
1708 context->SetTaskErrCode(innerCode);
1709 return innerCode;
1710 }
1711
1712 bool needCompressOnSync = false;
1713 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1714 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1715 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1716 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1717 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1718 { remoteAlgo, version });
1719 if (compressCode != E_OK) {
1720 return compressCode;
1721 }
1722 }
1723 return errCode;
1724 }
1725
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1726 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1727 {
1728 if (context == nullptr) {
1729 LOGE("[SingleVerDataSync][RemoveDeviceDataIfNeed] context is nullptr.");
1730 return -E_INVALID_ARGS;
1731 }
1732 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1733 return E_OK;
1734 }
1735 uint64_t clearRemoteDataMark = 0;
1736 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1737 metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearRemoteDataMark);
1738 if (clearRemoteDataMark == 0) {
1739 return E_OK;
1740 }
1741 int errCode = E_OK;
1742 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1743 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1744 if (errCode != E_OK) {
1745 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1746 return errCode;
1747 }
1748 }
1749 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1750 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
1751 if (errCode != E_OK) {
1752 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1753 return errCode;
1754 }
1755 }
1756 return E_OK;
1757 }
1758
UpdateMtuSize()1759 void SingleVerDataSync::UpdateMtuSize()
1760 {
1761 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1762 }
1763
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1764 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1765 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1766 {
1767 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1768 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1769 WaterMark peerMark = 0;
1770 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), context->GetTargetUserId(),
1771 peerMark);
1772 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1773 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1774 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1775 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1776 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1777 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1778 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1779 packet->SetLastSequence();
1780 }
1781 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1782 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1783 sendCode = SEND_FINISHED;
1784 }
1785 packet->SetData(syncData.entries);
1786 packet->SetCompressData(syncData.compressedEntries);
1787 packet->SetBasicInfo(sendCode, version, reSendMode);
1788 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1789 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1790 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1791 packet->SetEndWaterMark(context->GetEndMark());
1792 packet->SetQuery(context->GetQuery());
1793 }
1794 packet->SetQueryId(context->GetQuerySyncId());
1795 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1796 std::vector<uint64_t> reserved {reSendInfo.packetId};
1797 packet->SetReserved(reserved);
1798 }
1799 if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1800 // resend pull packet dont set compress type
1801 return;
1802 }
1803 bool needCompressOnSync = false;
1804 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1805 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1806 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1807 if (!packet->GetCompressData().empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1808 packet->SetCompressDataMark();
1809 packet->SetCompressAlgo(curAlgo);
1810 }
1811 FillRequestReSendPacketV2(context, packet);
1812 }
1813
FillRequestReSendPacketV2(const SingleVerSyncTaskContext * context,DataRequestPacket * packet)1814 void SingleVerDataSync::FillRequestReSendPacketV2(const SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1815 {
1816 if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1817 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1818 uint32_t total = 0u;
1819 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1820 packet->SetTotalDataCount(total);
1821 }
1822 }
1823
GetDataSizeSpecInfo(size_t packetSize)1824 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1825 {
1826 bool needCompressOnSync = false;
1827 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1828 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1829 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1830 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1831 return {blockSize, packetSize};
1832 }
1833
InterceptData(SyncEntry & syncEntry)1834 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1835 {
1836 if (storage_ == nullptr) {
1837 LOGE("Invalid DB. Can not intercept data.");
1838 return -E_INVALID_DB;
1839 }
1840
1841 // GetLocalDeviceName get local device ID.
1842 // GetDeviceId get remote device ID.
1843 // If intercept data fail, entries will be released.
1844 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1845 }
1846
ControlCmdStart(SingleVerSyncTaskContext * context)1847 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1848 {
1849 if (context == nullptr) {
1850 return -E_INVALID_ARGS;
1851 }
1852 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1853 if (subManager == nullptr) {
1854 return -E_INVALID_ARGS;
1855 }
1856 int errCode = ControlCmdStartCheck(context);
1857 if (errCode != E_OK) {
1858 return errCode;
1859 }
1860 auto packet = new (std::nothrow) SubscribeRequest();
1861 if (packet == nullptr) {
1862 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1863 return -E_OUT_OF_MEMORY;
1864 }
1865 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1866 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1867 if (errCode != E_OK) {
1868 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1869 delete packet;
1870 packet = nullptr;
1871 return errCode;
1872 }
1873 }
1874 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1875 errCode = SendControlPacket(packet, context);
1876 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1877 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1878 }
1879 return errCode;
1880 }
1881
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1882 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1883 {
1884 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1885 if (packet == nullptr) {
1886 return -E_INVALID_ARGS;
1887 }
1888 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1889 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1890 int errCode = ControlCmdRequestRecvPre(context, message);
1891 if (errCode != E_OK) {
1892 return errCode;
1893 }
1894 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1895 errCode = SubscribeRequestRecv(context, message);
1896 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1897 errCode = UnsubscribeRequestRecv(context, message);
1898 }
1899 return errCode;
1900 }
1901
UpdatePeerWaterMarkInner(const DataRequestPacket & packet,const SyncTimeRange & dataTime,const UpdateWaterMark & isUpdateWaterMark,const SingleVerSyncTaskContext * context)1902 void SingleVerDataSync::UpdatePeerWaterMarkInner(const DataRequestPacket &packet, const SyncTimeRange &dataTime,
1903 const UpdateWaterMark &isUpdateWaterMark, const SingleVerSyncTaskContext *context)
1904 {
1905 SyncType curType = SyncOperation::GetSyncType(packet.GetMode());
1906 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1907 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1908 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet.IsNeedUpdateWaterMark()) {
1909 UpdateQueryPeerWaterMark(curType, packet.GetQueryId(), dataTime, context, isUpdateWaterMark);
1910 }
1911 }
1912 } // namespace DistributedDB
1913