• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 }
44 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)45 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
46     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
47 {
48     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
49         return -E_INVALID_ARGS;
50     }
51     storage_ = static_cast<SyncGenericInterface *>(inStorage);
52     communicateHandle_ = inCommunicateHandle;
53     metadata_ = inMetadata;
54     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
55     std::vector<uint8_t> label = inStorage->GetIdentifier();
56     label.resize(3); // only show 3 Bytes enough
57     label_ = DBCommon::VectorToHexString(label);
58     deviceId_ = deviceId;
59     msgSchedule_.Initialize(label_, deviceId_);
60     return E_OK;
61 }
62 
SyncStart(int mode,SingleVerSyncTaskContext * context)63 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
64 {
65     std::lock_guard<std::mutex> lock(lock_);
66     int errCode = CheckPermitSendData(mode, context);
67     if (errCode != E_OK) {
68         return errCode;
69     }
70     if (sessionId_ != 0) { // auto sync timeout resend
71         return ReSendData(context);
72     }
73     ResetSyncStatus(mode, context);
74     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
75     int tmpMode = SyncOperation::TransferSyncMode(mode);
76     if (tmpMode == SyncModeType::PUSH) {
77         errCode = PushStart(context);
78     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79         errCode = PushPullStart(context);
80     } else if (tmpMode == SyncModeType::PULL) {
81         errCode = PullRequestStart(context);
82     } else {
83         SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
84         errCode = PullResponseStart(context);
85     }
86     if (context->IsSkipTimeoutError(errCode)) {
87         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
88         // just return to avoid higher pressure for send.
89         return E_OK;
90     }
91     if (errCode != E_OK) {
92         LOGE("[DataSync] SendStart errCode=%d", errCode);
93         return errCode;
94     }
95     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
96         LOGE("wait for recv finished for push and pull mode");
97         return -E_EKEYREVOKED;
98     }
99     return InnerSyncStart(context);
100 }
101 
InnerSyncStart(SingleVerSyncTaskContext * context)102 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
103 {
104     int errCode = CheckPermitSendData(mode_, context);
105     if (errCode != E_OK) {
106         return errCode;
107     }
108     while (true) {
109         if (windowSize_ <= 0 || isAllDataHasSent_) {
110             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
111                 label_.c_str(), STR_MASK(deviceId_));
112             return E_OK;
113         }
114         int mode = SyncOperation::TransferSyncMode(mode_);
115         if (mode == SyncModeType::PULL) {
116             LOGE("[DataSync] unexpected error");
117             return -E_INVALID_ARGS;
118         }
119         context->IncSequenceId();
120         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
121             errCode = PushStart(context);
122         } else {
123             errCode = PullResponseStart(context);
124         }
125         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
126             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
127             isAllDataHasSent_ = true;
128             return -E_EKEYREVOKED;
129         }
130         if (context->IsSkipTimeoutError(errCode)) {
131             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
132             // just return to avoid higher pressure for send.
133             return E_OK;
134         }
135         if (errCode != E_OK) {
136             LOGE("[DataSync] InnerSend errCode=%d", errCode);
137             return errCode;
138         }
139     }
140     return E_OK;
141 }
142 
InnerClearSyncStatus()143 void SingleVerDataSync::InnerClearSyncStatus()
144 {
145     sessionId_ = 0;
146     reSendMap_.clear();
147     windowSize_ = 0;
148     maxSequenceIdHasSent_ = 0;
149     isAllDataHasSent_ = false;
150 }
151 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)152 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
153 {
154     if (message == nullptr) {
155         LOGE("[DataSync] AckRecv message nullptr");
156         return -E_INVALID_ARGS;
157     }
158     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
159     if (packet == nullptr) {
160         return -E_INVALID_ARGS;
161     }
162     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
163     uint32_t sessionId = message->GetSessionId();
164     uint32_t sequenceId = message->GetSequenceId();
165 
166     std::lock_guard<std::mutex> lock(lock_);
167     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
168         windowSize_, label_.c_str(), STR_MASK(deviceId_));
169     if (sessionId != sessionId_) {
170         LOGI("[DataSync] ignore ack,sessionId is different");
171         return E_OK;
172     }
173     Timestamp lastQueryTime = 0;
174     if (reSendMap_.count(sequenceId) != 0) {
175         lastQueryTime = reSendMap_[sequenceId].end;
176         reSendMap_.erase(sequenceId);
177         windowSize_++;
178     } else {
179         LOGI("[DataSync] ack seqId not in map");
180         return E_OK;
181     }
182     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
183         Timestamp dbLastQueryTime = 0;
184         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
185         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
186             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
187         }
188         if (errCode != E_OK) {
189             return errCode;
190         }
191     }
192     if (!isAllDataHasSent_) {
193         return InnerSyncStart(context);
194     } else if (reSendMap_.empty()) {
195         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
196         InnerClearSyncStatus();
197         return -E_FINISHED;
198     }
199     return E_OK;
200 }
201 
ClearSyncStatus()202 void SingleVerDataSync::ClearSyncStatus()
203 {
204     std::lock_guard<std::mutex> lock(lock_);
205     InnerClearSyncStatus();
206 }
207 
ReSendData(SingleVerSyncTaskContext * context)208 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
209 {
210     if (reSendMap_.empty()) {
211         LOGI("[DataSync] ReSend map empty");
212         return -E_INTERNAL_ERROR;
213     }
214     uint32_t sequenceId = reSendMap_.begin()->first;
215     ReSendInfo reSendInfo = reSendMap_.begin()->second;
216     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
217         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
218         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
219         windowSize_, label_.c_str(), STR_MASK(deviceId_));
220     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
221         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
222     return ReSend(context, dataReSendInfo);
223 }
224 
GetLocalDeviceName()225 std::string SingleVerDataSync::GetLocalDeviceName()
226 {
227     std::string deviceInfo;
228     if (communicateHandle_ != nullptr) {
229         communicateHandle_->GetLocalIdentity(deviceInfo);
230     }
231     return deviceInfo;
232 }
233 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)234 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
235     uint32_t packetLen)
236 {
237     bool startFeedDogRet = false;
238     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
239         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
240             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
241         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
242     }
243     SendConfig sendConfig;
244     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
245     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
246     if (errCode != E_OK) {
247         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
248         if (startFeedDogRet) {
249             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
250         }
251     }
252     return errCode;
253 }
254 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)255 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
256     std::vector<SendDataItem> &outData, size_t packetSize)
257 {
258     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
259     if (performance != nullptr) {
260         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
261     }
262     // start a watch dog before get data
263     // it will send ack util get data finished
264     context->StartFeedDogForGetData(context->GetResponseSessionId());
265     int errCode = GetData(context, packetSize, outData);
266     context->StopFeedDogForGetData();
267     if (performance != nullptr) {
268         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
269     }
270     if (!outData.empty()) {
271         SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
272     }
273     return errCode;
274 }
275 
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)276 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
277 {
278     int errCode;
279     UpdateMtuSize();
280     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
281         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
282         LOGI("[DataSync][GetData] resend data");
283         errCode = GetUnsyncData(context, outData, packetSize);
284     } else {
285         ContinueToken token;
286         context->GetContinueToken(token);
287         if (token == nullptr) {
288             errCode = GetUnsyncData(context, outData, packetSize);
289         } else {
290             LOGD("[DataSync][GetData] get data from token");
291             // if there is data to send, read out data, and update local watermark, send data
292             errCode = GetNextUnsyncData(context, outData, packetSize);
293         }
294     }
295     if (errCode == -E_UNFINISHED) {
296         LOGD("[DataSync][GetData] not finished.");
297     }
298     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
299         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
300         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
301     }
302     return errCode;
303 }
304 
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)305 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
306 {
307     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
308     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
309         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
310     bool needCompressOnSync = false;
311     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
312     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
313     int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
314     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
315         context->SetTaskErrCode(errCode);
316         return errCode;
317     }
318 
319     int innerCode = InterceptData(syncOutData);
320     if (innerCode != E_OK) {
321         context->SetTaskErrCode(innerCode);
322         return innerCode;
323     }
324 
325     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
326     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
327         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
328             { remoteAlgo, version });
329         if (compressCode != E_OK) {
330             return compressCode;
331         }
332     }
333     return errCode;
334 }
335 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)336 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
337     size_t packetSize)
338 {
339     SyncTimeRange waterRange;
340     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
341     WaterMark startMark = 0;
342     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
343     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
344     if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
345         return E_OK;
346     }
347     waterRange.beginTime = startMark;
348     if (curType == SyncType::QUERY_SYNC_TYPE) {
349         WaterMark deletedStartMark = 0;
350         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
351         Timestamp lastQueryTimestamp = 0;
352         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
353             lastQueryTimestamp);
354         if (errCode != E_OK) {
355             return errCode;
356         }
357         waterRange.deleteBeginTime = deletedStartMark;
358         waterRange.lastQueryTime = lastQueryTimestamp;
359     }
360     return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
361 }
362 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)363 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
364     DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
365 {
366     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
367     ContinueToken token = nullptr;
368     context->GetContinueToken(token);
369     if (token != nullptr) {
370         storage_->ReleaseContinueToken(token);
371     }
372     int errCode;
373     if (curType != SyncType::QUERY_SYNC_TYPE) {
374         errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
375             syncDataSizeInfo);
376     } else {
377         QuerySyncObject queryObj = context->GetQuery();
378         errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
379     }
380     context->SetContinueToken(token);
381     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
382         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
383     }
384     return errCode;
385 }
386 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)387 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
388     size_t packetSize)
389 {
390     ContinueToken token;
391     context->GetContinueToken(token);
392     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
393     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
394     context->SetContinueToken(token);
395     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
396         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
397     }
398     return errCode;
399 }
400 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)401 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
402     SyncType curType, const QuerySyncObject &query)
403 {
404     if (inData.empty()) {
405         return E_OK;
406     }
407     SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
408     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
409     if (performance != nullptr) {
410         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
411     }
412     const auto localDeviceName = GetLocalDeviceName();
413     const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
414     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
415     std::vector<SendDataItem> copyData = inData;
416     int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
417     if (errCode != E_OK) {
418         LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
419         return errCode;
420     }
421     // query only support prefix key and don't have query in packet in 104 version
422     errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
423     if (performance != nullptr) {
424         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
425     }
426     if (errCode != E_OK) {
427         LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
428     }
429     return errCode;
430 }
431 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)432 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
433 {
434     mode_ = inMode;
435     maxSequenceIdHasSent_ = 0;
436     isAllDataHasSent_ = false;
437     context->ReSetSequenceId();
438     reSendMap_.clear();
439     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
440         windowSize_ = LOW_VERSION_WINDOW_SIZE;
441     } else {
442         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
443     }
444     int mode = SyncOperation::TransferSyncMode(inMode);
445     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
446         sessionId_ = context->GetRequestSessionId();
447     } else {
448         sessionId_ = context->GetResponseSessionId();
449     }
450 }
451 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)452 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
453     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
454 {
455     WaterMark localMark = 0;
456     WaterMark deleteMark = 0;
457     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
458     GetLocalDeleteSyncWaterMark(context, deleteMark);
459     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
460 }
461 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const462 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
463     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
464 {
465     WaterMark localMark = 0;
466     int errCode = E_OK;
467     const std::string &deviceId = context->GetDeviceId();
468     std::string queryId = context->GetQuerySyncId();
469     if (syncType != SyncType::QUERY_SYNC_TYPE) {
470         if (isCheckBeforUpdate) {
471             GetLocalWaterMark(syncType, queryId, context, localMark);
472             if (localMark >= dataTimeRange.endTime) {
473                 return E_OK;
474             }
475         }
476         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
477     } else {
478         bool isNeedUpdateMark = true;
479         bool isNeedUpdateDeleteMark = true;
480         if (isCheckBeforUpdate) {
481             WaterMark deleteDataWaterMark = 0;
482             GetLocalWaterMark(syncType, queryId, context, localMark);
483             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
484             if (localMark >= dataTimeRange.endTime) {
485                 isNeedUpdateMark = false;
486             }
487             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
488                 isNeedUpdateDeleteMark = false;
489             }
490         }
491         if (isNeedUpdateMark) {
492             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
493             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
494             if (errCode != E_OK) {
495                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
496                 return errCode;
497             }
498         }
499         if (isNeedUpdateDeleteMark) {
500             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
501                 dataTimeRange.deleteEndTime);
502             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
503         }
504     }
505     if (errCode != E_OK) {
506         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
507     }
508     return errCode;
509 }
510 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const511 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
512     const DeviceID &deviceId, WaterMark &waterMark) const
513 {
514     if (syncType != SyncType::QUERY_SYNC_TYPE) {
515         metadata_->GetPeerWaterMark(deviceId, waterMark);
516         return;
517     }
518     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
519 }
520 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)521 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
522 {
523     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
524 }
525 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const526 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
527     WaterMark &waterMark) const
528 {
529     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
530 }
531 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const532 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
533     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
534 {
535     if (syncType != SyncType::QUERY_SYNC_TYPE) {
536         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
537         return;
538     }
539     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
540         waterMark, context->IsAutoLiftWaterMark());
541 }
542 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)543 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
544     WaterMark maxSendDataTime)
545 {
546     bool isNeedClearRemoteData = false;
547     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
548     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
549         uint64_t clearDeviceDataMark = 0;
550         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
551         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
552     } else {
553         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
554         if (packet == nullptr) {
555             LOGE("[RemoveDeviceDataHandle] get packet object failed");
556             return -E_INVALID_ARGS;
557         }
558         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
559         WaterMark packetLocalMark = packet->GetLocalWaterMark();
560         WaterMark peerMark = 0;
561         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
562         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
563     }
564     if (!isNeedClearRemoteData) {
565         return E_OK;
566     }
567     int errCode = E_OK;
568     if (context->IsNeedClearRemoteStaleData()) {
569         // need to clear remote device history data
570         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
571         if (errCode != E_OK) {
572             (void)SendDataAck(context, message, errCode, maxSendDataTime);
573             return errCode;
574         }
575         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
576             // avoid repeat clear in ack
577             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
578         }
579     }
580     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
581         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
582         if (errCode != E_OK) {
583             (void)SendDataAck(context, message, errCode, maxSendDataTime);
584             return errCode;
585         }
586     }
587     return E_OK;
588 }
589 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)590 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
591     const std::vector<uint64_t> &reserved)
592 {
593     bool isNeedClearRemoteData = false;
594     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
595     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
596     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
597         uint64_t clearDeviceDataMark = 0;
598         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
599         isNeedClearRemoteData = (clearDeviceDataMark != 0);
600     } else if (reserved.empty()) {
601         WaterMark localMark = 0;
602         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
603         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
604     } else {
605         WaterMark peerMark = 0;
606         GetPeerWaterMark(curType, context->GetQuerySyncId(),
607             context->GetDeviceId(), peerMark);
608         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
609     }
610     if (!isNeedClearRemoteData) {
611         return E_OK;
612     }
613     // need to clear remote historydata
614     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
615         label_.c_str(), STR_MASK(GetDeviceId()));
616     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
617     if (errCode != E_OK) {
618         return errCode;
619     }
620     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
621         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
622     }
623     return errCode;
624 }
625 
SetSessionEndTimestamp(Timestamp end)626 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
627 {
628     sessionEndTimestamp_ = end;
629 }
630 
GetSessionEndTimestamp() const631 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
632 {
633     return sessionEndTimestamp_;
634 }
635 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)636 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
637 {
638     ReSendInfo reSendInfo;
639     reSendInfo.start = dataTimeRange.beginTime;
640     reSendInfo.end = dataTimeRange.endTime;
641     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
642     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
643     reSendInfo.packetId = context->GetPacketId();
644     maxSequenceIdHasSent_++;
645     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
646     windowSize_--;
647     ContinueToken token;
648     context->GetContinueToken(token);
649     if (token == nullptr) {
650         isAllDataHasSent_ = true;
651     }
652     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
653         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
654         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
655         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
656 }
657 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)658 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
659     SyncEntry &syncData, int sendCode, int mode)
660 {
661     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
662     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
663     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
664     WaterMark localMark = 0;
665     WaterMark peerMark = 0;
666     WaterMark deleteMark = 0;
667     bool needCompressOnSync = false;
668     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
669     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
670     std::string id = context->GetQuerySyncId();
671     GetLocalWaterMark(curType, id, context, localMark);
672     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
673     GetLocalDeleteSyncWaterMark(context, deleteMark);
674     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
675         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
676         packet->SetLastSequence();
677     }
678     int tmpMode = mode;
679     if (mode == SyncModeType::RESPONSE_PULL) {
680         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
681     }
682     packet->SetData(syncData.entries);
683     packet->SetCompressData(syncData.compressedEntries);
684     packet->SetBasicInfo(sendCode, version, tmpMode);
685     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
686     packet->SetWaterMark(localMark, peerMark, deleteMark);
687     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
688         packet->SetEndWaterMark(context->GetEndMark());
689         packet->SetSessionId(context->GetRequestSessionId());
690     }
691     packet->SetQuery(context->GetQuery());
692     packet->SetQueryId(context->GetQuerySyncId());
693     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
694     // empty compress data should not mark compress
695     if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
696         packet->SetCompressDataMark();
697         packet->SetCompressAlgo(curAlgo);
698     }
699     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
700     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
701         context->GetQuery().HasOrderBy())) {
702         packet->SetUpdateWaterMark();
703     }
704     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
705         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
706         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
707 }
708 
RequestStart(SingleVerSyncTaskContext * context,int mode)709 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
710 {
711     int errCode = QuerySyncCheck(context);
712     if (errCode != E_OK) {
713         LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
714         return errCode;
715     }
716     errCode = RemoveDeviceDataIfNeed(context);
717     if (errCode != E_OK) {
718         context->SetTaskErrCode(errCode);
719         return errCode;
720     }
721     SyncEntry syncData;
722     // get data
723     errCode = GetMatchData(context, syncData);
724     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
725 
726     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
727         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
728         return errCode;
729     }
730 
731     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
732     if (packet == nullptr) {
733         LOGE("[DataSync][PushStart] new DataRequestPacket error");
734         return -E_OUT_OF_MEMORY;
735     }
736     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
737     UpdateWaterMark isUpdateWaterMark;
738     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
739     if (errCode == E_OK) {
740         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
741     }
742     FillDataRequestPacket(packet, context, syncData, errCode, mode);
743     errCode = SendDataPacket(curType, packet, context);
744     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
745     if (performance != nullptr) {
746         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
747     }
748     if (errCode == E_OK || errCode == -E_TIMEOUT) {
749         UpdateSendInfo(dataTime, context);
750     }
751     if (errCode == E_OK) {
752         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
753             context->GetQuery().HasOrderBy())) {
754             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
755             return E_OK;
756         }
757         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
758         SaveLocalWaterMark(curType, context, tmpDataTime);
759     }
760     return errCode;
761 }
762 
PushStart(SingleVerSyncTaskContext * context)763 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
764 {
765     if (context == nullptr) {
766         return -E_INVALID_ARGS;
767     }
768     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
769     return RequestStart(context,
770         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
771 }
772 
PushPullStart(SingleVerSyncTaskContext * context)773 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
774 {
775     if (context == nullptr) {
776         return -E_INVALID_ARGS;
777     }
778     return RequestStart(context, context->GetMode());
779 }
780 
PullRequestStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
782 {
783     if (context == nullptr) {
784         return -E_INVALID_ARGS;
785     }
786     int errCode = QuerySyncCheck(context);
787     if (errCode != E_OK) {
788         return errCode;
789     }
790     errCode = RemoveDeviceDataIfNeed(context);
791     if (errCode != E_OK) {
792         context->SetTaskErrCode(errCode);
793         return errCode;
794     }
795     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
796     if (packet == nullptr) {
797         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
798         return -E_OUT_OF_MEMORY;
799     }
800     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
801     WaterMark peerMark = 0;
802     WaterMark localMark = 0;
803     WaterMark deleteMark = 0;
804     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
805         context->GetDeviceId(), peerMark);
806     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
807     GetLocalDeleteSyncWaterMark(context, deleteMark);
808     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
809     WaterMark endMark = context->GetEndMark();
810     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
811     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
812     packet->SetBasicInfo(E_OK, version, context->GetMode());
813     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
814     packet->SetWaterMark(localMark, peerMark, deleteMark);
815     packet->SetEndWaterMark(endMark);
816     packet->SetSessionId(context->GetRequestSessionId());
817     packet->SetQuery(context->GetQuery());
818     packet->SetQueryId(context->GetQuerySyncId());
819     packet->SetLastSequence();
820     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
821     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
822     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
823         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
824         label_.c_str(), STR_MASK(GetDeviceId()));
825     UpdateSendInfo(dataTime, context);
826     return SendDataPacket(syncType, packet, context);
827 }
828 
PullResponseStart(SingleVerSyncTaskContext * context)829 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
830 {
831     if (context == nullptr) {
832         return -E_INVALID_ARGS;
833     }
834     SyncEntry syncData;
835     // get data
836     int errCode = GetMatchData(context, syncData);
837     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
838         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
839             (void)SendPullResponseDataPkt(errCode, syncData, context);
840         }
841         return errCode;
842     }
843     // if send finished
844     int ackCode = E_OK;
845     ContinueToken token = nullptr;
846     context->GetContinueToken(token);
847     if (errCode == E_OK && token == nullptr) {
848         LOGD("[DataSync][PullResponse] send last frame end");
849         ackCode = SEND_FINISHED;
850     }
851     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
852     UpdateWaterMark isUpdateWaterMark;
853     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
854     if (errCode == E_OK) {
855         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
856     }
857     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
858     if (errCode == E_OK || errCode == -E_TIMEOUT) {
859         UpdateSendInfo(dataTime, context);
860     }
861     if (errCode == E_OK) {
862         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
863             context->GetQuery().HasOrderBy())) {
864             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
865             return E_OK;
866         }
867         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
868         SaveLocalWaterMark(curType, context, tmpDataTime);
869     }
870     return errCode;
871 }
872 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)873 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
874     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
875 {
876     WaterMark tmpPeerWatermark = dataTime.endTime;
877     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
878     if (isUpdateWaterMark.normalUpdateMark) {
879         tmpPeerWatermark++;
880     }
881     if (isUpdateWaterMark.deleteUpdateMark) {
882         tmpPeerDeletedWatermark++;
883     }
884     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
885 }
886 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)887 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
888     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
889 {
890     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
891         return;
892     }
893     int errCode = E_OK;
894     if (syncType != SyncType::QUERY_SYNC_TYPE) {
895         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
896     } else {
897         if (peerWatermark != 0) {
898             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
899             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
900             if (errCode != E_OK) {
901                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
902             }
903         }
904         if (peerDeletedWatermark != 0) {
905             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
906                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
907             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
908         }
909     }
910     if (errCode != E_OK) {
911         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
912     }
913 }
914 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)915 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
916 {
917     uint16_t remoteCommunicatorVersion = 0;
918     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
919         -E_NOT_FOUND) {
920         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
921         return -E_VERSION_NOT_SUPPORT;
922     }
923     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
924     if (remoteCommunicatorVersion == 0) {
925         LOGI("[DataSync] set remote version 0");
926         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
927         return E_OK;
928     } else {
929         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
930         if (isControlMsg) {
931             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
932         } else {
933             (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
934         }
935         return -E_NEED_ABILITY_SYNC;
936     }
937 }
938 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)939 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
940 {
941     if (context == nullptr || message == nullptr) {
942         return -E_INVALID_ARGS;
943     }
944     auto *packet = message->GetObject<DataRequestPacket>();
945     if (packet == nullptr) {
946         return -E_INVALID_ARGS;
947     }
948     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
949         return DoAbilitySyncIfNeed(context, message);
950     }
951     int32_t sendCode = packet->GetSendCode();
952     if (sendCode == -E_VERSION_NOT_SUPPORT) {
953         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
954         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
955         return -E_WAIT_NEXT_MESSAGE;
956     }
957     // only deal with pull response packet errCode
958     if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
959         message->GetSessionId() == context->GetRequestSessionId()) {
960         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
961         return sendCode;
962     }
963     int errCode = RunPermissionCheck(context, message, packet);
964     if (errCode != E_OK) {
965         return errCode;
966     }
967     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
968         errCode = CheckSchemaStrategy(context, message);
969     }
970     if (errCode == E_OK) {
971         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
972     }
973     if (errCode != E_OK) {
974         (void)SendDataAck(context, message, errCode, 0);
975         return errCode;
976     }
977     errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
978     if (errCode != E_OK) {
979         (void)SendDataAck(context, message, errCode, 0);
980     }
981     return errCode;
982 }
983 
DataRequestRecvInner(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)984 int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, const Message *message,
985     WaterMark &pullEndWatermark)
986 {
987     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
988     if (packet == nullptr) {
989         LOGE("[DataSync][DataRequestRecv] get packet object failed");
990         return -E_INVALID_ARGS;
991     }
992     const std::vector<SendDataItem> &data = packet->GetData();
993     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
994     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
995         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
996         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
997     context->SetReceiveWaterMarkErr(false);
998     UpdateWaterMark isUpdateWaterMark;
999     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1000     int errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1001     if (errCode != E_OK) {
1002         return errCode;
1003     }
1004     Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1005     if (WaterMarkErrHandle(curType, context, message)) {
1006         return E_OK;
1007     }
1008     GetPullEndWatermark(context, packet, pullEndWatermark);
1009     // save data first
1010     errCode = SaveData(context, data, curType,
1011         SingleVerDataSyncUtils::GetQueryFromDataRequest(*packet, *context, message->GetSessionId()));
1012     if (errCode != E_OK) {
1013         (void)SendDataAck(context, message, errCode, dataTime.endTime);
1014         return errCode;
1015     }
1016     SingleVerDataSyncUtils::UpdateSyncProcess(context, packet);
1017 
1018     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1019         pullEndWatermark = 0;
1020         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1021     } else {
1022         // if data is empty, we don't know the max timestap of this packet.
1023         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1024     }
1025     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1026         context->GetRequestSessionId());
1027     UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1028 
1029     if (errCode != E_OK) {
1030         return errCode;
1031     }
1032     if (packet->GetSendCode() == SEND_FINISHED) {
1033         return -E_RECV_FINISHED;
1034     }
1035     return errCode;
1036 }
1037 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)1038 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
1039     WaterMark &pullEndWatermark)
1040 {
1041     int errCode = DataRequestRecvPre(context, message);
1042     if (errCode != E_OK) {
1043         return errCode;
1044     }
1045     return DataRequestRecvInner(context, message, pullEndWatermark);
1046 }
1047 
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1048 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1049     SingleVerSyncTaskContext *context)
1050 {
1051     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1052     if (message == nullptr) {
1053         LOGE("[DataSync][SendDataPacket] new message error");
1054         delete packet;
1055         packet = nullptr;
1056         return -E_OUT_OF_MEMORY;
1057     }
1058     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1059     int errCode = message->SetExternalObject(packet);
1060     if (errCode != E_OK) {
1061         delete packet;
1062         packet = nullptr;
1063         delete message;
1064         message = nullptr;
1065         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1066         return errCode;
1067     }
1068     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1069         context->GetSequenceId(), context->GetRequestSessionId());
1070     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1071     if (performance != nullptr) {
1072         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1073     }
1074     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1075         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1076     };
1077     errCode = Send(context, message, handler, packetLen);
1078     if (errCode != E_OK) {
1079         delete message;
1080         message = nullptr;
1081     }
1082 
1083     return errCode;
1084 }
1085 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1086 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1087     SingleVerSyncTaskContext *context)
1088 {
1089     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1090     if (packet == nullptr) {
1091         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1092         return -E_OUT_OF_MEMORY;
1093     }
1094     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1095     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1096 
1097     if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1098         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1099         uint32_t total = 0u;
1100         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1101         LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1102         packet->SetTotalDataCount(total);
1103     }
1104 
1105     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1106     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1107     if (message == nullptr) {
1108         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1109         delete packet;
1110         packet = nullptr;
1111         return -E_OUT_OF_MEMORY;
1112     }
1113     int errCode = message->SetExternalObject(packet);
1114     if (errCode != E_OK) {
1115         delete packet;
1116         packet = nullptr;
1117         delete message;
1118         message = nullptr;
1119         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1120         return errCode;
1121     }
1122     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1123         context->GetSequenceId(), context->GetResponseSessionId());
1124     SendResetWatchDogPacket(context, packetLen);
1125     errCode = Send(context, message, nullptr, packetLen);
1126     if (errCode != E_OK) {
1127         delete message;
1128         message = nullptr;
1129     }
1130     return errCode;
1131 }
1132 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1133 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1134 {
1135     (void)SendDataAck(context, message, E_OK, 0);
1136 }
1137 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1138 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1139     WaterMark maxSendDataTime)
1140 {
1141     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1142     if (packet == nullptr) {
1143         return -E_INVALID_ARGS;
1144     }
1145     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1146     if (ackMessage == nullptr) {
1147         LOGE("[DataSync][SendDataAck] new message error");
1148         return -E_OUT_OF_MEMORY;
1149     }
1150     DataAckPacket ack;
1151     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1152     int errCode = ackMessage->SetCopiedObject(ack);
1153     if (errCode != E_OK) {
1154         delete ackMessage;
1155         ackMessage = nullptr;
1156         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1157         return errCode;
1158     }
1159     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1160         message->GetSequenceId(), message->GetSessionId());
1161 
1162     errCode = Send(context, ackMessage, nullptr, 0);
1163     if (errCode != E_OK) {
1164         delete ackMessage;
1165         ackMessage = nullptr;
1166     }
1167     return errCode;
1168 }
1169 
AckPacketIdCheck(const Message * message)1170 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1171 {
1172     if (message == nullptr) {
1173         LOGE("[DataSync] AckRecv message nullptr");
1174         return false;
1175     }
1176     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1177         return true;
1178     }
1179     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1180     if (packet == nullptr) {
1181         return false;
1182     }
1183     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1184     std::lock_guard<std::mutex> lock(lock_);
1185     uint32_t sequenceId = message->GetSequenceId();
1186     if (reSendMap_.count(sequenceId) != 0) {
1187         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1188         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1189             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1190                 originalPacketId);
1191             return false;
1192         }
1193     }
1194     return true;
1195 }
1196 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1197 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1198 {
1199     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1200     if (errCode != E_OK) {
1201         return errCode;
1202     }
1203     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1204     if (packet == nullptr) {
1205         return -E_INVALID_ARGS;
1206     }
1207     int32_t recvCode = packet->GetRecvCode();
1208     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1209         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1210     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1211         LOGE("[DataSync][AckRecv] Version mismatch");
1212         return -E_VERSION_NOT_SUPPORT;
1213     }
1214 
1215     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1216         // we should ReleaseContinueToken, avoid crash
1217         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1218             STR_MASK(GetDeviceId()));
1219         context->ReleaseContinueToken();
1220         return recvCode;
1221     }
1222     uint64_t data = packet->GetData();
1223     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1224         return DealWaterMarkException(context, data, packet->GetReserved());
1225     }
1226 
1227     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1228         // data only use low 32bit
1229         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1230         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1231             STR_MASK(GetDeviceId()));
1232     }
1233 
1234     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1235         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1236             label_.c_str(), STR_MASK(GetDeviceId()));
1237         return recvCode;
1238     }
1239 
1240     // Judge if send finished
1241     ContinueToken token;
1242     context->GetContinueToken(token);
1243     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1244         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1245         return -E_NO_DATA_SEND;
1246     }
1247 
1248     // send next data
1249     return -E_SEND_DATA;
1250 }
1251 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1252 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1253     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1254 {
1255     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1256         LOGE("[SingleVerDataSync] messageId not available.");
1257         return;
1258     }
1259     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1260     if (ackMessage == nullptr) {
1261         LOGE("[DataSync][SaveDataNotify] new message failed");
1262         return;
1263     }
1264 
1265     DataAckPacket ack;
1266     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1267     ack.SetVersion(pktVersion);
1268     int errCode = ackMessage->SetCopiedObject(ack);
1269     if (errCode != E_OK) {
1270         delete ackMessage;
1271         ackMessage = nullptr;
1272         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1273         return;
1274     }
1275     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1276 
1277     errCode = Send(context, ackMessage, nullptr, 0);
1278     if (errCode != E_OK) {
1279         delete ackMessage;
1280         ackMessage = nullptr;
1281     }
1282     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1283         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1284 }
1285 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1286 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1287     WaterMark &pullEndWatermark) const
1288 {
1289     if (packet == nullptr) {
1290         return;
1291     }
1292     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1293     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1294         WaterMark endMark = packet->GetEndWaterMark();
1295         TimeOffset offset;
1296         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1297         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1298         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1299             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1300     }
1301 }
1302 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1303 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1304     const std::vector<uint64_t> &reserved)
1305 {
1306     WaterMark deletedWaterMark = 0;
1307     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1308     if (curType == SyncType::QUERY_SYNC_TYPE) {
1309         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1310             LOGE("[DataSync] get packet reserve size failed");
1311             return -E_INVALID_ARGS;
1312         }
1313         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1314     }
1315     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1316         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1317     int errCode = SaveLocalWaterMark(curType, context,
1318         {0, 0, ackWaterMark, deletedWaterMark});
1319     if (errCode != E_OK) {
1320         return errCode;
1321     }
1322     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1323     context->IncNegotiationCount();
1324     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1325     if (!context->IsNeedClearRemoteStaleData()) {
1326         return -E_RE_SEND_DATA;
1327     }
1328     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1329     if (errCode != E_OK) {
1330         return errCode;
1331     }
1332     return -E_RE_SEND_DATA;
1333 }
1334 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1335 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1336     const DataRequestPacket *packet)
1337 {
1338     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1339     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1340     if (errCode != E_OK) {
1341         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1342             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1343         }
1344         return -E_NOT_PERMIT;
1345     }
1346     const std::vector<SendDataItem> &data = packet->GetData();
1347     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1348     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1349     auto securityOption = packet->GetSecurityOption();
1350     if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1351         securityOption.securityLabel != SecurityLabel::NOT_SET) {
1352         context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1353     }
1354     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1355         !context->GetReceivcPermitCheck()) {
1356         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1357         if (permitReceive) {
1358             context->SetReceivcPermitCheck(true);
1359         } else {
1360             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1361             return -E_SECURITY_OPTION_CHECK_ERROR;
1362         }
1363     }
1364     return errCode;
1365 }
1366 
1367 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1368 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1369 {
1370     // When mtu less than 30k, we send data with bluetooth
1371     // In order not to block the bluetooth channel, we don't send notify packet here
1372     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1373         return;
1374     }
1375     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1376 
1377     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1378     if (ackMessage == nullptr) {
1379         LOGE("[DataSync][ResetWatchDog] new message failed");
1380         return;
1381     }
1382 
1383     DataAckPacket ack;
1384     ack.SetData(data);
1385     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1386     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1387     int errCode = ackMessage->SetCopiedObject(ack);
1388     if (errCode != E_OK) {
1389         delete ackMessage;
1390         ackMessage = nullptr;
1391         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1392         return;
1393     }
1394     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1395         context->GetSequenceId(), context->GetResponseSessionId());
1396 
1397     errCode = Send(context, ackMessage, nullptr, 0);
1398     if (errCode != E_OK) {
1399         delete ackMessage;
1400         ackMessage = nullptr;
1401         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1402             STR_MASK(GetDeviceId()));
1403     } else {
1404         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1405             STR_MASK(GetDeviceId()));
1406     }
1407 }
1408 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1409 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1410 {
1411     if (context == nullptr) {
1412         return -E_INVALID_ARGS;
1413     }
1414     int errCode = RemoveDeviceDataIfNeed(context);
1415     if (errCode != E_OK) {
1416         context->SetTaskErrCode(errCode);
1417         return errCode;
1418     }
1419     SyncEntry syncData;
1420     errCode = GetReSendData(syncData, context, reSendInfo);
1421     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1422         return errCode;
1423     }
1424     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1425     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1426     if (packet == nullptr) {
1427         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1428         return -E_OUT_OF_MEMORY;
1429     }
1430     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1431     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1432     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1433         LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1434         return errCode;
1435     }
1436     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1437         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1438         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1439         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1440             dataTime.deleteEndTime += 1;
1441         }
1442         if (reSendInfo.end > reSendInfo.start) {
1443             dataTime.endTime += 1;
1444         }
1445         errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1446         if (errCode != E_OK) {
1447             LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1448         }
1449     }
1450     return errCode;
1451 }
1452 
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1453 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1454     uint32_t sessionId, uint32_t sequenceId)
1455 {
1456     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1457     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1458     if (message == nullptr) {
1459         LOGE("[DataSync][SendDataPacket] new message error");
1460         delete packet;
1461         packet = nullptr;
1462         return -E_OUT_OF_MEMORY;
1463     }
1464     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1465     int errCode = message->SetExternalObject(packet);
1466     if (errCode != E_OK) {
1467         delete packet;
1468         packet = nullptr;
1469         delete message;
1470         message = nullptr;
1471         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1472         return errCode;
1473     }
1474     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1475     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1476         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1477     };
1478     errCode = Send(context, message, handler, packetLen);
1479     if (errCode != E_OK) {
1480         delete message;
1481         message = nullptr;
1482     }
1483     return errCode;
1484 }
1485 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1486 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1487 {
1488     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1489     int mode = SyncOperation::TransferSyncMode(inMode);
1490     // for pull mode it just need to get data, no need to send data.
1491     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1492         return E_OK;
1493     }
1494     if (context->GetSendPermitCheck()) {
1495         return E_OK;
1496     }
1497     bool isPermitSync = true;
1498     std::string deviceId = context->GetDeviceId();
1499     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1500     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1501         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1502     }
1503     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1504         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1505     if (isPermitSync) {
1506         context->SetSendPermitCheck(true);
1507         return E_OK;
1508     }
1509     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1510         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1511         return -E_SECURITY_OPTION_CHECK_ERROR;
1512     }
1513     if (mode == SyncModeType::RESPONSE_PULL) {
1514         SyncEntry syncData;
1515         (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1516         return -E_SECURITY_OPTION_CHECK_ERROR;
1517     }
1518     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1519         return -E_SECURITY_OPTION_CHECK_ERROR;
1520     }
1521     return E_OK;
1522 }
1523 
GetLabel() const1524 std::string SingleVerDataSync::GetLabel() const
1525 {
1526     return label_;
1527 }
1528 
GetDeviceId() const1529 std::string SingleVerDataSync::GetDeviceId() const
1530 {
1531     return deviceId_;
1532 }
1533 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1534 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1535 {
1536     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1537     if (packet == nullptr) {
1538         LOGE("[WaterMarkErrHandle] get packet object failed");
1539         return -E_INVALID_ARGS;
1540     }
1541     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1542     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1543     WaterMark peerMark = 0;
1544     WaterMark deletedMark = 0;
1545     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1546     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1547         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1548     }
1549     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1550         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1551         context->SetReceiveWaterMarkErr(true);
1552         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1553         return true;
1554     }
1555     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1556         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1557             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1558             peerMark);
1559         context->SetReceiveWaterMarkErr(true);
1560         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1561         return true;
1562     }
1563     return false;
1564 }
1565 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1566 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1567 {
1568     auto *packet = message->GetObject<DataRequestPacket>();
1569     if (packet == nullptr) {
1570         return -E_INVALID_ARGS;
1571     }
1572     if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1573         return E_OK;
1574     }
1575     auto query = packet->GetQuery();
1576     std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1577     if (!schemaSyncStatus.second) {
1578         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1579         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1580         return -E_NEED_ABILITY_SYNC;
1581     }
1582     if (!schemaSyncStatus.first) {
1583         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1584         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1585         return -E_SCHEMA_MISMATCH;
1586     }
1587     return E_OK;
1588 }
1589 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1590 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1591 {
1592     int mode = SyncOperation::TransferSyncMode(inMode);
1593     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1594         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1595         return;
1596     }
1597     if (storage_ == nullptr) {
1598         LOGE("RemotePushFinished fail, storage is nullptr.");
1599         return;
1600     }
1601 
1602     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1603         storage_->NotifyRemotePushFinished(deviceId_);
1604     }
1605 }
1606 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1607 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1608     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1609 {
1610     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1611     WaterMark localMark = 0;
1612     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1613     ackPacket.SetRecvCode(recvCode);
1614     UpdateWaterMark isUpdateWaterMark;
1615     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(
1616         SyncOperation::GetSyncType(packet->GetMode()), packet->GetData(), isUpdateWaterMark);
1617     bool isPacketWaterLower = false;
1618     // send ack packet
1619     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1620         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1621     } else if (recvCode != WATER_MARK_INVALID) {
1622         WaterMark mark = 0;
1623         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1624         if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetLocalWaterMark() < mark) {
1625             LOGI("[DataSync][SetAckPacket] packetLocalMark=%" PRIu64 ",lockMark=%" PRIu64, packet->GetLocalWaterMark(),
1626                 mark);
1627             isPacketWaterLower = true;
1628             dataTime.endTime = packet->GetLocalWaterMark();
1629             mark = packet->GetLocalWaterMark();
1630         }
1631         ackPacket.SetData(mark);
1632     }
1633     std::vector<uint64_t> reserved {localMark};
1634     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1635     uint64_t packetId = 0;
1636     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1637         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1638     }
1639     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1640         reserved.push_back(packetId);
1641     }
1642     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1643     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1644         WaterMark deletedPeerMark;
1645         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1646         if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetDeletedWaterMark() < deletedPeerMark) {
1647             LOGI("[DataSync][SetAckPacket] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64,
1648                 packet->GetDeletedWaterMark(), deletedPeerMark);
1649             isPacketWaterLower = true;
1650             dataTime.deleteEndTime = packet->GetDeletedWaterMark();
1651             deletedPeerMark = packet->GetDeletedWaterMark();
1652         }
1653         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1654     }
1655     ackPacket.SetReserved(reserved);
1656     ackPacket.SetVersion(version);
1657     if (isPacketWaterLower) {
1658         UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1659     }
1660 }
1661 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1662 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1663     DataSyncReSendInfo reSendInfo)
1664 {
1665     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1666     if (mode == SyncModeType::PULL) {
1667         return E_OK;
1668     }
1669     ContinueToken token = nullptr;
1670     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1671     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1672         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1673     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1674     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1675     int errCode;
1676     if (curType != SyncType::QUERY_SYNC_TYPE) {
1677         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1678             reSendDataSizeInfo);
1679     } else {
1680         QuerySyncObject queryObj = context->GetQuery();
1681         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1682             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1683     }
1684     if (token != nullptr) {
1685         storage_->ReleaseContinueToken(token);
1686     }
1687     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1688         context->SetTaskErrCode(errCode);
1689         return errCode;
1690     }
1691     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1692         return errCode;
1693     }
1694     SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1695         DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1696 
1697     int innerCode = InterceptData(syncData);
1698     if (innerCode != E_OK) {
1699         context->SetTaskErrCode(innerCode);
1700         return innerCode;
1701     }
1702 
1703     bool needCompressOnSync = false;
1704     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1705     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1706     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1707     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1708         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1709             { remoteAlgo, version });
1710         if (compressCode != E_OK) {
1711             return compressCode;
1712         }
1713     }
1714     return errCode;
1715 }
1716 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1717 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1718 {
1719     if (context == nullptr) {
1720         LOGE("[SingleVerDataSync][RemoveDeviceDataIfNeed] context is nullptr.");
1721         return -E_INVALID_ARGS;
1722     }
1723     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1724         return E_OK;
1725     }
1726     uint64_t clearRemoteDataMark = 0;
1727     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1728     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1729     if (clearRemoteDataMark == 0) {
1730         return E_OK;
1731     }
1732     int errCode = E_OK;
1733     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1734         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1735         if (errCode != E_OK) {
1736             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1737             return errCode;
1738         }
1739     }
1740     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1741         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1742         if (errCode != E_OK) {
1743             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1744             return errCode;
1745         }
1746     }
1747     return E_OK;
1748 }
1749 
UpdateMtuSize()1750 void SingleVerDataSync::UpdateMtuSize()
1751 {
1752     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1753 }
1754 
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1755 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1756     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1757 {
1758     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1759     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1760     WaterMark peerMark = 0;
1761     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1762         peerMark);
1763     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1764     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1765     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1766     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1767     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1768         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1769         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1770         packet->SetLastSequence();
1771     }
1772     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1773         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1774         sendCode = SEND_FINISHED;
1775     }
1776     packet->SetData(syncData.entries);
1777     packet->SetCompressData(syncData.compressedEntries);
1778     packet->SetBasicInfo(sendCode, version, reSendMode);
1779     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1780     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1781     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1782         packet->SetEndWaterMark(context->GetEndMark());
1783         packet->SetQuery(context->GetQuery());
1784     }
1785     packet->SetQueryId(context->GetQuerySyncId());
1786     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1787         std::vector<uint64_t> reserved {reSendInfo.packetId};
1788         packet->SetReserved(reserved);
1789     }
1790     if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1791         // resend pull packet dont set compress type
1792         return;
1793     }
1794     bool needCompressOnSync = false;
1795     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1796     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1797     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1798     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1799         packet->SetCompressDataMark();
1800         packet->SetCompressAlgo(curAlgo);
1801     }
1802     FillRequestReSendPacketV2(context, packet);
1803 }
1804 
FillRequestReSendPacketV2(const SingleVerSyncTaskContext * context,DataRequestPacket * packet)1805 void SingleVerDataSync::FillRequestReSendPacketV2(const SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1806 {
1807     if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1808         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1809         uint32_t total = 0u;
1810         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1811         packet->SetTotalDataCount(total);
1812     }
1813 }
1814 
GetDataSizeSpecInfo(size_t packetSize)1815 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1816 {
1817     bool needCompressOnSync = false;
1818     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1819     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1820     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1821         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1822     return {blockSize, packetSize};
1823 }
1824 
InterceptData(SyncEntry & syncEntry)1825 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1826 {
1827     if (storage_ == nullptr) {
1828         LOGE("Invalid DB. Can not intercept data.");
1829         return -E_INVALID_DB;
1830     }
1831 
1832     // GetLocalDeviceName get local device ID.
1833     // GetDeviceId get remote device ID.
1834     // If intercept data fail, entries will be released.
1835     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1836 }
1837 
ControlCmdStart(SingleVerSyncTaskContext * context)1838 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1839 {
1840     if (context == nullptr) {
1841         return -E_INVALID_ARGS;
1842     }
1843     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1844     if (subManager == nullptr) {
1845         return -E_INVALID_ARGS;
1846     }
1847     int errCode = ControlCmdStartCheck(context);
1848     if (errCode != E_OK) {
1849         return errCode;
1850     }
1851     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1852     if (packet == nullptr) {
1853         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1854         return -E_OUT_OF_MEMORY;
1855     }
1856     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1857         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1858         if (errCode != E_OK) {
1859             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1860             delete packet;
1861             packet = nullptr;
1862             return errCode;
1863         }
1864     }
1865     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1866     errCode = SendControlPacket(packet, context);
1867     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1868         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1869     }
1870     return errCode;
1871 }
1872 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1873 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1874 {
1875     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1876     if (packet == nullptr) {
1877         return -E_INVALID_ARGS;
1878     }
1879     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1880         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1881     int errCode = ControlCmdRequestRecvPre(context, message);
1882     if (errCode != E_OK) {
1883         return errCode;
1884     }
1885     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1886         errCode = SubscribeRequestRecv(context, message);
1887     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1888         errCode = UnsubscribeRequestRecv(context, message);
1889     }
1890     return errCode;
1891 }
1892 
UpdatePeerWaterMarkInner(const DataRequestPacket & packet,const SyncTimeRange & dataTime,const UpdateWaterMark & isUpdateWaterMark,const SingleVerSyncTaskContext * context)1893 void SingleVerDataSync::UpdatePeerWaterMarkInner(const DataRequestPacket &packet, const SyncTimeRange &dataTime,
1894     const UpdateWaterMark &isUpdateWaterMark, const SingleVerSyncTaskContext *context)
1895 {
1896     SyncType curType = SyncOperation::GetSyncType(packet.GetMode());
1897     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1898         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1899     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet.IsNeedUpdateWaterMark()) {
1900         UpdateQueryPeerWaterMark(curType, packet.GetQueryId(), dataTime, context, isUpdateWaterMark);
1901     }
1902 }
1903 } // namespace DistributedDB
1904