1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 storage_ = nullptr;
44 communicateHandle_ = nullptr;
45 metadata_ = nullptr;
46 }
47
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52 return -E_INVALID_ARGS;
53 }
54 storage_ = static_cast<SyncGenericInterface *>(inStorage);
55 communicateHandle_ = inCommunicateHandle;
56 metadata_ = inMetadata;
57 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58 std::vector<uint8_t> label = inStorage->GetIdentifier();
59 label.resize(3); // only show 3 Bytes enough
60 label_ = DBCommon::VectorToHexString(label);
61 deviceId_ = deviceId;
62 msgSchedule_.Initialize(label_, deviceId_);
63 return E_OK;
64 }
65
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68 std::lock_guard<std::mutex> lock(lock_);
69 int errCode = CheckPermitSendData(mode, context);
70 if (errCode != E_OK) {
71 return errCode;
72 }
73 if (sessionId_ != 0) { // auto sync timeout resend
74 return ReSendData(context);
75 }
76 ResetSyncStatus(mode, context);
77 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78 int tmpMode = SyncOperation::TransferSyncMode(mode);
79 if (tmpMode == SyncModeType::PUSH) {
80 errCode = PushStart(context);
81 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82 errCode = PushPullStart(context);
83 } else if (tmpMode == SyncModeType::PULL) {
84 errCode = PullRequestStart(context);
85 } else {
86 SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
87 errCode = PullResponseStart(context);
88 }
89 if (context->IsSkipTimeoutError(errCode)) {
90 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
91 // just return to avoid higher pressure for send.
92 return E_OK;
93 }
94 if (errCode != E_OK) {
95 LOGE("[DataSync] SendStart errCode=%d", errCode);
96 return errCode;
97 }
98 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
99 LOGE("wait for recv finished for push and pull mode");
100 return -E_EKEYREVOKED;
101 }
102 return InnerSyncStart(context);
103 }
104
InnerSyncStart(SingleVerSyncTaskContext * context)105 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
106 {
107 int errCode = CheckPermitSendData(mode_, context);
108 if (errCode != E_OK) {
109 return errCode;
110 }
111 while (true) {
112 if (windowSize_ <= 0 || isAllDataHasSent_) {
113 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
114 label_.c_str(), STR_MASK(deviceId_));
115 return E_OK;
116 }
117 int mode = SyncOperation::TransferSyncMode(mode_);
118 if (mode == SyncModeType::PULL) {
119 LOGE("[DataSync] unexpected error");
120 return -E_INVALID_ARGS;
121 }
122 context->IncSequenceId();
123 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
124 errCode = PushStart(context);
125 } else {
126 errCode = PullResponseStart(context);
127 }
128 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
129 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
130 isAllDataHasSent_ = true;
131 return -E_EKEYREVOKED;
132 }
133 if (context->IsSkipTimeoutError(errCode)) {
134 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
135 // just return to avoid higher pressure for send.
136 return E_OK;
137 }
138 if (errCode != E_OK) {
139 LOGE("[DataSync] InnerSend errCode=%d", errCode);
140 return errCode;
141 }
142 }
143 return E_OK;
144 }
145
InnerClearSyncStatus()146 void SingleVerDataSync::InnerClearSyncStatus()
147 {
148 sessionId_ = 0;
149 reSendMap_.clear();
150 windowSize_ = 0;
151 maxSequenceIdHasSent_ = 0;
152 isAllDataHasSent_ = false;
153 }
154
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)155 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
156 {
157 if (message == nullptr) {
158 LOGE("[DataSync] AckRecv message nullptr");
159 return -E_INVALID_ARGS;
160 }
161 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
162 if (packet == nullptr) {
163 return -E_INVALID_ARGS;
164 }
165 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
166 uint32_t sessionId = message->GetSessionId();
167 uint32_t sequenceId = message->GetSequenceId();
168
169 std::lock_guard<std::mutex> lock(lock_);
170 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
171 windowSize_, label_.c_str(), STR_MASK(deviceId_));
172 if (sessionId != sessionId_) {
173 LOGI("[DataSync] ignore ack,sessionId is different");
174 return E_OK;
175 }
176 Timestamp lastQueryTime = 0;
177 if (reSendMap_.count(sequenceId) != 0) {
178 lastQueryTime = reSendMap_[sequenceId].end;
179 reSendMap_.erase(sequenceId);
180 windowSize_++;
181 } else {
182 LOGI("[DataSync] ack seqId not in map");
183 return E_OK;
184 }
185 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
186 Timestamp dbLastQueryTime = 0;
187 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
188 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
189 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
190 }
191 if (errCode != E_OK) {
192 return errCode;
193 }
194 }
195 if (!isAllDataHasSent_) {
196 return InnerSyncStart(context);
197 } else if (reSendMap_.empty()) {
198 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
199 InnerClearSyncStatus();
200 return -E_FINISHED;
201 }
202 return E_OK;
203 }
204
ClearSyncStatus()205 void SingleVerDataSync::ClearSyncStatus()
206 {
207 std::lock_guard<std::mutex> lock(lock_);
208 InnerClearSyncStatus();
209 }
210
ReSendData(SingleVerSyncTaskContext * context)211 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
212 {
213 if (reSendMap_.empty()) {
214 LOGI("[DataSync] ReSend map empty");
215 return -E_INTERNAL_ERROR;
216 }
217 uint32_t sequenceId = reSendMap_.begin()->first;
218 ReSendInfo reSendInfo = reSendMap_.begin()->second;
219 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
220 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
221 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
222 windowSize_, label_.c_str(), STR_MASK(deviceId_));
223 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
224 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
225 return ReSend(context, dataReSendInfo);
226 }
227
GetLocalDeviceName()228 std::string SingleVerDataSync::GetLocalDeviceName()
229 {
230 std::string deviceInfo;
231 if (communicateHandle_ != nullptr) {
232 communicateHandle_->GetLocalIdentity(deviceInfo);
233 }
234 return deviceInfo;
235 }
236
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)237 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
238 uint32_t packetLen)
239 {
240 bool startFeedDogRet = false;
241 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
242 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
243 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
244 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
245 }
246 SendConfig sendConfig;
247 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
248 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
249 if (errCode != E_OK) {
250 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
251 if (startFeedDogRet) {
252 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
253 }
254 }
255 return errCode;
256 }
257
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)258 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
259 std::vector<SendDataItem> &outData, size_t packetSize)
260 {
261 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
262 if (performance != nullptr) {
263 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
264 }
265 // start a watch dog before get data
266 // it will send ack util get data finished
267 context->StartFeedDogForGetData(context->GetResponseSessionId());
268 int errCode = GetData(context, packetSize, outData);
269 context->StopFeedDogForGetData();
270 if (performance != nullptr) {
271 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
272 }
273 if (!outData.empty()) {
274 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
275 }
276 return errCode;
277 }
278
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)279 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
280 {
281 int errCode;
282 UpdateMtuSize();
283 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
284 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
285 LOGI("[DataSync][GetData] resend data");
286 errCode = GetUnsyncData(context, outData, packetSize);
287 } else {
288 ContinueToken token;
289 context->GetContinueToken(token);
290 if (token == nullptr) {
291 errCode = GetUnsyncData(context, outData, packetSize);
292 } else {
293 LOGD("[DataSync][GetData] get data from token");
294 // if there is data to send, read out data, and update local watermark, send data
295 errCode = GetNextUnsyncData(context, outData, packetSize);
296 }
297 }
298 if (errCode == -E_UNFINISHED) {
299 LOGD("[DataSync][GetData] not finished.");
300 }
301 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
303 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
304 }
305 return errCode;
306 }
307
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)308 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
309 {
310 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
311 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
312 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
313 bool needCompressOnSync = false;
314 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
315 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
316 int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
317 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
318 context->SetTaskErrCode(errCode);
319 return errCode;
320 }
321
322 int innerCode = InterceptData(syncOutData);
323 if (innerCode != E_OK) {
324 context->SetTaskErrCode(innerCode);
325 return innerCode;
326 }
327
328 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
329 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
330 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
331 { remoteAlgo, version });
332 if (compressCode != E_OK) {
333 return compressCode;
334 }
335 }
336 return errCode;
337 }
338
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)339 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
340 size_t packetSize)
341 {
342 SyncTimeRange waterRange;
343 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
344 WaterMark startMark = 0;
345 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
346 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
347 if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
348 return E_OK;
349 }
350 waterRange.beginTime = startMark;
351 if (curType == SyncType::QUERY_SYNC_TYPE) {
352 WaterMark deletedStartMark = 0;
353 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
354 Timestamp lastQueryTimestamp = 0;
355 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
356 lastQueryTimestamp);
357 if (errCode != E_OK) {
358 return errCode;
359 }
360 waterRange.deleteBeginTime = deletedStartMark;
361 waterRange.lastQueryTime = lastQueryTimestamp;
362 }
363 return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
364 }
365
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)366 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
367 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
368 {
369 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
370 ContinueToken token = nullptr;
371 context->GetContinueToken(token);
372 if (token != nullptr) {
373 storage_->ReleaseContinueToken(token);
374 }
375 int errCode;
376 if (curType != SyncType::QUERY_SYNC_TYPE) {
377 errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
378 syncDataSizeInfo);
379 } else {
380 QuerySyncObject queryObj = context->GetQuery();
381 errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
382 }
383 context->SetContinueToken(token);
384 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
385 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
386 }
387 return errCode;
388 }
389
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)390 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
391 size_t packetSize)
392 {
393 ContinueToken token;
394 context->GetContinueToken(token);
395 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
396 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
397 context->SetContinueToken(token);
398 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
399 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
400 }
401 return errCode;
402 }
403
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)404 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
405 SyncType curType, const QuerySyncObject &query)
406 {
407 if (inData.empty()) {
408 return E_OK;
409 }
410 SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
411 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
412 if (performance != nullptr) {
413 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
414 }
415 const auto localDeviceName = GetLocalDeviceName();
416 const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
417 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
418 std::vector<SendDataItem> copyData = inData;
419 int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
420 if (errCode != E_OK) {
421 LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
422 return errCode;
423 }
424 // query only support prefix key and don't have query in packet in 104 version
425 errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
426 if (performance != nullptr) {
427 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428 }
429 if (errCode != E_OK) {
430 LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
431 }
432 return errCode;
433 }
434
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437 mode_ = inMode;
438 maxSequenceIdHasSent_ = 0;
439 isAllDataHasSent_ = false;
440 context->ReSetSequenceId();
441 reSendMap_.clear();
442 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443 windowSize_ = LOW_VERSION_WINDOW_SIZE;
444 } else {
445 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446 }
447 int mode = SyncOperation::TransferSyncMode(inMode);
448 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449 sessionId_ = context->GetRequestSessionId();
450 } else {
451 sessionId_ = context->GetResponseSessionId();
452 }
453 }
454
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458 WaterMark localMark = 0;
459 WaterMark deleteMark = 0;
460 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461 GetLocalDeleteSyncWaterMark(context, deleteMark);
462 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468 WaterMark localMark = 0;
469 int errCode = E_OK;
470 const std::string &deviceId = context->GetDeviceId();
471 std::string queryId = context->GetQuerySyncId();
472 if (syncType != SyncType::QUERY_SYNC_TYPE) {
473 if (isCheckBeforUpdate) {
474 GetLocalWaterMark(syncType, queryId, context, localMark);
475 if (localMark >= dataTimeRange.endTime) {
476 return E_OK;
477 }
478 }
479 errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
480 } else {
481 bool isNeedUpdateMark = true;
482 bool isNeedUpdateDeleteMark = true;
483 if (isCheckBeforUpdate) {
484 WaterMark deleteDataWaterMark = 0;
485 GetLocalWaterMark(syncType, queryId, context, localMark);
486 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487 if (localMark >= dataTimeRange.endTime) {
488 isNeedUpdateMark = false;
489 }
490 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491 isNeedUpdateDeleteMark = false;
492 }
493 }
494 if (isNeedUpdateMark) {
495 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
497 if (errCode != E_OK) {
498 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
499 return errCode;
500 }
501 }
502 if (isNeedUpdateDeleteMark) {
503 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
504 dataTimeRange.deleteEndTime);
505 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
506 }
507 }
508 if (errCode != E_OK) {
509 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
510 }
511 return errCode;
512 }
513
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const514 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
515 const DeviceID &deviceId, WaterMark &waterMark) const
516 {
517 if (syncType != SyncType::QUERY_SYNC_TYPE) {
518 metadata_->GetPeerWaterMark(deviceId, waterMark);
519 return;
520 }
521 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
522 }
523
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)524 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
525 {
526 metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
527 }
528
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const529 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
530 WaterMark &waterMark) const
531 {
532 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
533 }
534
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const535 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
536 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
537 {
538 if (syncType != SyncType::QUERY_SYNC_TYPE) {
539 metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
540 return;
541 }
542 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
543 waterMark, context->IsAutoLiftWaterMark());
544 }
545
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)546 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
547 WaterMark maxSendDataTime)
548 {
549 bool isNeedClearRemoteData = false;
550 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
551 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
552 uint64_t clearDeviceDataMark = 0;
553 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
554 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
555 } else {
556 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
557 if (packet == nullptr) {
558 LOGE("[RemoveDeviceDataHandle] get packet object failed");
559 return -E_INVALID_ARGS;
560 }
561 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
562 WaterMark packetLocalMark = packet->GetLocalWaterMark();
563 WaterMark peerMark = 0;
564 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
565 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
566 }
567 if (!isNeedClearRemoteData) {
568 return E_OK;
569 }
570 int errCode = E_OK;
571 if (context->IsNeedClearRemoteStaleData()) {
572 // need to clear remote device history data
573 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
574 if (errCode != E_OK) {
575 (void)SendDataAck(context, message, errCode, maxSendDataTime);
576 return errCode;
577 }
578 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
579 // avoid repeat clear in ack
580 metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
581 }
582 }
583 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
584 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
585 if (errCode != E_OK) {
586 (void)SendDataAck(context, message, errCode, maxSendDataTime);
587 return errCode;
588 }
589 }
590 return E_OK;
591 }
592
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)593 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
594 const std::vector<uint64_t> &reserved)
595 {
596 bool isNeedClearRemoteData = false;
597 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
598 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
599 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
600 uint64_t clearDeviceDataMark = 0;
601 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
602 isNeedClearRemoteData = (clearDeviceDataMark != 0);
603 } else if (reserved.empty()) {
604 WaterMark localMark = 0;
605 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
606 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
607 } else {
608 WaterMark peerMark = 0;
609 GetPeerWaterMark(curType, context->GetQuerySyncId(),
610 context->GetDeviceId(), peerMark);
611 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
612 }
613 if (!isNeedClearRemoteData) {
614 return E_OK;
615 }
616 // need to clear remote historydata
617 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
618 label_.c_str(), STR_MASK(GetDeviceId()));
619 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
620 if (errCode != E_OK) {
621 return errCode;
622 }
623 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
624 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
625 }
626 return errCode;
627 }
628
SetSessionEndTimestamp(Timestamp end)629 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
630 {
631 sessionEndTimestamp_ = end;
632 }
633
GetSessionEndTimestamp() const634 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
635 {
636 return sessionEndTimestamp_;
637 }
638
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)639 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
640 {
641 ReSendInfo reSendInfo;
642 reSendInfo.start = dataTimeRange.beginTime;
643 reSendInfo.end = dataTimeRange.endTime;
644 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
645 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
646 reSendInfo.packetId = context->GetPacketId();
647 maxSequenceIdHasSent_++;
648 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
649 windowSize_--;
650 ContinueToken token;
651 context->GetContinueToken(token);
652 if (token == nullptr) {
653 isAllDataHasSent_ = true;
654 }
655 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
656 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
657 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
658 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
659 }
660
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)661 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
662 SyncEntry &syncData, int sendCode, int mode)
663 {
664 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
665 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
666 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
667 WaterMark localMark = 0;
668 WaterMark peerMark = 0;
669 WaterMark deleteMark = 0;
670 bool needCompressOnSync = false;
671 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
672 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
673 std::string id = context->GetQuerySyncId();
674 GetLocalWaterMark(curType, id, context, localMark);
675 GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
676 GetLocalDeleteSyncWaterMark(context, deleteMark);
677 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
678 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
679 packet->SetLastSequence();
680 }
681 int tmpMode = mode;
682 if (mode == SyncModeType::RESPONSE_PULL) {
683 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
684 }
685 packet->SetData(syncData.entries);
686 packet->SetCompressData(syncData.compressedEntries);
687 packet->SetBasicInfo(sendCode, version, tmpMode);
688 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
689 packet->SetWaterMark(localMark, peerMark, deleteMark);
690 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
691 packet->SetEndWaterMark(context->GetEndMark());
692 packet->SetSessionId(context->GetRequestSessionId());
693 }
694 packet->SetQuery(context->GetQuery());
695 packet->SetQueryId(context->GetQuerySyncId());
696 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
697 // empty compress data should not mark compress
698 if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
699 packet->SetCompressDataMark();
700 packet->SetCompressAlgo(curAlgo);
701 }
702 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
703 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
704 context->GetQuery().HasOrderBy())) {
705 packet->SetUpdateWaterMark();
706 }
707 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
708 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
709 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
710 }
711
RequestStart(SingleVerSyncTaskContext * context,int mode)712 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
713 {
714 int errCode = QuerySyncCheck(context);
715 if (errCode != E_OK) {
716 LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
717 return errCode;
718 }
719 errCode = RemoveDeviceDataIfNeed(context);
720 if (errCode != E_OK) {
721 context->SetTaskErrCode(errCode);
722 return errCode;
723 }
724 SyncEntry syncData;
725 // get data
726 errCode = GetMatchData(context, syncData);
727 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
728
729 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
730 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
731 return errCode;
732 }
733
734 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
735 if (packet == nullptr) {
736 LOGE("[DataSync][PushStart] new DataRequestPacket error");
737 return -E_OUT_OF_MEMORY;
738 }
739 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
740 UpdateWaterMark isUpdateWaterMark;
741 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
742 if (errCode == E_OK) {
743 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
744 }
745 FillDataRequestPacket(packet, context, syncData, errCode, mode);
746 errCode = SendDataPacket(curType, packet, context);
747 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
748 if (performance != nullptr) {
749 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
750 }
751 if (errCode == E_OK || errCode == -E_TIMEOUT) {
752 UpdateSendInfo(dataTime, context);
753 }
754 if (errCode == E_OK) {
755 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
756 context->GetQuery().HasOrderBy())) {
757 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
758 return E_OK;
759 }
760 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
761 SaveLocalWaterMark(curType, context, tmpDataTime);
762 }
763 return errCode;
764 }
765
PushStart(SingleVerSyncTaskContext * context)766 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
767 {
768 if (context == nullptr) {
769 return -E_INVALID_ARGS;
770 }
771 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
772 return RequestStart(context,
773 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
774 }
775
PushPullStart(SingleVerSyncTaskContext * context)776 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
777 {
778 if (context == nullptr) {
779 return -E_INVALID_ARGS;
780 }
781 return RequestStart(context, context->GetMode());
782 }
783
PullRequestStart(SingleVerSyncTaskContext * context)784 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
785 {
786 if (context == nullptr) {
787 return -E_INVALID_ARGS;
788 }
789 int errCode = QuerySyncCheck(context);
790 if (errCode != E_OK) {
791 return errCode;
792 }
793 errCode = RemoveDeviceDataIfNeed(context);
794 if (errCode != E_OK) {
795 context->SetTaskErrCode(errCode);
796 return errCode;
797 }
798 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
799 if (packet == nullptr) {
800 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
801 return -E_OUT_OF_MEMORY;
802 }
803 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
804 WaterMark peerMark = 0;
805 WaterMark localMark = 0;
806 WaterMark deleteMark = 0;
807 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
808 context->GetDeviceId(), peerMark);
809 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
810 GetLocalDeleteSyncWaterMark(context, deleteMark);
811 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
812 WaterMark endMark = context->GetEndMark();
813 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
814 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
815 packet->SetBasicInfo(E_OK, version, context->GetMode());
816 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
817 packet->SetWaterMark(localMark, peerMark, deleteMark);
818 packet->SetEndWaterMark(endMark);
819 packet->SetSessionId(context->GetRequestSessionId());
820 packet->SetQuery(context->GetQuery());
821 packet->SetQueryId(context->GetQuerySyncId());
822 packet->SetLastSequence();
823 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
824 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
825 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
826 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
827 label_.c_str(), STR_MASK(GetDeviceId()));
828 UpdateSendInfo(dataTime, context);
829 return SendDataPacket(syncType, packet, context);
830 }
831
PullResponseStart(SingleVerSyncTaskContext * context)832 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
833 {
834 if (context == nullptr) {
835 return -E_INVALID_ARGS;
836 }
837 SyncEntry syncData;
838 // get data
839 int errCode = GetMatchData(context, syncData);
840 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
841 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
842 (void)SendPullResponseDataPkt(errCode, syncData, context);
843 }
844 return errCode;
845 }
846 // if send finished
847 int ackCode = E_OK;
848 ContinueToken token = nullptr;
849 context->GetContinueToken(token);
850 if (errCode == E_OK && token == nullptr) {
851 LOGD("[DataSync][PullResponse] send last frame end");
852 ackCode = SEND_FINISHED;
853 }
854 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
855 UpdateWaterMark isUpdateWaterMark;
856 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
857 if (errCode == E_OK) {
858 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
859 }
860 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
861 if (errCode == E_OK || errCode == -E_TIMEOUT) {
862 UpdateSendInfo(dataTime, context);
863 }
864 if (errCode == E_OK) {
865 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
866 context->GetQuery().HasOrderBy())) {
867 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
868 return E_OK;
869 }
870 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
871 SaveLocalWaterMark(curType, context, tmpDataTime);
872 }
873 return errCode;
874 }
875
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)876 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
877 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
878 {
879 WaterMark tmpPeerWatermark = dataTime.endTime;
880 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
881 if (isUpdateWaterMark.normalUpdateMark) {
882 tmpPeerWatermark++;
883 }
884 if (isUpdateWaterMark.deleteUpdateMark) {
885 tmpPeerDeletedWatermark++;
886 }
887 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
888 }
889
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)890 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
891 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
892 {
893 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
894 return;
895 }
896 int errCode = E_OK;
897 if (syncType != SyncType::QUERY_SYNC_TYPE) {
898 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
899 } else {
900 if (peerWatermark != 0) {
901 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
902 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
903 if (errCode != E_OK) {
904 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
905 }
906 }
907 if (peerDeletedWatermark != 0) {
908 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
909 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
910 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
911 }
912 }
913 if (errCode != E_OK) {
914 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
915 }
916 }
917
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)918 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
919 {
920 uint16_t remoteCommunicatorVersion = 0;
921 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
922 -E_NOT_FOUND) {
923 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
924 return -E_VERSION_NOT_SUPPORT;
925 }
926 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
927 if (remoteCommunicatorVersion == 0) {
928 LOGI("[DataSync] set remote version 0");
929 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
930 return E_OK;
931 } else {
932 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
933 if (isControlMsg) {
934 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
935 } else {
936 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
937 }
938 return -E_NEED_ABILITY_SYNC;
939 }
940 }
941
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)942 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
943 {
944 if (context == nullptr || message == nullptr) {
945 return -E_INVALID_ARGS;
946 }
947 auto *packet = message->GetObject<DataRequestPacket>();
948 if (packet == nullptr) {
949 return -E_INVALID_ARGS;
950 }
951 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
952 return DoAbilitySyncIfNeed(context, message);
953 }
954 int32_t sendCode = packet->GetSendCode();
955 if (sendCode == -E_VERSION_NOT_SUPPORT) {
956 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
957 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
958 return -E_WAIT_NEXT_MESSAGE;
959 }
960 // only deal with pull response packet errCode
961 if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
962 message->GetSessionId() == context->GetRequestSessionId()) {
963 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
964 return sendCode;
965 }
966 int errCode = RunPermissionCheck(context, message, packet);
967 if (errCode != E_OK) {
968 return errCode;
969 }
970 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
971 errCode = CheckSchemaStrategy(context, message);
972 }
973 if (errCode == E_OK) {
974 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
975 }
976 if (errCode != E_OK) {
977 (void)SendDataAck(context, message, errCode, 0);
978 return errCode;
979 }
980 errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
981 if (errCode != E_OK) {
982 (void)SendDataAck(context, message, errCode, 0);
983 }
984 return errCode;
985 }
986
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)987 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
988 WaterMark &pullEndWatermark)
989 {
990 int errCode = DataRequestRecvPre(context, message);
991 if (errCode != E_OK) {
992 return errCode;
993 }
994 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
995 const std::vector<SendDataItem> &data = packet->GetData();
996 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
997 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
998 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
999 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
1000 context->SetReceiveWaterMarkErr(false);
1001 UpdateWaterMark isUpdateWaterMark;
1002 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1003 errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1004 if (errCode != E_OK) {
1005 return errCode;
1006 }
1007 Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1008 if (WaterMarkErrHandle(curType, context, message)) {
1009 return E_OK;
1010 }
1011 GetPullEndWatermark(context, packet, pullEndWatermark);
1012 // save data first
1013 errCode = SaveData(context, data, curType, packet->GetQuery());
1014 if (errCode != E_OK) {
1015 (void)SendDataAck(context, message, errCode, dataTime.endTime);
1016 return errCode;
1017 }
1018 SingleVerDataSyncUtils::UpdateSyncProcess(context, packet, data.size());
1019
1020 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1021 pullEndWatermark = 0;
1022 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1023 } else {
1024 // if data is empty, we don't know the max timestap of this packet.
1025 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1026 }
1027 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1028 context->GetRequestSessionId());
1029 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1030 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1031 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1032 UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1033 }
1034 if (errCode != E_OK) {
1035 return errCode;
1036 }
1037 if (packet->GetSendCode() == SEND_FINISHED) {
1038 return -E_RECV_FINISHED;
1039 }
1040 return errCode;
1041 }
1042
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1043 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1044 SingleVerSyncTaskContext *context)
1045 {
1046 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1047 if (message == nullptr) {
1048 LOGE("[DataSync][SendDataPacket] new message error");
1049 delete packet;
1050 packet = nullptr;
1051 return -E_OUT_OF_MEMORY;
1052 }
1053 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1054 int errCode = message->SetExternalObject(packet);
1055 if (errCode != E_OK) {
1056 delete packet;
1057 packet = nullptr;
1058 delete message;
1059 message = nullptr;
1060 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1061 return errCode;
1062 }
1063 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1064 context->GetSequenceId(), context->GetRequestSessionId());
1065 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1066 if (performance != nullptr) {
1067 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1068 }
1069 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1070 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1071 };
1072 errCode = Send(context, message, handler, packetLen);
1073 if (errCode != E_OK) {
1074 delete message;
1075 message = nullptr;
1076 }
1077
1078 return errCode;
1079 }
1080
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1081 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1082 SingleVerSyncTaskContext *context)
1083 {
1084 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1085 if (packet == nullptr) {
1086 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1087 return -E_OUT_OF_MEMORY;
1088 }
1089 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1090 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1091
1092 if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1093 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1094 uint32_t total = 0u;
1095 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1096 LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1097 packet->SetTotalDataCount(total);
1098 }
1099 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1100 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1101 if (message == nullptr) {
1102 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1103 delete packet;
1104 packet = nullptr;
1105 return -E_OUT_OF_MEMORY;
1106 }
1107 int errCode = message->SetExternalObject(packet);
1108 if (errCode != E_OK) {
1109 delete packet;
1110 packet = nullptr;
1111 delete message;
1112 message = nullptr;
1113 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1114 return errCode;
1115 }
1116 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1117 context->GetSequenceId(), context->GetResponseSessionId());
1118 SendResetWatchDogPacket(context, packetLen);
1119 errCode = Send(context, message, nullptr, packetLen);
1120 if (errCode != E_OK) {
1121 delete message;
1122 message = nullptr;
1123 }
1124 return errCode;
1125 }
1126
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1127 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1128 {
1129 (void)SendDataAck(context, message, E_OK, 0);
1130 }
1131
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1132 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1133 WaterMark maxSendDataTime)
1134 {
1135 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1136 if (packet == nullptr) {
1137 return -E_INVALID_ARGS;
1138 }
1139 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1140 if (ackMessage == nullptr) {
1141 LOGE("[DataSync][SendDataAck] new message error");
1142 return -E_OUT_OF_MEMORY;
1143 }
1144 DataAckPacket ack;
1145 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1146 int errCode = ackMessage->SetCopiedObject(ack);
1147 if (errCode != E_OK) {
1148 delete ackMessage;
1149 ackMessage = nullptr;
1150 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1151 return errCode;
1152 }
1153 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1154 message->GetSequenceId(), message->GetSessionId());
1155
1156 errCode = Send(context, ackMessage, nullptr, 0);
1157 if (errCode != E_OK) {
1158 delete ackMessage;
1159 ackMessage = nullptr;
1160 }
1161 return errCode;
1162 }
1163
AckPacketIdCheck(const Message * message)1164 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1165 {
1166 if (message == nullptr) {
1167 LOGE("[DataSync] AckRecv message nullptr");
1168 return false;
1169 }
1170 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1171 return true;
1172 }
1173 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1174 if (packet == nullptr) {
1175 return false;
1176 }
1177 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1178 std::lock_guard<std::mutex> lock(lock_);
1179 uint32_t sequenceId = message->GetSequenceId();
1180 if (reSendMap_.count(sequenceId) != 0) {
1181 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1182 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1183 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1184 originalPacketId);
1185 return false;
1186 }
1187 }
1188 return true;
1189 }
1190
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1191 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1192 {
1193 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1194 if (errCode != E_OK) {
1195 return errCode;
1196 }
1197 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1198 if (packet == nullptr) {
1199 return -E_INVALID_ARGS;
1200 }
1201 int32_t recvCode = packet->GetRecvCode();
1202 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1203 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1204 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1205 LOGE("[DataSync][AckRecv] Version mismatch");
1206 return -E_VERSION_NOT_SUPPORT;
1207 }
1208
1209 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1210 // we should ReleaseContinueToken, avoid crash
1211 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1212 STR_MASK(GetDeviceId()));
1213 context->ReleaseContinueToken();
1214 return recvCode;
1215 }
1216 uint64_t data = packet->GetData();
1217 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1218 return DealWaterMarkException(context, data, packet->GetReserved());
1219 }
1220
1221 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1222 // data only use low 32bit
1223 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1224 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1225 STR_MASK(GetDeviceId()));
1226 }
1227
1228 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1229 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1230 label_.c_str(), STR_MASK(GetDeviceId()));
1231 return recvCode;
1232 }
1233
1234 // Judge if send finished
1235 ContinueToken token;
1236 context->GetContinueToken(token);
1237 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1238 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1239 return -E_NO_DATA_SEND;
1240 }
1241
1242 // send next data
1243 return -E_SEND_DATA;
1244 }
1245
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1246 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1247 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1248 {
1249 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1250 LOGE("[SingleVerDataSync] messageId not available.");
1251 return;
1252 }
1253 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1254 if (ackMessage == nullptr) {
1255 LOGE("[DataSync][SaveDataNotify] new message failed");
1256 return;
1257 }
1258
1259 DataAckPacket ack;
1260 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1261 ack.SetVersion(pktVersion);
1262 int errCode = ackMessage->SetCopiedObject(ack);
1263 if (errCode != E_OK) {
1264 delete ackMessage;
1265 ackMessage = nullptr;
1266 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1267 return;
1268 }
1269 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1270
1271 errCode = Send(context, ackMessage, nullptr, 0);
1272 if (errCode != E_OK) {
1273 delete ackMessage;
1274 ackMessage = nullptr;
1275 }
1276 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1277 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1278 }
1279
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1280 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1281 WaterMark &pullEndWatermark) const
1282 {
1283 if (packet == nullptr) {
1284 return;
1285 }
1286 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1287 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1288 WaterMark endMark = packet->GetEndWaterMark();
1289 TimeOffset offset;
1290 metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1291 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1292 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1293 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1294 }
1295 }
1296
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1297 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1298 const std::vector<uint64_t> &reserved)
1299 {
1300 WaterMark deletedWaterMark = 0;
1301 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1302 if (curType == SyncType::QUERY_SYNC_TYPE) {
1303 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1304 LOGE("[DataSync] get packet reserve size failed");
1305 return -E_INVALID_ARGS;
1306 }
1307 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1308 }
1309 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1310 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1311 int errCode = SaveLocalWaterMark(curType, context,
1312 {0, 0, ackWaterMark, deletedWaterMark});
1313 if (errCode != E_OK) {
1314 return errCode;
1315 }
1316 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1317 context->IncNegotiationCount();
1318 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1319 if (!context->IsNeedClearRemoteStaleData()) {
1320 return -E_RE_SEND_DATA;
1321 }
1322 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1323 if (errCode != E_OK) {
1324 return errCode;
1325 }
1326 return -E_RE_SEND_DATA;
1327 }
1328
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1329 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1330 const DataRequestPacket *packet)
1331 {
1332 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1333 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1334 if (errCode != E_OK) {
1335 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1336 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1337 }
1338 return -E_NOT_PERMIT;
1339 }
1340 const std::vector<SendDataItem> &data = packet->GetData();
1341 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1342 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1343 auto securityOption = packet->GetSecurityOption();
1344 if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1345 securityOption.securityLabel != SecurityLabel::NOT_SET) {
1346 context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1347 }
1348 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1349 !context->GetReceivcPermitCheck()) {
1350 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1351 if (permitReceive) {
1352 context->SetReceivcPermitCheck(true);
1353 } else {
1354 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1355 return -E_SECURITY_OPTION_CHECK_ERROR;
1356 }
1357 }
1358 return errCode;
1359 }
1360
1361 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1362 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1363 {
1364 // When mtu less than 30k, we send data with bluetooth
1365 // In order not to block the bluetooth channel, we don't send notify packet here
1366 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1367 return;
1368 }
1369 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1370
1371 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1372 if (ackMessage == nullptr) {
1373 LOGE("[DataSync][ResetWatchDog] new message failed");
1374 return;
1375 }
1376
1377 DataAckPacket ack;
1378 ack.SetData(data);
1379 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1380 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1381 int errCode = ackMessage->SetCopiedObject(ack);
1382 if (errCode != E_OK) {
1383 delete ackMessage;
1384 ackMessage = nullptr;
1385 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1386 return;
1387 }
1388 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1389 context->GetSequenceId(), context->GetResponseSessionId());
1390
1391 errCode = Send(context, ackMessage, nullptr, 0);
1392 if (errCode != E_OK) {
1393 delete ackMessage;
1394 ackMessage = nullptr;
1395 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1396 STR_MASK(GetDeviceId()));
1397 } else {
1398 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1399 STR_MASK(GetDeviceId()));
1400 }
1401 }
1402
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1403 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1404 {
1405 if (context == nullptr) {
1406 return -E_INVALID_ARGS;
1407 }
1408 int errCode = RemoveDeviceDataIfNeed(context);
1409 if (errCode != E_OK) {
1410 context->SetTaskErrCode(errCode);
1411 return errCode;
1412 }
1413 SyncEntry syncData;
1414 errCode = GetReSendData(syncData, context, reSendInfo);
1415 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1416 return errCode;
1417 }
1418 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1419 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1420 if (packet == nullptr) {
1421 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1422 return -E_OUT_OF_MEMORY;
1423 }
1424 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1425 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1426 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1427 LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1428 return errCode;
1429 }
1430 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1431 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1432 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1433 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1434 dataTime.deleteEndTime += 1;
1435 }
1436 if (reSendInfo.end > reSendInfo.start) {
1437 dataTime.endTime += 1;
1438 }
1439 errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1440 if (errCode != E_OK) {
1441 LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1442 }
1443 }
1444 return errCode;
1445 }
1446
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1447 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1448 uint32_t sessionId, uint32_t sequenceId)
1449 {
1450 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1451 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1452 if (message == nullptr) {
1453 LOGE("[DataSync][SendDataPacket] new message error");
1454 delete packet;
1455 packet = nullptr;
1456 return -E_OUT_OF_MEMORY;
1457 }
1458 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1459 int errCode = message->SetExternalObject(packet);
1460 if (errCode != E_OK) {
1461 delete packet;
1462 packet = nullptr;
1463 delete message;
1464 message = nullptr;
1465 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1466 return errCode;
1467 }
1468 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1469 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1470 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1471 };
1472 errCode = Send(context, message, handler, packetLen);
1473 if (errCode != E_OK) {
1474 delete message;
1475 message = nullptr;
1476 }
1477 return errCode;
1478 }
1479
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1480 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1481 {
1482 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1483 int mode = SyncOperation::TransferSyncMode(inMode);
1484 // for pull mode it just need to get data, no need to send data.
1485 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1486 return E_OK;
1487 }
1488 if (context->GetSendPermitCheck()) {
1489 return E_OK;
1490 }
1491 bool isPermitSync = true;
1492 std::string deviceId = context->GetDeviceId();
1493 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1494 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1495 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1496 }
1497 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1498 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1499 if (isPermitSync) {
1500 context->SetSendPermitCheck(true);
1501 return E_OK;
1502 }
1503 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1504 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1505 return -E_SECURITY_OPTION_CHECK_ERROR;
1506 }
1507 if (mode == SyncModeType::RESPONSE_PULL) {
1508 SyncEntry syncData;
1509 (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1510 return -E_SECURITY_OPTION_CHECK_ERROR;
1511 }
1512 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1513 return -E_SECURITY_OPTION_CHECK_ERROR;
1514 }
1515 return E_OK;
1516 }
1517
GetLabel() const1518 std::string SingleVerDataSync::GetLabel() const
1519 {
1520 return label_;
1521 }
1522
GetDeviceId() const1523 std::string SingleVerDataSync::GetDeviceId() const
1524 {
1525 return deviceId_;
1526 }
1527
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1528 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1529 {
1530 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1531 if (packet == nullptr) {
1532 LOGE("[WaterMarkErrHandle] get packet object failed");
1533 return -E_INVALID_ARGS;
1534 }
1535 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1536 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1537 WaterMark peerMark = 0;
1538 WaterMark deletedMark = 0;
1539 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1540 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1541 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1542 }
1543 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1544 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1545 context->SetReceiveWaterMarkErr(true);
1546 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1547 return true;
1548 }
1549 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1550 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1551 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1552 peerMark);
1553 context->SetReceiveWaterMarkErr(true);
1554 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1555 return true;
1556 }
1557 return false;
1558 }
1559
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1560 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1561 {
1562 auto *packet = message->GetObject<DataRequestPacket>();
1563 if (packet == nullptr) {
1564 return -E_INVALID_ARGS;
1565 }
1566 if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1567 return E_OK;
1568 }
1569 auto query = packet->GetQuery();
1570 std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1571 if (!schemaSyncStatus.second) {
1572 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1573 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1574 return -E_NEED_ABILITY_SYNC;
1575 }
1576 if (!schemaSyncStatus.first) {
1577 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1578 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1579 return -E_SCHEMA_MISMATCH;
1580 }
1581 return E_OK;
1582 }
1583
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1584 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1585 {
1586 int mode = SyncOperation::TransferSyncMode(inMode);
1587 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1588 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1589 return;
1590 }
1591
1592 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1593 storage_->NotifyRemotePushFinished(deviceId_);
1594 }
1595 }
1596
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1597 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1598 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1599 {
1600 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1601 WaterMark localMark = 0;
1602 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1603 ackPacket.SetRecvCode(recvCode);
1604 // send ack packet
1605 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1606 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1607 } else if (recvCode != WATER_MARK_INVALID) {
1608 WaterMark mark = 0;
1609 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1610 ackPacket.SetData(mark);
1611 }
1612 std::vector<uint64_t> reserved {localMark};
1613 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1614 uint64_t packetId = 0;
1615 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1616 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1617 }
1618 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1619 reserved.push_back(packetId);
1620 }
1621 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1622 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1623 WaterMark deletedPeerMark;
1624 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1625 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1626 }
1627 ackPacket.SetReserved(reserved);
1628 ackPacket.SetVersion(version);
1629 }
1630
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1631 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1632 DataSyncReSendInfo reSendInfo)
1633 {
1634 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1635 if (mode == SyncModeType::PULL) {
1636 return E_OK;
1637 }
1638 ContinueToken token = nullptr;
1639 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1640 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1641 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1642 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1643 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1644 int errCode;
1645 if (curType != SyncType::QUERY_SYNC_TYPE) {
1646 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1647 reSendDataSizeInfo);
1648 } else {
1649 QuerySyncObject queryObj = context->GetQuery();
1650 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1651 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1652 }
1653 if (token != nullptr) {
1654 storage_->ReleaseContinueToken(token);
1655 }
1656 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1657 context->SetTaskErrCode(errCode);
1658 return errCode;
1659 }
1660 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1661 return errCode;
1662 }
1663 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1664 DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1665
1666 int innerCode = InterceptData(syncData);
1667 if (innerCode != E_OK) {
1668 context->SetTaskErrCode(innerCode);
1669 return innerCode;
1670 }
1671
1672 bool needCompressOnSync = false;
1673 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1674 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1675 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1676 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1677 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1678 { remoteAlgo, version });
1679 if (compressCode != E_OK) {
1680 return compressCode;
1681 }
1682 }
1683 return errCode;
1684 }
1685
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1686 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1687 {
1688 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1689 return E_OK;
1690 }
1691 uint64_t clearRemoteDataMark = 0;
1692 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1693 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1694 if (clearRemoteDataMark == 0) {
1695 return E_OK;
1696 }
1697 int errCode = E_OK;
1698 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1699 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1700 if (errCode != E_OK) {
1701 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1702 return errCode;
1703 }
1704 }
1705 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1706 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1707 if (errCode != E_OK) {
1708 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1709 return errCode;
1710 }
1711 }
1712 return E_OK;
1713 }
1714
UpdateMtuSize()1715 void SingleVerDataSync::UpdateMtuSize()
1716 {
1717 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1718 }
1719
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1720 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1721 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1722 {
1723 SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1724 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1725 WaterMark peerMark = 0;
1726 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1727 peerMark);
1728 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1729 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1730 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1731 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1732 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1733 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1734 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1735 packet->SetLastSequence();
1736 }
1737 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1738 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1739 sendCode = SEND_FINISHED;
1740 }
1741 packet->SetData(syncData.entries);
1742 packet->SetCompressData(syncData.compressedEntries);
1743 packet->SetBasicInfo(sendCode, version, reSendMode);
1744 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1745 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1746 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1747 packet->SetEndWaterMark(context->GetEndMark());
1748 packet->SetQuery(context->GetQuery());
1749 }
1750 packet->SetQueryId(context->GetQuerySyncId());
1751 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1752 std::vector<uint64_t> reserved {reSendInfo.packetId};
1753 packet->SetReserved(reserved);
1754 }
1755 if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1756 // resend pull packet dont set compress type
1757 return;
1758 }
1759 bool needCompressOnSync = false;
1760 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1761 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1762 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1763 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1764 packet->SetCompressDataMark();
1765 packet->SetCompressAlgo(curAlgo);
1766 }
1767 FillRequestReSendPacketV2(context, packet);
1768 }
1769
FillRequestReSendPacketV2(SingleVerSyncTaskContext * context,DataRequestPacket * packet)1770 void SingleVerDataSync::FillRequestReSendPacketV2(SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1771 {
1772 if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1773 SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1774 uint32_t total = 0u;
1775 (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1776 packet->SetTotalDataCount(total);
1777 }
1778 }
1779
GetDataSizeSpecInfo(size_t packetSize)1780 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1781 {
1782 bool needCompressOnSync = false;
1783 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1784 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1785 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1786 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1787 return {blockSize, packetSize};
1788 }
1789
InterceptData(SyncEntry & syncEntry)1790 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1791 {
1792 if (storage_ == nullptr) {
1793 LOGE("Invalid DB. Can not intercept data.");
1794 return -E_INVALID_DB;
1795 }
1796
1797 // GetLocalDeviceName get local device ID.
1798 // GetDeviceId get remote device ID.
1799 // If intercept data fail, entries will be released.
1800 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1801 }
1802
ControlCmdStart(SingleVerSyncTaskContext * context)1803 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1804 {
1805 if (context == nullptr) {
1806 return -E_INVALID_ARGS;
1807 }
1808 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1809 if (subManager == nullptr) {
1810 return -E_INVALID_ARGS;
1811 }
1812 int errCode = ControlCmdStartCheck(context);
1813 if (errCode != E_OK) {
1814 return errCode;
1815 }
1816 ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1817 if (packet == nullptr) {
1818 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1819 return -E_OUT_OF_MEMORY;
1820 }
1821 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1822 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1823 if (errCode != E_OK) {
1824 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1825 delete packet;
1826 packet = nullptr;
1827 return errCode;
1828 }
1829 }
1830 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1831 errCode = SendControlPacket(packet, context);
1832 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1833 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1834 }
1835 return errCode;
1836 }
1837
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1838 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1839 {
1840 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1841 if (packet == nullptr) {
1842 return -E_INVALID_ARGS;
1843 }
1844 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1845 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1846 int errCode = ControlCmdRequestRecvPre(context, message);
1847 if (errCode != E_OK) {
1848 return errCode;
1849 }
1850 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1851 errCode = SubscribeRequestRecv(context, message);
1852 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1853 errCode = UnsubscribeRequestRecv(context, message);
1854 }
1855 return errCode;
1856 }
1857
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1858 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1859 {
1860 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1861 if (subManager == nullptr) {
1862 return -E_INVALID_ARGS;
1863 }
1864 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1865 if (errCode != E_OK) {
1866 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1867 return errCode;
1868 }
1869 const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1870 if (packet == nullptr) {
1871 return -E_INVALID_ARGS;
1872 }
1873 int32_t recvCode = packet->GetRecvCode();
1874 uint32_t cmdType = packet->GetcontrolCmdType();
1875 if (recvCode != E_OK) {
1876 LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1877 STR_MASK(GetDeviceId()), cmdType);
1878 // for unsubscribe no need to do something
1879 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1880 return recvCode;
1881 }
1882 if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1883 errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1884 } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1885 subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1886 }
1887 if (errCode != E_OK) {
1888 LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1889 return errCode;
1890 }
1891 return -E_NO_DATA_SEND; // means control msg send finished
1892 }
1893
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1894 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1895 {
1896 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1897 (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1898 LOGE("[ControlCmdStartCheck] not support controlCmd");
1899 return -E_INVALID_ARGS;
1900 }
1901 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1902 context->GetQuery().HasInKeys() &&
1903 context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1904 return -E_NOT_SUPPORT;
1905 }
1906 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1907 return E_OK;
1908 }
1909 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1910 if (permitReceive) {
1911 context->SetReceivcPermitCheck(true);
1912 } else {
1913 return -E_SECURITY_OPTION_CHECK_ERROR;
1914 }
1915 return E_OK;
1916 }
1917
SendControlPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1918 int SingleVerDataSync::SendControlPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1919 {
1920 Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1921 if (message == nullptr) {
1922 LOGE("[DataSync][SendControlPacket] new message error");
1923 delete packet;
1924 packet = nullptr;
1925 return -E_OUT_OF_MEMORY;
1926 }
1927 uint32_t packetLen = packet->CalculateLen();
1928 int errCode = message->SetExternalObject(packet);
1929 if (errCode != E_OK) {
1930 delete packet;
1931 packet = nullptr;
1932 delete message;
1933 message = nullptr;
1934 LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1935 return errCode;
1936 }
1937 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1938 context->GetSequenceId(), context->GetRequestSessionId());
1939 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1940 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1941 };
1942 errCode = Send(context, message, handler, packetLen);
1943 if (errCode != E_OK) {
1944 delete message;
1945 message = nullptr;
1946 }
1947 return errCode;
1948 }
1949
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1950 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1951 uint32_t controlCmdType, const CommErrHandler &handler)
1952 {
1953 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1954 if (ackMessage == nullptr) {
1955 LOGE("[DataSync][SendControlAck] new message error");
1956 return -E_OUT_OF_MEMORY;
1957 }
1958 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1959 ControlAckPacket ack;
1960 ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1961 int errCode = ackMessage->SetCopiedObject(ack);
1962 if (errCode != E_OK) {
1963 delete ackMessage;
1964 ackMessage = nullptr;
1965 LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1966 return errCode;
1967 }
1968 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1969 message->GetSequenceId(), message->GetSessionId());
1970 errCode = Send(context, ackMessage, handler, 0);
1971 if (errCode != E_OK) {
1972 delete ackMessage;
1973 ackMessage = nullptr;
1974 }
1975 return errCode;
1976 }
1977
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1978 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1979 {
1980 if (context == nullptr || message == nullptr) {
1981 return -E_INVALID_ARGS;
1982 }
1983 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1984 if (packet == nullptr) {
1985 return -E_INVALID_ARGS;
1986 }
1987 uint32_t controlCmdType = packet->GetcontrolCmdType();
1988 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1989 return DoAbilitySyncIfNeed(context, message, true);
1990 }
1991 if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1992 SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1993 return -E_WAIT_NEXT_MESSAGE;
1994 }
1995 return E_OK;
1996 }
1997
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1998 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1999 const Message *message)
2000 {
2001 uint32_t controlCmdType = packet->GetcontrolCmdType();
2002 if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
2003 return E_OK;
2004 }
2005 QuerySyncObject syncQuery = packet->GetQuery();
2006 int errCode;
2007 if (!packet->IsAutoSubscribe()) {
2008 errCode = storage_->CheckAndInitQueryCondition(syncQuery);
2009 if (errCode != E_OK) {
2010 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2011 SendControlAck(context, message, errCode, controlCmdType);
2012 return -E_WAIT_NEXT_MESSAGE;
2013 }
2014 }
2015 int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
2016 static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
2017 if (mode >= SyncModeType::INVALID_MODE) {
2018 LOGE("[SingleVerDataSync] invalid mode");
2019 SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
2020 return -E_WAIT_NEXT_MESSAGE;
2021 }
2022 errCode = CheckPermitSendData(mode, context);
2023 if (errCode != E_OK) {
2024 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2025 SendControlAck(context, message, errCode, controlCmdType);
2026 }
2027 return errCode;
2028 }
2029
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2030 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2031 {
2032 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2033 if (packet == nullptr) {
2034 return -E_INVALID_ARGS;
2035 }
2036 int errCode = SubscribeRequestRecvPre(context, packet, message);
2037 if (errCode != E_OK) {
2038 return errCode;
2039 }
2040 uint32_t controlCmdType = packet->GetcontrolCmdType();
2041 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2042 if (subscribeManager == nullptr) {
2043 LOGE("[SingleVerDataSync] subscribeManager check failed");
2044 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2045 return -E_INVALID_ARGS;
2046 }
2047 errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
2048 if (errCode != E_OK) {
2049 LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2050 STR_MASK(GetDeviceId()));
2051 SendControlAck(context, message, errCode, controlCmdType);
2052 return errCode;
2053 }
2054 errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2055 if (errCode != E_OK) {
2056 LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2057 STR_MASK(GetDeviceId()));
2058 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2059 SendControlAck(context, message, errCode, controlCmdType);
2060 return errCode;
2061 }
2062 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2063 if (errCode != E_OK) {
2064 subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2065 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2066 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2067 STR_MASK(GetDeviceId()));
2068 return errCode;
2069 }
2070 subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2071 DBInfo dbInfo;
2072 storage_->GetDBInfo(dbInfo);
2073 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2074 return errCode;
2075 }
2076
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2077 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2078 {
2079 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2080 if (packet == nullptr) {
2081 return -E_INVALID_ARGS;
2082 }
2083 uint32_t controlCmdType = packet->GetcontrolCmdType();
2084 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2085 if (subscribeManager == nullptr) {
2086 LOGE("[SingleVerDataSync] subscribeManager check failed");
2087 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2088 return -E_INVALID_ARGS;
2089 }
2090 int errCode;
2091 std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2092 if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2093 errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2094 if (errCode != E_OK) {
2095 LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2096 STR_MASK(GetDeviceId()));
2097 SendControlAck(context, message, errCode, controlCmdType);
2098 return errCode;
2099 }
2100 }
2101 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2102 if (errCode != E_OK) {
2103 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2104 STR_MASK(GetDeviceId()));
2105 return errCode;
2106 }
2107 subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2108 DBInfo dbInfo;
2109 storage_->GetDBInfo(dbInfo);
2110 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2111 metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2112 return errCode;
2113 }
2114
PutDataMsg(Message * message)2115 void SingleVerDataSync::PutDataMsg(Message *message)
2116 {
2117 return msgSchedule_.PutMsg(message);
2118 }
2119
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2120 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2121 bool &isNeedContinue)
2122 {
2123 return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2124 }
2125
IsNeedReloadQueue()2126 bool SingleVerDataSync::IsNeedReloadQueue()
2127 {
2128 return msgSchedule_.IsNeedReloadQueue();
2129 }
2130
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2131 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2132 {
2133 msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2134 }
2135
ClearDataMsg()2136 void SingleVerDataSync::ClearDataMsg()
2137 {
2138 msgSchedule_.ClearMsg();
2139 }
2140
QuerySyncCheck(SingleVerSyncTaskContext * context)2141 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2142 {
2143 if (context == nullptr) {
2144 return -E_INVALID_ARGS;
2145 }
2146 bool isCheckStatus = false;
2147 int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2148 if (errCode != E_OK) {
2149 return errCode;
2150 }
2151 if (!isCheckStatus) {
2152 context->SetTaskErrCode(-E_NOT_SUPPORT);
2153 return -E_NOT_SUPPORT;
2154 }
2155 return E_OK;
2156 }
2157
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2158 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2159 const std::shared_ptr<SubscribeManager> &subscribeManager)
2160 {
2161 if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2162 storage_->RemoveSubscribe(queryId);
2163 }
2164 }
2165 } // namespace DistributedDB
2166