1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 }
44
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)45 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
46 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
47 {
48 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
49 return -E_INVALID_ARGS;
50 }
51 storage_ = static_cast<SyncGenericInterface *>(inStorage);
52 communicateHandle_ = inCommunicateHandle;
53 metadata_ = inMetadata;
54 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
55 std::vector<uint8_t> label = inStorage->GetIdentifier();
56 label.resize(3); // only show 3 Bytes enough
57 label_ = DBCommon::VectorToHexString(label);
58 deviceId_ = deviceId;
59 msgSchedule_.Initialize(label_, deviceId_);
60 return E_OK;
61 }
62
SyncStart(int mode,SingleVerSyncTaskContext * context)63 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
64 {
65 std::lock_guard<std::mutex> lock(lock_);
66 int errCode = CheckPermitSendData(mode, context);
67 if (errCode != E_OK) {
68 return errCode;
69 }
70 if (sessionId_ != 0) { // auto sync timeout resend
71 return ReSendData(context);
72 }
73 ResetSyncStatus(mode, context);
74 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
75 int tmpMode = SyncOperation::TransferSyncMode(mode);
76 if (tmpMode == SyncModeType::PUSH) {
77 errCode = PushStart(context);
78 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79 errCode = PushPullStart(context);
80 } else if (tmpMode == SyncModeType::PULL) {
81 errCode = PullRequestStart(context);
82 } else {
83 SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
84 errCode = PullResponseStart(context);
85 }
86 if (context->IsSkipTimeoutError(errCode)) {
87 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
88 // just return to avoid higher pressure for send.
89 return E_OK;
90 }
91 if (errCode != E_OK) {
92 LOGE("[DataSync] SendStart errCode=%d", errCode);
93 return errCode;
94 }
95 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
96 LOGE("wait for recv finished for push and pull mode");
97 return -E_EKEYREVOKED;
98 }
99 return InnerSyncStart(context);
100 }
101
InnerSyncStart(SingleVerSyncTaskContext * context)102 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
103 {
104 int errCode = CheckPermitSendData(mode_, context);
105 if (errCode != E_OK) {
106 return errCode;
107 }
108 while (true) {
109 if (windowSize_ <= 0 || isAllDataHasSent_) {
110 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
111 label_.c_str(), STR_MASK(deviceId_));
112 return E_OK;
113 }
114 int mode = SyncOperation::TransferSyncMode(mode_);
115 if (mode == SyncModeType::PULL) {
116 LOGE("[DataSync] unexpected error");
117 return -E_INVALID_ARGS;
118 }
119 context->IncSequenceId();
120 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
121 errCode = PushStart(context);
122 } else {
123 errCode = PullResponseStart(context);
124 }
125 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
126 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
127 isAllDataHasSent_ = true;
128 return -E_EKEYREVOKED;
129 }
130 if (context->IsSkipTimeoutError(errCode)) {
131 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
132 // just return to avoid higher pressure for send.
133 return E_OK;
134 }
135 if (errCode != E_OK) {
136 LOGE("[DataSync] InnerSend errCode=%d", errCode);
137 return errCode;
138 }
139 }
140 return E_OK;
141 }
142
InnerClearSyncStatus()143 void SingleVerDataSync::InnerClearSyncStatus()
144 {
145 sessionId_ = 0;
146 reSendMap_.clear();
147 windowSize_ = 0;
148 maxSequenceIdHasSent_ = 0;
149 isAllDataHasSent_ = false;
150 }
151
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)152 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
153 {
154 if (message == nullptr) {
155 LOGE("[DataSync] AckRecv message nullptr");
156 return -E_INVALID_ARGS;
157 }
158 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
159 if (packet == nullptr) {
160 return -E_INVALID_ARGS;
161 }
162 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
163 uint32_t sessionId = message->GetSessionId();
164 uint32_t sequenceId = message->GetSequenceId();
165
166 std::lock_guard<std::mutex> lock(lock_);
167 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
168 windowSize_, label_.c_str(), STR_MASK(deviceId_));
169 if (sessionId != sessionId_) {
170 LOGI("[DataSync] ignore ack,sessionId is different");
171 return E_OK;
172 }
173 Timestamp lastQueryTime = 0;
174 if (reSendMap_.count(sequenceId) != 0) {
175 lastQueryTime = reSendMap_[sequenceId].end;
176 reSendMap_.erase(sequenceId);
177 windowSize_++;
178 } else {
179 LOGI("[DataSync] ack seqId not in map");
180 return E_OK;
181 }
182 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
183 Timestamp dbLastQueryTime = 0;
184 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
185 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
186 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
187 }
188 if (errCode != E_OK) {
189 return errCode;
190 }
191 }
192 if (!isAllDataHasSent_) {
193 return InnerSyncStart(context);
194 } else if (reSendMap_.empty()) {
195 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
196 InnerClearSyncStatus();
197 return -E_FINISHED;
198 }
199 return E_OK;
200 }
201
ClearSyncStatus()202 void SingleVerDataSync::ClearSyncStatus()
203 {
204 std::lock_guard<std::mutex> lock(lock_);
205 InnerClearSyncStatus();
206 }
207
ReSendData(SingleVerSyncTaskContext * context)208 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
209 {
210 if (reSendMap_.empty()) {
211 LOGI("[DataSync] ReSend map empty");
212 return -E_INTERNAL_ERROR;
213 }
214 uint32_t sequenceId = reSendMap_.begin()->first;
215 ReSendInfo reSendInfo = reSendMap_.begin()->second;
216 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
217 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
218 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
219 windowSize_, label_.c_str(), STR_MASK(deviceId_));
220 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
221 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
222 return ReSend(context, dataReSendInfo);
223 }
224
GetLocalDeviceName()225 std::string SingleVerDataSync::GetLocalDeviceName()
226 {
227 std::string deviceInfo;
228 if (communicateHandle_ != nullptr) {
229 communicateHandle_->GetLocalIdentity(deviceInfo);
230 }
231 return deviceInfo;
232 }
233
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)234 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
235 uint32_t packetLen)
236 {
237 bool startFeedDogRet = false;
238 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
239 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
240 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
241 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
242 }
243 SendConfig sendConfig;
244 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
245 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
246 if (errCode != E_OK) {
247 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
248 if (startFeedDogRet) {
249 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
250 }
251 }
252 return errCode;
253 }
254
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)255 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
256 std::vector<SendDataItem> &outData, size_t packetSize)
257 {
258 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
259 if (performance != nullptr) {
260 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
261 }
262 // start a watch dog before get data
263 // it will send ack util get data finished
264 context->StartFeedDogForGetData(context->GetResponseSessionId());
265 int errCode = GetData(context, packetSize, outData);
266 context->StopFeedDogForGetData();
267 if (performance != nullptr) {
268 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
269 }
270 if (!outData.empty()) {
271 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
272 }
273 return errCode;
274 }
275
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)276 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
277 {
278 int errCode;
279 UpdateMtuSize();
280 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
281 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
282 LOGI("[DataSync][GetData] resend data");
283 errCode = GetUnsyncData(context, outData, packetSize);
284 } else {
285 ContinueToken token;
286 context->GetContinueToken(token);
287 if (token == nullptr) {
288 errCode = GetUnsyncData(context, outData, packetSize);
289 } else {
290 LOGD("[DataSync][GetData] get data from token");
291 // if there is data to send, read out data, and update local watermark, send data
292 errCode = GetNextUnsyncData(context, outData, packetSize);
293 }
294 }
295 if (errCode == -E_UNFINISHED) {
296 LOGD("[DataSync][GetData] not finished.");
297 }
298 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
299 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
300 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
301 }
302 return errCode;
303 }
304
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)305 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
306 {
307 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
308 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
309 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
310 bool needCompressOnSync = false;
311 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
312 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
313 int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
314 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
315 context->SetTaskErrCode(errCode);
316 return errCode;
317 }
318
319 int innerCode = InterceptData(syncOutData);
320 if (innerCode != E_OK) {
321 context->SetTaskErrCode(innerCode);
322 return innerCode;
323 }
324
325 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
326 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
327 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
328 { remoteAlgo, version });
329 if (compressCode != E_OK) {
330 return compressCode;
331 }
332 }
333 return errCode;
334 }
335
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)336 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
337 size_t packetSize)
338 {
339 SyncTimeRange waterRange;
340 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
341 WaterMark startMark = 0;
342 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
343 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
344 if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
345 return E_OK;
346 }
347 waterRange.beginTime = startMark;
348 if (curType == SyncType::QUERY_SYNC_TYPE) {
349 WaterMark deletedStartMark = 0;
350 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
351 Timestamp lastQueryTimestamp = 0;
352 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
353 lastQueryTimestamp);
354 if (errCode != E_OK) {
355 return errCode;
356 }
357 waterRange.deleteBeginTime = deletedStartMark;
358 waterRange.lastQueryTime = lastQueryTimestamp;
359 }
360 return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
361 }
362
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)363 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
364 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
365 {
366 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
367 ContinueToken token = nullptr;
368 context->GetContinueToken(token);
369 if (token != nullptr) {
370 storage_->ReleaseContinueToken(token);
371 }
372 int errCode;
373 if (curType != SyncType::QUERY_SYNC_TYPE) {
374 errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
375 syncDataSizeInfo);
376 } else {
377 QuerySyncObject queryObj = context->GetQuery();
378 errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
379 }
380 context->SetContinueToken(token);
381 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
382 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
383 }
384 return errCode;
385 }
386
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)387 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
388 size_t packetSize)
389 {
390 ContinueToken token;
391 context->GetContinueToken(token);
392 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
393 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
394 context->SetContinueToken(token);
395 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
396 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
397 }
398 return errCode;
399 }
400
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)401 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
402 SyncType curType, const QuerySyncObject &query)
403 {
404 if (inData.empty()) {
405 return E_OK;
406 }
407 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
408 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
409 if (performance != nullptr) {
410 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
411 }
412 const auto localDeviceName = GetLocalDeviceName();
413 const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
414 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
415 std::vector<SendDataItem> copyData = inData;
416 int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
417 if (errCode != E_OK) {
418 LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
419 return errCode;
420 }
421 // query only support prefix key and don't have query in packet in 104 version
422 errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
423 if (performance != nullptr) {
424 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
425 }
426 if (errCode != E_OK) {
427 LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
428 }
429 return errCode;
430 }
431
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)432 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
433 {
434 mode_ = inMode;
435 maxSequenceIdHasSent_ = 0;
436 isAllDataHasSent_ = false;
437 context->ReSetSequenceId();
438 reSendMap_.clear();
439 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
440 windowSize_ = LOW_VERSION_WINDOW_SIZE;
441 } else {
442 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
443 }
444 int mode = SyncOperation::TransferSyncMode(inMode);
445 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
446 sessionId_ = context->GetRequestSessionId();
447 } else {
448 sessionId_ = context->GetResponseSessionId();
449 }
450 }
451
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)452 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
453 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
454 {
455 WaterMark localMark = 0;
456 WaterMark deleteMark = 0;
457 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
458 GetLocalDeleteSyncWaterMark(context, deleteMark);
459 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
460 }
461
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const462 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
463 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
464 {
465 WaterMark localMark = 0;
466 int errCode = E_OK;
467 const std::string &deviceId = context->GetDeviceId();
468 std::string queryId = context->GetQuerySyncId();
469 if (syncType != SyncType::QUERY_SYNC_TYPE) {
470 if (isCheckBeforUpdate) {
471 GetLocalWaterMark(syncType, queryId, context, localMark);
472 if (localMark >= dataTimeRange.endTime) {
473 return E_OK;
474 }
475 }
476 errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
477 } else {
478 bool isNeedUpdateMark = true;
479 bool isNeedUpdateDeleteMark = true;
480 if (isCheckBeforUpdate) {
481 WaterMark deleteDataWaterMark = 0;
482 GetLocalWaterMark(syncType, queryId, context, localMark);
483 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
484 if (localMark >= dataTimeRange.endTime) {
485 isNeedUpdateMark = false;
486 }
487 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
488 isNeedUpdateDeleteMark = false;
489 }
490 }
491 if (isNeedUpdateMark) {
492 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
493 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
494 if (errCode != E_OK) {
495 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
496 return errCode;
497 }
498 }
499 if (isNeedUpdateDeleteMark) {
500 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
501 dataTimeRange.deleteEndTime);
502 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
503 }
504 }
505 if (errCode != E_OK) {
506 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
507 }
508 return errCode;
509 }
510
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const511 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
512 const DeviceID &deviceId, WaterMark &waterMark) const
513 {
514 if (syncType != SyncType::QUERY_SYNC_TYPE) {
515 metadata_->GetPeerWaterMark(deviceId, waterMark);
516 return;
517 }
518 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
519 }
520
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)521 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
522 {
523 metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
524 }
525
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const526 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
527 WaterMark &waterMark) const
528 {
529 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
530 }
531
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const532 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
533 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
534 {
535 if (syncType != SyncType::QUERY_SYNC_TYPE) {
536 metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
537 return;
538 }
539 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
540 waterMark, context->IsAutoLiftWaterMark());
541 }
542
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)543 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
544 WaterMark maxSendDataTime)
545 {
546 bool isNeedClearRemoteData = false;
547 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
548 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
549 uint64_t clearDeviceDataMark = 0;
550 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
551 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
552 } else {
553 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
554 if (packet == nullptr) {
555 LOGE("[RemoveDeviceDataHandle] get packet object failed");
556 return -E_INVALID_ARGS;
557 }
558 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
559 WaterMark packetLocalMark = packet->GetLocalWaterMark();
560 WaterMark peerMark = 0;
561 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
562 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
563 }
564 if (!isNeedClearRemoteData) {
565 return E_OK;
566 }
567 int errCode = E_OK;
568 if (context->IsNeedClearRemoteStaleData()) {
569 // need to clear remote device history data
570 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
571 if (errCode != E_OK) {
572 (void)SendDataAck(context, message, errCode, maxSendDataTime);
573 return errCode;
574 }
575 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
576 // avoid repeat clear in ack
577 metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
578 }
579 }
580 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
581 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
582 if (errCode != E_OK) {
583 (void)SendDataAck(context, message, errCode, maxSendDataTime);
584 return errCode;
585 }
586 }
587 return E_OK;
588 }
589
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)590 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
591 const std::vector<uint64_t> &reserved)
592 {
593 bool isNeedClearRemoteData = false;
594 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
595 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
596 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
597 uint64_t clearDeviceDataMark = 0;
598 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
599 isNeedClearRemoteData = (clearDeviceDataMark != 0);
600 } else if (reserved.empty()) {
601 WaterMark localMark = 0;
602 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
603 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
604 } else {
605 WaterMark peerMark = 0;
606 GetPeerWaterMark(curType, context->GetQuerySyncId(),
607 context->GetDeviceId(), peerMark);
608 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
609 }
610 if (!isNeedClearRemoteData) {
611 return E_OK;
612 }
613 // need to clear remote historydata
614 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
615 label_.c_str(), STR_MASK(GetDeviceId()));
616 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
617 if (errCode != E_OK) {
618 return errCode;
619 }
620 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
621 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
622 }
623 return errCode;
624 }
625
SetSessionEndTimestamp(Timestamp end)626 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
627 {
628 sessionEndTimestamp_ = end;
629 }
630
GetSessionEndTimestamp() const631 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
632 {
633 return sessionEndTimestamp_;
634 }
635
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)636 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
637 {
638 ReSendInfo reSendInfo;
639 reSendInfo.start = dataTimeRange.beginTime;
640 reSendInfo.end = dataTimeRange.endTime;
641 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
642 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
643 reSendInfo.packetId = context->GetPacketId();
644 maxSequenceIdHasSent_++;
645 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
646 windowSize_--;
647 ContinueToken token;
648 context->GetContinueToken(token);
649 if (token == nullptr) {
650 isAllDataHasSent_ = true;
651 }
652 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
653 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
654 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
655 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
656 }
657
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)658 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
659 SyncEntry &syncData, int sendCode, int mode)
660 {
661 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
662 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
663 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
664 WaterMark localMark = 0;
665 WaterMark peerMark = 0;
666 WaterMark deleteMark = 0;
667 bool needCompressOnSync = false;
668 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
669 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
670 std::string id = context->GetQuerySyncId();
671 GetLocalWaterMark(curType, id, context, localMark);
672 GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
673 GetLocalDeleteSyncWaterMark(context, deleteMark);
674 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
675 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
676 packet->SetLastSequence();
677 }
678 int tmpMode = mode;
679 if (mode == SyncModeType::RESPONSE_PULL) {
680 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
681 }
682 packet->SetData(syncData.entries);
683 packet->SetCompressData(syncData.compressedEntries);
684 packet->SetBasicInfo(sendCode, version, tmpMode);
685 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
686 packet->SetWaterMark(localMark, peerMark, deleteMark);
687 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
688 packet->SetEndWaterMark(context->GetEndMark());
689 packet->SetSessionId(context->GetRequestSessionId());
690 }
691 packet->SetQuery(context->GetQuery());
692 packet->SetQueryId(context->GetQuerySyncId());
693 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
694 // empty compress data should not mark compress
695 if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
696 packet->SetCompressDataMark();
697 packet->SetCompressAlgo(curAlgo);
698 }
699 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
700 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
701 context->GetQuery().HasOrderBy())) {
702 packet->SetUpdateWaterMark();
703 }
704 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
705 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
706 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
707 }
708
RequestStart(SingleVerSyncTaskContext * context,int mode)709 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
710 {
711 int errCode = QuerySyncCheck(context);
712 if (errCode != E_OK) {
713 LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
714 return errCode;
715 }
716 errCode = RemoveDeviceDataIfNeed(context);
717 if (errCode != E_OK) {
718 context->SetTaskErrCode(errCode);
719 return errCode;
720 }
721 SyncEntry syncData;
722 // get data
723 errCode = GetMatchData(context, syncData);
724 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
725
726 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
727 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
728 return errCode;
729 }
730
731 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
732 if (packet == nullptr) {
733 LOGE("[DataSync][PushStart] new DataRequestPacket error");
734 return -E_OUT_OF_MEMORY;
735 }
736 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
737 UpdateWaterMark isUpdateWaterMark;
738 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
739 if (errCode == E_OK) {
740 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
741 }
742 FillDataRequestPacket(packet, context, syncData, errCode, mode);
743 errCode = SendDataPacket(curType, packet, context);
744 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
745 if (performance != nullptr) {
746 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
747 }
748 if (errCode == E_OK || errCode == -E_TIMEOUT) {
749 UpdateSendInfo(dataTime, context);
750 }
751 if (errCode == E_OK) {
752 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
753 context->GetQuery().HasOrderBy())) {
754 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
755 return E_OK;
756 }
757 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
758 SaveLocalWaterMark(curType, context, tmpDataTime);
759 }
760 return errCode;
761 }
762
PushStart(SingleVerSyncTaskContext * context)763 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
764 {
765 if (context == nullptr) {
766 return -E_INVALID_ARGS;
767 }
768 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
769 return RequestStart(context,
770 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
771 }
772
PushPullStart(SingleVerSyncTaskContext * context)773 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
774 {
775 if (context == nullptr) {
776 return -E_INVALID_ARGS;
777 }
778 return RequestStart(context, context->GetMode());
779 }
780
PullRequestStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
782 {
783 if (context == nullptr) {
784 return -E_INVALID_ARGS;
785 }
786 int errCode = QuerySyncCheck(context);
787 if (errCode != E_OK) {
788 return errCode;
789 }
790 errCode = RemoveDeviceDataIfNeed(context);
791 if (errCode != E_OK) {
792 context->SetTaskErrCode(errCode);
793 return errCode;
794 }
795 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
796 if (packet == nullptr) {
797 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
798 return -E_OUT_OF_MEMORY;
799 }
800 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
801 WaterMark peerMark = 0;
802 WaterMark localMark = 0;
803 WaterMark deleteMark = 0;
804 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
805 context->GetDeviceId(), peerMark);
806 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
807 GetLocalDeleteSyncWaterMark(context, deleteMark);
808 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
809 WaterMark endMark = context->GetEndMark();
810 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
811 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
812 packet->SetBasicInfo(E_OK, version, context->GetMode());
813 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
814 packet->SetWaterMark(localMark, peerMark, deleteMark);
815 packet->SetEndWaterMark(endMark);
816 packet->SetSessionId(context->GetRequestSessionId());
817 packet->SetQuery(context->GetQuery());
818 packet->SetQueryId(context->GetQuerySyncId());
819 packet->SetLastSequence();
820 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
821 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
822 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
823 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
824 label_.c_str(), STR_MASK(GetDeviceId()));
825 UpdateSendInfo(dataTime, context);
826 return SendDataPacket(syncType, packet, context);
827 }
828
PullResponseStart(SingleVerSyncTaskContext * context)829 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
830 {
831 if (context == nullptr) {
832 return -E_INVALID_ARGS;
833 }
834 SyncEntry syncData;
835 // get data
836 int errCode = GetMatchData(context, syncData);
837 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
838 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
839 (void)SendPullResponseDataPkt(errCode, syncData, context);
840 }
841 return errCode;
842 }
843 // if send finished
844 int ackCode = E_OK;
845 ContinueToken token = nullptr;
846 context->GetContinueToken(token);
847 if (errCode == E_OK && token == nullptr) {
848 LOGD("[DataSync][PullResponse] send last frame end");
849 ackCode = SEND_FINISHED;
850 }
851 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
852 UpdateWaterMark isUpdateWaterMark;
853 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
854 if (errCode == E_OK) {
855 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
856 }
857 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
858 if (errCode == E_OK || errCode == -E_TIMEOUT) {
859 UpdateSendInfo(dataTime, context);
860 }
861 if (errCode == E_OK) {
862 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
863 context->GetQuery().HasOrderBy())) {
864 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
865 return E_OK;
866 }
867 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
868 SaveLocalWaterMark(curType, context, tmpDataTime);
869 }
870 return errCode;
871 }
872
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)873 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
874 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
875 {
876 WaterMark tmpPeerWatermark = dataTime.endTime;
877 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
878 if (isUpdateWaterMark.normalUpdateMark) {
879 tmpPeerWatermark++;
880 }
881 if (isUpdateWaterMark.deleteUpdateMark) {
882 tmpPeerDeletedWatermark++;
883 }
884 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
885 }
886
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)887 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
888 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
889 {
890 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
891 return;
892 }
893 int errCode = E_OK;
894 if (syncType != SyncType::QUERY_SYNC_TYPE) {
895 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
896 } else {
897 if (peerWatermark != 0) {
898 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
899 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
900 if (errCode != E_OK) {
901 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
902 }
903 }
904 if (peerDeletedWatermark != 0) {
905 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
906 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
907 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
908 }
909 }
910 if (errCode != E_OK) {
911 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
912 }
913 }
914
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)915 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
916 {
917 uint16_t remoteCommunicatorVersion = 0;
918 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
919 -E_NOT_FOUND) {
920 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
921 return -E_VERSION_NOT_SUPPORT;
922 }
923 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
924 if (remoteCommunicatorVersion == 0) {
925 LOGI("[DataSync] set remote version 0");
926 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
927 return E_OK;
928 } else {
929 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
930 if (isControlMsg) {
931 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
932 } else {
933 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
934 }
935 return -E_NEED_ABILITY_SYNC;
936 }
937 }
938
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)939 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
940 {
941 if (context == nullptr || message == nullptr) {
942 return -E_INVALID_ARGS;
943 }
944 auto *packet = message->GetObject<DataRequestPacket>();
945 if (packet == nullptr) {
946 return -E_INVALID_ARGS;
947 }
948 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
949 return DoAbilitySyncIfNeed(context, message);
950 }
951 int32_t sendCode = packet->GetSendCode();
952 if (sendCode == -E_VERSION_NOT_SUPPORT) {
953 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
954 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
955 return -E_WAIT_NEXT_MESSAGE;
956 }
957 // only deal with pull response packet errCode
958 if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
959 message->GetSessionId() == context->GetRequestSessionId()) {
960 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
961 return sendCode;
962 }
963 int errCode = RunPermissionCheck(context, message, packet);
964 if (errCode != E_OK) {
965 return errCode;
966 }
967 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
968 errCode = CheckSchemaStrategy(context, message);
969 }
970 if (errCode == E_OK) {
971 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
972 }
973 if (errCode != E_OK) {
974 (void)SendDataAck(context, message, errCode, 0);
975 return errCode;
976 }
977 errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
978 if (errCode != E_OK) {
979 (void)SendDataAck(context, message, errCode, 0);
980 }
981 return errCode;
982 }
983
DataRequestRecvInner(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)984 int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, const Message *message,
985 WaterMark &pullEndWatermark)
986 {
987 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
988 if (packet == nullptr) {
989 LOGE("[DataSync][DataRequestRecv] get packet object failed");
990 return -E_INVALID_ARGS;
991 }
992 const std::vector<SendDataItem> &data = packet->GetData();
993 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
994 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
995 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
996 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
997 context->SetReceiveWaterMarkErr(false);
998 UpdateWaterMark isUpdateWaterMark;
999 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1000 int errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1001 if (errCode != E_OK) {
1002 return errCode;
1003 }
1004 Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1005 if (WaterMarkErrHandle(curType, context, message)) {
1006 return E_OK;
1007 }
1008 GetPullEndWatermark(context, packet, pullEndWatermark);
1009 // save data first
1010 errCode = SaveData(context, data, curType,
1011 SingleVerDataSyncUtils::GetQueryFromDataRequest(*packet, *context, message->GetSessionId()));
1012 if (errCode != E_OK) {
1013 (void)SendDataAck(context, message, errCode, dataTime.endTime);
1014 return errCode;
1015 }
1016 SingleVerDataSyncUtils::UpdateSyncProcess(context, packet);
1017
1018 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1019 pullEndWatermark = 0;
1020 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1021 } else {
1022 // if data is empty, we don't know the max timestap of this packet.
1023 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1024 }
1025 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1026 context->GetRequestSessionId());
1027 UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1028
1029 if (errCode != E_OK) {
1030 return errCode;
1031 }
1032 if (packet->GetSendCode() == SEND_FINISHED) {
1033 return -E_RECV_FINISHED;
1034 }
1035 return errCode;
1036 }
1037
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)1038 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
1039 WaterMark &pullEndWatermark)
1040 {
1041 int errCode = DataRequestRecvPre(context, message);
1042 if (errCode != E_OK) {
1043 return errCode;
1044 }
1045 return DataRequestRecvInner(context, message, pullEndWatermark);
1046 }
1047
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1048 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1049 SingleVerSyncTaskContext *context)
1050 {
1051 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1052 if (message == nullptr) {
1053 LOGE("[DataSync][SendDataPacket] new message error");
1054 delete packet;
1055 packet = nullptr;
1056 return -E_OUT_OF_MEMORY;
1057 }
1058 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1059 int errCode = message->SetExternalObject(packet);
1060 if (errCode != E_OK) {
1061 delete packet;
1062 packet = nullptr;
1063 delete message;
1064 message = nullptr;
1065 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1066 return errCode;
1067 }
1068 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1069 context->GetSequenceId(), context->GetRequestSessionId());
1070 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1071 if (performance != nullptr) {
1072 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1073 }
1074 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1075 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1076 };
1077 errCode = Send(context, message, handler, packetLen);
1078 if (errCode != E_OK) {
1079 delete message;
1080 message = nullptr;
1081 }
1082
1083 return errCode;
1084 }
1085
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1086 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1087 SingleVerSyncTaskContext *context)
1088 {
1089 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1090 if (packet == nullptr) {
1091 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1092 return -E_OUT_OF_MEMORY;
1093 }
1094 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1095 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1096
1097 if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1098 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1099 uint32_t total = 0u;
1100 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1101 LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1102 packet->SetTotalDataCount(total);
1103 }
1104
1105 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1106 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1107 if (message == nullptr) {
1108 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1109 delete packet;
1110 packet = nullptr;
1111 return -E_OUT_OF_MEMORY;
1112 }
1113 int errCode = message->SetExternalObject(packet);
1114 if (errCode != E_OK) {
1115 delete packet;
1116 packet = nullptr;
1117 delete message;
1118 message = nullptr;
1119 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1120 return errCode;
1121 }
1122 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1123 context->GetSequenceId(), context->GetResponseSessionId());
1124 SendResetWatchDogPacket(context, packetLen);
1125 errCode = Send(context, message, nullptr, packetLen);
1126 if (errCode != E_OK) {
1127 delete message;
1128 message = nullptr;
1129 }
1130 return errCode;
1131 }
1132
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1133 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1134 {
1135 (void)SendDataAck(context, message, E_OK, 0);
1136 }
1137
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1138 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1139 WaterMark maxSendDataTime)
1140 {
1141 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1142 if (packet == nullptr) {
1143 return -E_INVALID_ARGS;
1144 }
1145 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1146 if (ackMessage == nullptr) {
1147 LOGE("[DataSync][SendDataAck] new message error");
1148 return -E_OUT_OF_MEMORY;
1149 }
1150 DataAckPacket ack;
1151 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1152 int errCode = ackMessage->SetCopiedObject(ack);
1153 if (errCode != E_OK) {
1154 delete ackMessage;
1155 ackMessage = nullptr;
1156 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1157 return errCode;
1158 }
1159 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1160 message->GetSequenceId(), message->GetSessionId());
1161
1162 errCode = Send(context, ackMessage, nullptr, 0);
1163 if (errCode != E_OK) {
1164 delete ackMessage;
1165 ackMessage = nullptr;
1166 }
1167 return errCode;
1168 }
1169
AckPacketIdCheck(const Message * message)1170 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1171 {
1172 if (message == nullptr) {
1173 LOGE("[DataSync] AckRecv message nullptr");
1174 return false;
1175 }
1176 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1177 return true;
1178 }
1179 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1180 if (packet == nullptr) {
1181 return false;
1182 }
1183 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1184 std::lock_guard<std::mutex> lock(lock_);
1185 uint32_t sequenceId = message->GetSequenceId();
1186 if (reSendMap_.count(sequenceId) != 0) {
1187 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1188 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1189 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1190 originalPacketId);
1191 return false;
1192 }
1193 }
1194 return true;
1195 }
1196
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1197 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1198 {
1199 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1200 if (errCode != E_OK) {
1201 return errCode;
1202 }
1203 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1204 if (packet == nullptr) {
1205 return -E_INVALID_ARGS;
1206 }
1207 int32_t recvCode = packet->GetRecvCode();
1208 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1209 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1210 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1211 LOGE("[DataSync][AckRecv] Version mismatch");
1212 return -E_VERSION_NOT_SUPPORT;
1213 }
1214
1215 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1216 // we should ReleaseContinueToken, avoid crash
1217 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1218 STR_MASK(GetDeviceId()));
1219 context->ReleaseContinueToken();
1220 return recvCode;
1221 }
1222 uint64_t data = packet->GetData();
1223 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1224 return DealWaterMarkException(context, data, packet->GetReserved());
1225 }
1226
1227 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1228 // data only use low 32bit
1229 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1230 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1231 STR_MASK(GetDeviceId()));
1232 }
1233
1234 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1235 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1236 label_.c_str(), STR_MASK(GetDeviceId()));
1237 return recvCode;
1238 }
1239
1240 // Judge if send finished
1241 ContinueToken token;
1242 context->GetContinueToken(token);
1243 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1244 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1245 return -E_NO_DATA_SEND;
1246 }
1247
1248 // send next data
1249 return -E_SEND_DATA;
1250 }
1251
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1252 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1253 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1254 {
1255 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1256 LOGE("[SingleVerDataSync] messageId not available.");
1257 return;
1258 }
1259 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1260 if (ackMessage == nullptr) {
1261 LOGE("[DataSync][SaveDataNotify] new message failed");
1262 return;
1263 }
1264
1265 DataAckPacket ack;
1266 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1267 ack.SetVersion(pktVersion);
1268 int errCode = ackMessage->SetCopiedObject(ack);
1269 if (errCode != E_OK) {
1270 delete ackMessage;
1271 ackMessage = nullptr;
1272 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1273 return;
1274 }
1275 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1276
1277 errCode = Send(context, ackMessage, nullptr, 0);
1278 if (errCode != E_OK) {
1279 delete ackMessage;
1280 ackMessage = nullptr;
1281 }
1282 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1283 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1284 }
1285
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1286 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1287 WaterMark &pullEndWatermark) const
1288 {
1289 if (packet == nullptr) {
1290 return;
1291 }
1292 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1293 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1294 WaterMark endMark = packet->GetEndWaterMark();
1295 TimeOffset offset;
1296 metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1297 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1298 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1299 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1300 }
1301 }
1302
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1303 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1304 const std::vector<uint64_t> &reserved)
1305 {
1306 WaterMark deletedWaterMark = 0;
1307 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1308 if (curType == SyncType::QUERY_SYNC_TYPE) {
1309 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1310 LOGE("[DataSync] get packet reserve size failed");
1311 return -E_INVALID_ARGS;
1312 }
1313 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1314 }
1315 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1316 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1317 int errCode = SaveLocalWaterMark(curType, context,
1318 {0, 0, ackWaterMark, deletedWaterMark});
1319 if (errCode != E_OK) {
1320 return errCode;
1321 }
1322 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1323 context->IncNegotiationCount();
1324 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1325 if (!context->IsNeedClearRemoteStaleData()) {
1326 return -E_RE_SEND_DATA;
1327 }
1328 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1329 if (errCode != E_OK) {
1330 return errCode;
1331 }
1332 return -E_RE_SEND_DATA;
1333 }
1334
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1335 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1336 const DataRequestPacket *packet)
1337 {
1338 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1339 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1340 if (errCode != E_OK) {
1341 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1342 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1343 }
1344 return -E_NOT_PERMIT;
1345 }
1346 const std::vector<SendDataItem> &data = packet->GetData();
1347 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1348 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1349 auto securityOption = packet->GetSecurityOption();
1350 if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1351 securityOption.securityLabel != SecurityLabel::NOT_SET) {
1352 context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1353 }
1354 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1355 !context->GetReceivcPermitCheck()) {
1356 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1357 if (permitReceive) {
1358 context->SetReceivcPermitCheck(true);
1359 } else {
1360 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1361 return -E_SECURITY_OPTION_CHECK_ERROR;
1362 }
1363 }
1364 return errCode;
1365 }
1366
1367 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1368 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1369 {
1370 // When mtu less than 30k, we send data with bluetooth
1371 // In order not to block the bluetooth channel, we don't send notify packet here
1372 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1373 return;
1374 }
1375 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1376
1377 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1378 if (ackMessage == nullptr) {
1379 LOGE("[DataSync][ResetWatchDog] new message failed");
1380 return;
1381 }
1382
1383 DataAckPacket ack;
1384 ack.SetData(data);
1385 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1386 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1387 int errCode = ackMessage->SetCopiedObject(ack);
1388 if (errCode != E_OK) {
1389 delete ackMessage;
1390 ackMessage = nullptr;
1391 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1392 return;
1393 }
1394 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1395 context->GetSequenceId(), context->GetResponseSessionId());
1396
1397 errCode = Send(context, ackMessage, nullptr, 0);
1398 if (errCode != E_OK) {
1399 delete ackMessage;
1400 ackMessage = nullptr;
1401 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1402 STR_MASK(GetDeviceId()));
1403 } else {
1404 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1405 STR_MASK(GetDeviceId()));
1406 }
1407 }
1408
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1409 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1410 {
1411 if (context == nullptr) {
1412 return -E_INVALID_ARGS;
1413 }
1414 int errCode = RemoveDeviceDataIfNeed(context);
1415 if (errCode != E_OK) {
1416 context->SetTaskErrCode(errCode);
1417 return errCode;
1418 }
1419 SyncEntry syncData;
1420 errCode = GetReSendData(syncData, context, reSendInfo);
1421 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1422 return errCode;
1423 }
1424 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1425 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1426 if (packet == nullptr) {
1427 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1428 return -E_OUT_OF_MEMORY;
1429 }
1430 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1431 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1432 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1433 LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1434 return errCode;
1435 }
1436 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1437 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1438 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1439 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1440 dataTime.deleteEndTime += 1;
1441 }
1442 if (reSendInfo.end > reSendInfo.start) {
1443 dataTime.endTime += 1;
1444 }
1445 errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1446 if (errCode != E_OK) {
1447 LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1448 }
1449 }
1450 return errCode;
1451 }
1452
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1453 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1454 uint32_t sessionId, uint32_t sequenceId)
1455 {
1456 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1457 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1458 if (message == nullptr) {
1459 LOGE("[DataSync][SendDataPacket] new message error");
1460 delete packet;
1461 packet = nullptr;
1462 return -E_OUT_OF_MEMORY;
1463 }
1464 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1465 int errCode = message->SetExternalObject(packet);
1466 if (errCode != E_OK) {
1467 delete packet;
1468 packet = nullptr;
1469 delete message;
1470 message = nullptr;
1471 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1472 return errCode;
1473 }
1474 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1475 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1476 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1477 };
1478 errCode = Send(context, message, handler, packetLen);
1479 if (errCode != E_OK) {
1480 delete message;
1481 message = nullptr;
1482 }
1483 return errCode;
1484 }
1485
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1486 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1487 {
1488 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1489 int mode = SyncOperation::TransferSyncMode(inMode);
1490 // for pull mode it just need to get data, no need to send data.
1491 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1492 return E_OK;
1493 }
1494 if (context->GetSendPermitCheck()) {
1495 return E_OK;
1496 }
1497 bool isPermitSync = true;
1498 std::string deviceId = context->GetDeviceId();
1499 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1500 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1501 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1502 }
1503 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1504 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1505 if (isPermitSync) {
1506 context->SetSendPermitCheck(true);
1507 return E_OK;
1508 }
1509 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1510 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1511 return -E_SECURITY_OPTION_CHECK_ERROR;
1512 }
1513 if (mode == SyncModeType::RESPONSE_PULL) {
1514 SyncEntry syncData;
1515 (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1516 return -E_SECURITY_OPTION_CHECK_ERROR;
1517 }
1518 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1519 return -E_SECURITY_OPTION_CHECK_ERROR;
1520 }
1521 return E_OK;
1522 }
1523
GetLabel() const1524 std::string SingleVerDataSync::GetLabel() const
1525 {
1526 return label_;
1527 }
1528
GetDeviceId() const1529 std::string SingleVerDataSync::GetDeviceId() const
1530 {
1531 return deviceId_;
1532 }
1533
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1534 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1535 {
1536 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1537 if (packet == nullptr) {
1538 LOGE("[WaterMarkErrHandle] get packet object failed");
1539 return -E_INVALID_ARGS;
1540 }
1541 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1542 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1543 WaterMark peerMark = 0;
1544 WaterMark deletedMark = 0;
1545 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1546 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1547 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1548 }
1549 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1550 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1551 context->SetReceiveWaterMarkErr(true);
1552 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1553 return true;
1554 }
1555 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1556 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1557 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1558 peerMark);
1559 context->SetReceiveWaterMarkErr(true);
1560 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1561 return true;
1562 }
1563 return false;
1564 }
1565
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1566 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1567 {
1568 auto *packet = message->GetObject<DataRequestPacket>();
1569 if (packet == nullptr) {
1570 return -E_INVALID_ARGS;
1571 }
1572 if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1573 return E_OK;
1574 }
1575 auto query = packet->GetQuery();
1576 std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1577 if (!schemaSyncStatus.second) {
1578 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1579 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1580 return -E_NEED_ABILITY_SYNC;
1581 }
1582 if (!schemaSyncStatus.first) {
1583 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1584 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1585 return -E_SCHEMA_MISMATCH;
1586 }
1587 return E_OK;
1588 }
1589
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1590 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1591 {
1592 int mode = SyncOperation::TransferSyncMode(inMode);
1593 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1594 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1595 return;
1596 }
1597 if (storage_ == nullptr) {
1598 LOGE("RemotePushFinished fail, storage is nullptr.");
1599 return;
1600 }
1601
1602 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1603 storage_->NotifyRemotePushFinished(deviceId_);
1604 }
1605 }
1606
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1607 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1608 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1609 {
1610 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1611 WaterMark localMark = 0;
1612 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1613 ackPacket.SetRecvCode(recvCode);
1614 UpdateWaterMark isUpdateWaterMark;
1615 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(
1616 SyncOperation::GetSyncType(packet->GetMode()), packet->GetData(), isUpdateWaterMark);
1617 bool isPacketWaterLower = false;
1618 // send ack packet
1619 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1620 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1621 } else if (recvCode != WATER_MARK_INVALID) {
1622 WaterMark mark = 0;
1623 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1624 if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetLocalWaterMark() < mark) {
1625 LOGI("[DataSync][SetAckPacket] packetLocalMark=%" PRIu64 ",lockMark=%" PRIu64, packet->GetLocalWaterMark(),
1626 mark);
1627 isPacketWaterLower = true;
1628 dataTime.endTime = packet->GetLocalWaterMark();
1629 mark = packet->GetLocalWaterMark();
1630 }
1631 ackPacket.SetData(mark);
1632 }
1633 std::vector<uint64_t> reserved {localMark};
1634 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1635 uint64_t packetId = 0;
1636 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1637 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1638 }
1639 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1640 reserved.push_back(packetId);
1641 }
1642 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1643 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1644 WaterMark deletedPeerMark;
1645 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1646 if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetDeletedWaterMark() < deletedPeerMark) {
1647 LOGI("[DataSync][SetAckPacket] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64,
1648 packet->GetDeletedWaterMark(), deletedPeerMark);
1649 isPacketWaterLower = true;
1650 dataTime.deleteEndTime = packet->GetDeletedWaterMark();
1651 deletedPeerMark = packet->GetDeletedWaterMark();
1652 }
1653 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1654 }
1655 ackPacket.SetReserved(reserved);
1656 ackPacket.SetVersion(version);
1657 if (isPacketWaterLower) {
1658 UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1659 }
1660 }
1661
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1662 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1663 DataSyncReSendInfo reSendInfo)
1664 {
1665 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1666 if (mode == SyncModeType::PULL) {
1667 return E_OK;
1668 }
1669 ContinueToken token = nullptr;
1670 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1671 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1672 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1673 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1674 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1675 int errCode;
1676 if (curType != SyncType::QUERY_SYNC_TYPE) {
1677 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1678 reSendDataSizeInfo);
1679 } else {
1680 QuerySyncObject queryObj = context->GetQuery();
1681 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1682 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1683 }
1684 if (token != nullptr) {
1685 storage_->ReleaseContinueToken(token);
1686 }
1687 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1688 context->SetTaskErrCode(errCode);
1689 return errCode;
1690 }
1691 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1692 return errCode;
1693 }
1694 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1695 DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1696
1697 int innerCode = InterceptData(syncData);
1698 if (innerCode != E_OK) {
1699 context->SetTaskErrCode(innerCode);
1700 return innerCode;
1701 }
1702
1703 bool needCompressOnSync = false;
1704 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1705 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1706 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1707 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1708 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1709 { remoteAlgo, version });
1710 if (compressCode != E_OK) {
1711 return compressCode;
1712 }
1713 }
1714 return errCode;
1715 }
1716
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1717 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1718 {
1719 if (context == nullptr) {
1720 LOGE("[SingleVerDataSync][RemoveDeviceDataIfNeed] context is nullptr.");
1721 return -E_INVALID_ARGS;
1722 }
1723 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1724 return E_OK;
1725 }
1726 uint64_t clearRemoteDataMark = 0;
1727 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1728 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1729 if (clearRemoteDataMark == 0) {
1730 return E_OK;
1731 }
1732 int errCode = E_OK;
1733 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1734 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1735 if (errCode != E_OK) {
1736 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1737 return errCode;
1738 }
1739 }
1740 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1741 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1742 if (errCode != E_OK) {
1743 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1744 return errCode;
1745 }
1746 }
1747 return E_OK;
1748 }
1749
UpdateMtuSize()1750 void SingleVerDataSync::UpdateMtuSize()
1751 {
1752 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1753 }
1754
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1755 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1756 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1757 {
1758 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1759 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1760 WaterMark peerMark = 0;
1761 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1762 peerMark);
1763 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1764 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1765 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1766 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1767 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1768 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1769 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1770 packet->SetLastSequence();
1771 }
1772 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1773 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1774 sendCode = SEND_FINISHED;
1775 }
1776 packet->SetData(syncData.entries);
1777 packet->SetCompressData(syncData.compressedEntries);
1778 packet->SetBasicInfo(sendCode, version, reSendMode);
1779 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1780 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1781 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1782 packet->SetEndWaterMark(context->GetEndMark());
1783 packet->SetQuery(context->GetQuery());
1784 }
1785 packet->SetQueryId(context->GetQuerySyncId());
1786 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1787 std::vector<uint64_t> reserved {reSendInfo.packetId};
1788 packet->SetReserved(reserved);
1789 }
1790 if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1791 // resend pull packet dont set compress type
1792 return;
1793 }
1794 bool needCompressOnSync = false;
1795 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1796 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1797 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1798 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1799 packet->SetCompressDataMark();
1800 packet->SetCompressAlgo(curAlgo);
1801 }
1802 FillRequestReSendPacketV2(context, packet);
1803 }
1804
FillRequestReSendPacketV2(const SingleVerSyncTaskContext * context,DataRequestPacket * packet)1805 void SingleVerDataSync::FillRequestReSendPacketV2(const SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1806 {
1807 if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1808 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1809 uint32_t total = 0u;
1810 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1811 packet->SetTotalDataCount(total);
1812 }
1813 }
1814
GetDataSizeSpecInfo(size_t packetSize)1815 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1816 {
1817 bool needCompressOnSync = false;
1818 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1819 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1820 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1821 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1822 return {blockSize, packetSize};
1823 }
1824
InterceptData(SyncEntry & syncEntry)1825 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1826 {
1827 if (storage_ == nullptr) {
1828 LOGE("Invalid DB. Can not intercept data.");
1829 return -E_INVALID_DB;
1830 }
1831
1832 // GetLocalDeviceName get local device ID.
1833 // GetDeviceId get remote device ID.
1834 // If intercept data fail, entries will be released.
1835 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1836 }
1837
ControlCmdStart(SingleVerSyncTaskContext * context)1838 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1839 {
1840 if (context == nullptr) {
1841 return -E_INVALID_ARGS;
1842 }
1843 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1844 if (subManager == nullptr) {
1845 return -E_INVALID_ARGS;
1846 }
1847 int errCode = ControlCmdStartCheck(context);
1848 if (errCode != E_OK) {
1849 return errCode;
1850 }
1851 ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1852 if (packet == nullptr) {
1853 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1854 return -E_OUT_OF_MEMORY;
1855 }
1856 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1857 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1858 if (errCode != E_OK) {
1859 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1860 delete packet;
1861 packet = nullptr;
1862 return errCode;
1863 }
1864 }
1865 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1866 errCode = SendControlPacket(packet, context);
1867 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1868 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1869 }
1870 return errCode;
1871 }
1872
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1873 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1874 {
1875 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1876 if (packet == nullptr) {
1877 return -E_INVALID_ARGS;
1878 }
1879 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1880 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1881 int errCode = ControlCmdRequestRecvPre(context, message);
1882 if (errCode != E_OK) {
1883 return errCode;
1884 }
1885 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1886 errCode = SubscribeRequestRecv(context, message);
1887 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1888 errCode = UnsubscribeRequestRecv(context, message);
1889 }
1890 return errCode;
1891 }
1892
UpdatePeerWaterMarkInner(const DataRequestPacket & packet,const SyncTimeRange & dataTime,const UpdateWaterMark & isUpdateWaterMark,const SingleVerSyncTaskContext * context)1893 void SingleVerDataSync::UpdatePeerWaterMarkInner(const DataRequestPacket &packet, const SyncTimeRange &dataTime,
1894 const UpdateWaterMark &isUpdateWaterMark, const SingleVerSyncTaskContext *context)
1895 {
1896 SyncType curType = SyncOperation::GetSyncType(packet.GetMode());
1897 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1898 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1899 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet.IsNeedUpdateWaterMark()) {
1900 UpdateQueryPeerWaterMark(curType, packet.GetQueryId(), dataTime, context, isUpdateWaterMark);
1901 }
1902 }
1903 } // namespace DistributedDB
1904