• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 }
44 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)45 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
46     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
47 {
48     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
49         return -E_INVALID_ARGS;
50     }
51     storage_ = static_cast<SyncGenericInterface *>(inStorage);
52     communicateHandle_ = inCommunicateHandle;
53     metadata_ = inMetadata;
54     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
55     std::vector<uint8_t> label = inStorage->GetIdentifier();
56     label.resize(3); // only show 3 Bytes enough
57     label_ = DBCommon::VectorToHexString(label);
58     deviceId_ = deviceId;
59     msgSchedule_.Initialize(label_, deviceId_);
60     return E_OK;
61 }
62 
SyncStart(int mode,SingleVerSyncTaskContext * context)63 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
64 {
65     std::lock_guard<std::mutex> lock(lock_);
66     int errCode = CheckPermitSendData(mode, context);
67     if (errCode != E_OK) {
68         return errCode;
69     }
70     if (sessionId_ != 0) { // auto sync timeout resend
71         return ReSendData(context);
72     }
73     ResetSyncStatus(mode, context);
74     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
75     int tmpMode = SyncOperation::TransferSyncMode(mode);
76     if (tmpMode == SyncModeType::PUSH) {
77         errCode = PushStart(context);
78     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79         errCode = PushPullStart(context);
80     } else if (tmpMode == SyncModeType::PULL) {
81         errCode = PullRequestStart(context);
82     } else {
83         SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
84         errCode = PullResponseStart(context);
85     }
86     if (context->IsSkipTimeoutError(errCode)) {
87         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
88         // just return to avoid higher pressure for send.
89         return E_OK;
90     }
91     if (errCode != E_OK) {
92         LOGE("[DataSync] SendStart errCode=%d", errCode);
93         return errCode;
94     }
95     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
96         LOGE("wait for recv finished for push and pull mode");
97         return -E_EKEYREVOKED;
98     }
99     return InnerSyncStart(context);
100 }
101 
InnerSyncStart(SingleVerSyncTaskContext * context)102 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
103 {
104     int errCode = CheckPermitSendData(mode_, context);
105     if (errCode != E_OK) {
106         return errCode;
107     }
108     while (true) {
109         if (windowSize_ <= 0 || isAllDataHasSent_) {
110             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
111                 label_.c_str(), STR_MASK(deviceId_));
112             return E_OK;
113         }
114         int mode = SyncOperation::TransferSyncMode(mode_);
115         if (mode == SyncModeType::PULL) {
116             LOGE("[DataSync] unexpected error");
117             return -E_INVALID_ARGS;
118         }
119         context->IncSequenceId();
120         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
121             errCode = PushStart(context);
122         } else {
123             errCode = PullResponseStart(context);
124         }
125         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
126             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
127             isAllDataHasSent_ = true;
128             return -E_EKEYREVOKED;
129         }
130         if (context->IsSkipTimeoutError(errCode)) {
131             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
132             // just return to avoid higher pressure for send.
133             return E_OK;
134         }
135         if (errCode != E_OK) {
136             LOGE("[DataSync] InnerSend errCode=%d", errCode);
137             return errCode;
138         }
139     }
140     return E_OK;
141 }
142 
InnerClearSyncStatus()143 void SingleVerDataSync::InnerClearSyncStatus()
144 {
145     sessionId_ = 0;
146     reSendMap_.clear();
147     windowSize_ = 0;
148     maxSequenceIdHasSent_ = 0;
149     isAllDataHasSent_ = false;
150 }
151 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)152 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
153 {
154     if (message == nullptr) {
155         LOGE("[DataSync] AckRecv message nullptr");
156         return -E_INVALID_ARGS;
157     }
158     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
159     if (packet == nullptr) {
160         return -E_INVALID_ARGS;
161     }
162     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
163     uint32_t sessionId = message->GetSessionId();
164     uint32_t sequenceId = message->GetSequenceId();
165 
166     std::lock_guard<std::mutex> lock(lock_);
167     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
168         windowSize_, label_.c_str(), STR_MASK(deviceId_));
169     if (sessionId != sessionId_) {
170         LOGI("[DataSync] ignore ack,sessionId is different");
171         return E_OK;
172     }
173     Timestamp lastQueryTime = 0;
174     if (reSendMap_.count(sequenceId) != 0) {
175         lastQueryTime = reSendMap_[sequenceId].end;
176         reSendMap_.erase(sequenceId);
177         windowSize_++;
178     } else {
179         LOGI("[DataSync] ack seqId not in map");
180         return E_OK;
181     }
182     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
183         Timestamp dbLastQueryTime = 0;
184         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
185             context->GetTargetUserId(), dbLastQueryTime);
186         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
187             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
188                 context->GetTargetUserId(), lastQueryTime);
189         }
190         if (errCode != E_OK) {
191             return errCode;
192         }
193     }
194     if (!isAllDataHasSent_) {
195         return InnerSyncStart(context);
196     } else if (reSendMap_.empty()) {
197         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198         InnerClearSyncStatus();
199         return -E_FINISHED;
200     }
201     return E_OK;
202 }
203 
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206     std::lock_guard<std::mutex> lock(lock_);
207     InnerClearSyncStatus();
208 }
209 
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212     if (reSendMap_.empty()) {
213         LOGI("[DataSync] ReSend map empty");
214         return -E_INTERNAL_ERROR;
215     }
216     uint32_t sequenceId = reSendMap_.begin()->first;
217     ReSendInfo reSendInfo = reSendMap_.begin()->second;
218     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221         windowSize_, label_.c_str(), STR_MASK(deviceId_));
222     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224     return ReSend(context, dataReSendInfo);
225 }
226 
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229     std::string deviceInfo;
230     if (communicateHandle_ != nullptr) {
231         communicateHandle_->GetLocalIdentity(deviceInfo);
232     }
233     return deviceInfo;
234 }
235 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237     uint32_t packetLen)
238 {
239     bool startFeedDogRet = false;
240     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244     }
245     SendConfig sendConfig;
246     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247     sendConfig.isRetryTask = context->IsRetryTask();
248     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
249     if (errCode != E_OK) {
250         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
251         if (startFeedDogRet) {
252             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
253         }
254     }
255     return errCode;
256 }
257 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)258 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
259     std::vector<SendDataItem> &outData, size_t packetSize)
260 {
261     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
262     if (performance != nullptr) {
263         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
264     }
265     // start a watch dog before get data
266     // it will send ack util get data finished
267     context->StartFeedDogForGetData(context->GetResponseSessionId());
268     int errCode = GetData(context, packetSize, outData);
269     context->StopFeedDogForGetData();
270     if (performance != nullptr) {
271         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
272     }
273     if (!outData.empty()) {
274         SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
275     }
276     return errCode;
277 }
278 
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)279 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
280 {
281     int errCode;
282     UpdateMtuSize();
283     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
284         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
285         LOGI("[DataSync][GetData] resend data");
286         errCode = GetUnsyncData(context, outData, packetSize);
287     } else {
288         ContinueToken token;
289         context->GetContinueToken(token);
290         if (token == nullptr) {
291             errCode = GetUnsyncData(context, outData, packetSize);
292         } else {
293             LOGD("[DataSync][GetData] get data from token");
294             // if there is data to send, read out data, and update local watermark, send data
295             errCode = GetNextUnsyncData(context, outData, packetSize);
296         }
297     }
298     if (errCode == -E_UNFINISHED) {
299         LOGD("[DataSync][GetData] not finished.");
300     }
301     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
303         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
304     }
305     return errCode;
306 }
307 
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)308 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
309 {
310     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
311     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
312         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
313     bool needCompressOnSync = false;
314     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
315     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
316     int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
317     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
318         context->SetTaskErrCode(errCode);
319         return errCode;
320     }
321 
322     int innerCode = InterceptData(syncOutData);
323     if (innerCode != E_OK) {
324         context->SetTaskErrCode(innerCode);
325         return innerCode;
326     }
327 
328     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
329     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
330         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
331             { remoteAlgo, version });
332         if (compressCode != E_OK) {
333             return compressCode;
334         }
335     }
336     return errCode;
337 }
338 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)339 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
340     size_t packetSize)
341 {
342     SyncTimeRange waterRange;
343     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
344     WaterMark startMark = 0;
345     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
346     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
347     if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
348         return E_OK;
349     }
350     waterRange.beginTime = startMark;
351     if (curType == SyncType::QUERY_SYNC_TYPE) {
352         WaterMark deletedStartMark = 0;
353         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
354         Timestamp lastQueryTimestamp = 0;
355         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
356             context->GetTargetUserId(), lastQueryTimestamp);
357         if (errCode != E_OK) {
358             return errCode;
359         }
360         waterRange.deleteBeginTime = deletedStartMark;
361         waterRange.lastQueryTime = lastQueryTimestamp;
362     }
363     return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
364 }
365 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)366 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
367     DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
368 {
369     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
370     ContinueToken token = nullptr;
371     context->GetContinueToken(token);
372     if (token != nullptr) {
373         storage_->ReleaseContinueToken(token);
374     }
375     int errCode;
376     if (curType != SyncType::QUERY_SYNC_TYPE) {
377         errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
378             syncDataSizeInfo);
379     } else {
380         QuerySyncObject queryObj = context->GetQuery();
381         errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
382     }
383     context->SetContinueToken(token);
384     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
385         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
386     }
387     return errCode;
388 }
389 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)390 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
391     size_t packetSize)
392 {
393     ContinueToken token;
394     context->GetContinueToken(token);
395     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
396     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
397     context->SetContinueToken(token);
398     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
399         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
400     }
401     return errCode;
402 }
403 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)404 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
405     SyncType curType, const QuerySyncObject &query)
406 {
407     if (inData.empty()) {
408         return E_OK;
409     }
410     SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
411     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
412     if (performance != nullptr) {
413         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
414     }
415     const auto localDeviceName = GetLocalDeviceName();
416     const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
417     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
418     std::vector<SendDataItem> copyData = inData;
419     int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
420     if (errCode != E_OK) {
421         LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
422         return errCode;
423     }
424     // query only support prefix key and don't have query in packet in 104 version
425     errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
426     if (performance != nullptr) {
427         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428     }
429     if (errCode != E_OK) {
430         LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
431     }
432     return errCode;
433 }
434 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437     mode_ = inMode;
438     maxSequenceIdHasSent_ = 0;
439     isAllDataHasSent_ = false;
440     context->ReSetSequenceId();
441     reSendMap_.clear();
442     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443         windowSize_ = LOW_VERSION_WINDOW_SIZE;
444     } else {
445         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446     }
447     int mode = SyncOperation::TransferSyncMode(inMode);
448     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449         sessionId_ = context->GetRequestSessionId();
450     } else {
451         sessionId_ = context->GetResponseSessionId();
452     }
453 }
454 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458     WaterMark localMark = 0;
459     WaterMark deleteMark = 0;
460     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461     GetLocalDeleteSyncWaterMark(context, deleteMark);
462     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468     WaterMark localMark = 0;
469     int errCode = E_OK;
470     const std::string &deviceId = context->GetDeviceId();
471     std::string queryId = context->GetQuerySyncId();
472     if (syncType != SyncType::QUERY_SYNC_TYPE) {
473         if (isCheckBeforUpdate) {
474             GetLocalWaterMark(syncType, queryId, context, localMark);
475             if (localMark >= dataTimeRange.endTime) {
476                 return E_OK;
477             }
478         }
479         errCode = metadata_->SaveLocalWaterMark(deviceId, context->GetTargetUserId(), dataTimeRange.endTime);
480     } else {
481         bool isNeedUpdateMark = true;
482         bool isNeedUpdateDeleteMark = true;
483         if (isCheckBeforUpdate) {
484             WaterMark deleteDataWaterMark = 0;
485             GetLocalWaterMark(syncType, queryId, context, localMark);
486             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487             if (localMark >= dataTimeRange.endTime) {
488                 isNeedUpdateMark = false;
489             }
490             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491                 isNeedUpdateDeleteMark = false;
492             }
493         }
494         if (isNeedUpdateMark) {
495             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, context->GetTargetUserId(),
497                 dataTimeRange.endTime);
498             if (errCode != E_OK) {
499                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
500                 return errCode;
501             }
502         }
503         if (isNeedUpdateDeleteMark) {
504             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
505                 dataTimeRange.deleteEndTime);
506             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(),
507                 dataTimeRange.deleteEndTime);
508         }
509     }
510     if (errCode != E_OK) {
511         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
512     }
513     return errCode;
514 }
515 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,const DeviceID & userId,WaterMark & waterMark) const516 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
517     const DeviceID &deviceId, const DeviceID &userId, WaterMark &waterMark) const
518 {
519     if (syncType != SyncType::QUERY_SYNC_TYPE) {
520         metadata_->GetPeerWaterMark(deviceId, userId, waterMark);
521         return;
522     }
523     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, userId, waterMark);
524 }
525 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,const DeviceID & userId,WaterMark & waterMark)526 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, const DeviceID &userId,
527     WaterMark &waterMark)
528 {
529     metadata_->GetRecvDeleteSyncWaterMark(deviceId, userId, waterMark);
530 }
531 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const532 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
533     WaterMark &waterMark) const
534 {
535     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), waterMark,
536         context->IsAutoLiftWaterMark());
537 }
538 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const539 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
540     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
541 {
542     if (syncType != SyncType::QUERY_SYNC_TYPE) {
543         metadata_->GetLocalWaterMark(context->GetDeviceId(), context->GetTargetUserId(), waterMark);
544         return;
545     }
546     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(), context->GetTargetUserId(),
547         waterMark, context->IsAutoLiftWaterMark());
548 }
549 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)550 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
551     WaterMark maxSendDataTime)
552 {
553     bool isNeedClearRemoteData = false;
554     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
555     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
556         uint64_t clearDeviceDataMark = 0;
557         metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearDeviceDataMark);
558         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
559     } else {
560         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
561         if (packet == nullptr) {
562             LOGE("[RemoveDeviceDataHandle] get packet object failed");
563             return -E_INVALID_ARGS;
564         }
565         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
566         WaterMark packetLocalMark = packet->GetLocalWaterMark();
567         WaterMark peerMark = 0;
568         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), context->GetTargetUserId(),
569             peerMark);
570         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
571     }
572     if (!isNeedClearRemoteData) {
573         return E_OK;
574     }
575     int errCode = E_OK;
576     if (context->IsNeedClearRemoteStaleData()) {
577         // need to clear remote device history data
578         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
579         if (errCode != E_OK) {
580             (void)SendDataAck(context, message, errCode, maxSendDataTime);
581             return errCode;
582         }
583         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
584             // avoid repeat clear in ack
585             metadata_->SaveLocalWaterMark(context->GetDeviceId(), context->GetTargetUserId(), 0);
586         }
587     }
588     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
589         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
590         if (errCode != E_OK) {
591             (void)SendDataAck(context, message, errCode, maxSendDataTime);
592             return errCode;
593         }
594     }
595     return E_OK;
596 }
597 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)598 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
599     const std::vector<uint64_t> &reserved)
600 {
601     bool isNeedClearRemoteData = false;
602     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
603     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
604     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
605         uint64_t clearDeviceDataMark = 0;
606         metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearDeviceDataMark);
607         isNeedClearRemoteData = (clearDeviceDataMark != 0);
608     } else if (reserved.empty()) {
609         WaterMark localMark = 0;
610         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
611         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
612     } else {
613         WaterMark peerMark = 0;
614         GetPeerWaterMark(curType, context->GetQuerySyncId(),
615             context->GetDeviceId(), context->GetTargetUserId(), peerMark);
616         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
617     }
618     if (!isNeedClearRemoteData) {
619         return E_OK;
620     }
621     // need to clear remote historydata
622     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
623         label_.c_str(), STR_MASK(GetDeviceId()));
624     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
625     if (errCode != E_OK) {
626         return errCode;
627     }
628     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
629         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
630     }
631     return errCode;
632 }
633 
SetSessionEndTimestamp(Timestamp end)634 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
635 {
636     sessionEndTimestamp_ = end;
637 }
638 
GetSessionEndTimestamp() const639 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
640 {
641     return sessionEndTimestamp_;
642 }
643 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)644 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
645 {
646     ReSendInfo reSendInfo;
647     reSendInfo.start = dataTimeRange.beginTime;
648     reSendInfo.end = dataTimeRange.endTime;
649     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
650     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
651     reSendInfo.packetId = context->GetPacketId();
652     maxSequenceIdHasSent_++;
653     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
654     windowSize_--;
655     ContinueToken token;
656     context->GetContinueToken(token);
657     if (token == nullptr) {
658         isAllDataHasSent_ = true;
659     }
660     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
661         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
662         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
663         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
664 }
665 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)666 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
667     SyncEntry &syncData, int sendCode, int mode)
668 {
669     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
670     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
671     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
672     WaterMark localMark = 0;
673     WaterMark peerMark = 0;
674     WaterMark deleteMark = 0;
675     bool needCompressOnSync = false;
676     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
677     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
678     std::string id = context->GetQuerySyncId();
679     GetLocalWaterMark(curType, id, context, localMark);
680     GetPeerWaterMark(curType, id, context->GetDeviceId(), context->GetTargetUserId(), peerMark);
681     GetLocalDeleteSyncWaterMark(context, deleteMark);
682     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
683         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
684         packet->SetLastSequence();
685     }
686     int tmpMode = mode;
687     if (mode == SyncModeType::RESPONSE_PULL) {
688         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
689     }
690     packet->SetData(syncData.entries);
691     packet->SetCompressData(syncData.compressedEntries);
692     packet->SetBasicInfo(sendCode, version, tmpMode);
693     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
694     packet->SetWaterMark(localMark, peerMark, deleteMark);
695     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
696         packet->SetEndWaterMark(context->GetEndMark());
697         packet->SetSessionId(context->GetRequestSessionId());
698     }
699     packet->SetQuery(context->GetQuery());
700     packet->SetQueryId(context->GetQuerySyncId());
701     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
702     // empty compress data should not mark compress
703     if (!packet->GetCompressData().empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
704         packet->SetCompressDataMark();
705         packet->SetCompressAlgo(curAlgo);
706     }
707     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
708     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
709         context->GetQuery().HasOrderBy())) {
710         packet->SetUpdateWaterMark();
711     }
712     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
713         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
714         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
715 }
716 
RequestStart(SingleVerSyncTaskContext * context,int mode)717 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
718 {
719     int errCode = QuerySyncCheck(context);
720     if (errCode != E_OK) {
721         LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
722         return errCode;
723     }
724     errCode = RemoveDeviceDataIfNeed(context);
725     if (errCode != E_OK) {
726         context->SetTaskErrCode(errCode);
727         return errCode;
728     }
729     SyncEntry syncData;
730     // get data
731     errCode = GetMatchData(context, syncData);
732     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
733 
734     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
735         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
736         return errCode;
737     }
738 
739     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
740     if (packet == nullptr) {
741         LOGE("[DataSync][PushStart] new DataRequestPacket error");
742         return -E_OUT_OF_MEMORY;
743     }
744     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
745     UpdateWaterMark isUpdateWaterMark;
746     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
747     if (errCode == E_OK) {
748         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
749     }
750     FillDataRequestPacket(packet, context, syncData, errCode, mode);
751     errCode = SendDataPacket(curType, packet, context);
752     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
753     if (performance != nullptr) {
754         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
755     }
756     if (errCode == E_OK || errCode == -E_TIMEOUT) {
757         UpdateSendInfo(dataTime, context);
758     }
759     if (errCode == E_OK) {
760         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
761             context->GetQuery().HasOrderBy())) {
762             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
763             return E_OK;
764         }
765         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
766         SaveLocalWaterMark(curType, context, tmpDataTime);
767     }
768     return errCode;
769 }
770 
PushStart(SingleVerSyncTaskContext * context)771 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
772 {
773     if (context == nullptr) {
774         return -E_INVALID_ARGS;
775     }
776     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
777     return RequestStart(context,
778         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
779 }
780 
PushPullStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
782 {
783     if (context == nullptr) {
784         return -E_INVALID_ARGS;
785     }
786     return RequestStart(context, context->GetMode());
787 }
788 
PullRequestStart(SingleVerSyncTaskContext * context)789 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
790 {
791     if (context == nullptr) {
792         return -E_INVALID_ARGS;
793     }
794     int errCode = QuerySyncCheck(context);
795     if (errCode != E_OK) {
796         return errCode;
797     }
798     errCode = RemoveDeviceDataIfNeed(context);
799     if (errCode != E_OK) {
800         context->SetTaskErrCode(errCode);
801         return errCode;
802     }
803     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
804     if (packet == nullptr) {
805         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
806         return -E_OUT_OF_MEMORY;
807     }
808     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
809     WaterMark peerMark = 0;
810     WaterMark localMark = 0;
811     WaterMark deleteMark = 0;
812     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
813         context->GetDeviceId(), context->GetTargetUserId(), peerMark);
814     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
815     GetLocalDeleteSyncWaterMark(context, deleteMark);
816     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
817     WaterMark endMark = context->GetEndMark();
818     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
819     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
820     packet->SetBasicInfo(E_OK, version, context->GetMode());
821     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
822     packet->SetWaterMark(localMark, peerMark, deleteMark);
823     packet->SetEndWaterMark(endMark);
824     packet->SetSessionId(context->GetRequestSessionId());
825     packet->SetQuery(context->GetQuery());
826     packet->SetQueryId(context->GetQuerySyncId());
827     packet->SetLastSequence();
828     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
829     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
830     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
831         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
832         label_.c_str(), STR_MASK(GetDeviceId()));
833     UpdateSendInfo(dataTime, context);
834     return SendDataPacket(syncType, packet, context);
835 }
836 
PullResponseStart(SingleVerSyncTaskContext * context)837 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
838 {
839     if (context == nullptr) {
840         return -E_INVALID_ARGS;
841     }
842     SyncEntry syncData;
843     // get data
844     int errCode = GetMatchData(context, syncData);
845     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
846         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
847             (void)SendPullResponseDataPkt(errCode, syncData, context);
848         }
849         return errCode;
850     }
851     // if send finished
852     int ackCode = E_OK;
853     ContinueToken token = nullptr;
854     context->GetContinueToken(token);
855     if (errCode == E_OK && token == nullptr) {
856         LOGD("[DataSync][PullResponse] send last frame end");
857         ackCode = SEND_FINISHED;
858     }
859     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
860     UpdateWaterMark isUpdateWaterMark;
861     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
862     if (errCode == E_OK) {
863         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
864     }
865     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
866     if (errCode == E_OK || errCode == -E_TIMEOUT) {
867         UpdateSendInfo(dataTime, context);
868     }
869     if (errCode == E_OK) {
870         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
871             context->GetQuery().HasOrderBy())) {
872             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
873             return E_OK;
874         }
875         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
876         SaveLocalWaterMark(curType, context, tmpDataTime);
877     }
878     return errCode;
879 }
880 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)881 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
882     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
883 {
884     WaterMark tmpPeerWatermark = dataTime.endTime;
885     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
886     if (isUpdateWaterMark.normalUpdateMark) {
887         tmpPeerWatermark++;
888     }
889     if (isUpdateWaterMark.deleteUpdateMark) {
890         tmpPeerDeletedWatermark++;
891     }
892     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
893 }
894 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)895 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
896     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
897 {
898     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
899         return;
900     }
901     int errCode = E_OK;
902     if (syncType != SyncType::QUERY_SYNC_TYPE) {
903         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), context->GetTargetUserId(), peerWatermark, true);
904     } else {
905         if (peerWatermark != 0) {
906             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
907             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), context->GetTargetUserId(),
908                 peerWatermark);
909             if (errCode != E_OK) {
910                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
911             }
912         }
913         if (peerDeletedWatermark != 0) {
914             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
915                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
916             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(),
917                 peerDeletedWatermark);
918         }
919     }
920     if (errCode != E_OK) {
921         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
922     }
923 }
924 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)925 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
926 {
927     uint16_t remoteCommunicatorVersion = 0;
928     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
929         -E_NOT_FOUND) {
930         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
931         return -E_VERSION_NOT_SUPPORT;
932     }
933     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
934     if (remoteCommunicatorVersion == 0) {
935         LOGI("[DataSync] set remote version 0");
936         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
937         return E_OK;
938     } else {
939         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
940         if (isControlMsg) {
941             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
942         } else {
943             (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
944         }
945         return -E_NEED_ABILITY_SYNC;
946     }
947 }
948 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)949 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
950 {
951     if (context == nullptr || message == nullptr) {
952         return -E_INVALID_ARGS;
953     }
954     auto *packet = message->GetObject<DataRequestPacket>();
955     if (packet == nullptr) {
956         return -E_INVALID_ARGS;
957     }
958     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
959         return DoAbilitySyncIfNeed(context, message);
960     }
961     int32_t sendCode = packet->GetSendCode();
962     if (sendCode == -E_VERSION_NOT_SUPPORT) {
963         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
964         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
965         return -E_WAIT_NEXT_MESSAGE;
966     }
967     // only deal with pull response packet errCode
968     if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
969         message->GetSessionId() == context->GetRequestSessionId()) {
970         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
971         return sendCode;
972     }
973     int errCode = RunPermissionCheck(context, message, packet);
974     if (errCode != E_OK) {
975         return errCode;
976     }
977     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
978         errCode = CheckSchemaStrategy(context, message);
979     }
980     if (errCode == E_OK) {
981         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
982     }
983     if (errCode != E_OK) {
984         (void)SendDataAck(context, message, errCode, 0);
985         return errCode;
986     }
987     errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
988     if (errCode != E_OK) {
989         (void)SendDataAck(context, message, errCode, 0);
990     }
991     return errCode;
992 }
993 
DataRequestRecvInner(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)994 int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, const Message *message,
995     WaterMark &pullEndWatermark)
996 {
997     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
998     if (packet == nullptr) {
999         LOGE("[DataSync][DataRequestRecv] get packet object failed");
1000         return -E_INVALID_ARGS;
1001     }
1002     const std::vector<SendDataItem> &data = packet->GetData();
1003     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1004     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
1005         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
1006         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
1007     context->SetReceiveWaterMarkErr(false);
1008     UpdateWaterMark isUpdateWaterMark;
1009     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1010     int errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1011     if (errCode != E_OK) {
1012         return errCode;
1013     }
1014     Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1015     if (WaterMarkErrHandle(curType, context, message)) {
1016         return E_OK;
1017     }
1018     GetPullEndWatermark(context, packet, pullEndWatermark);
1019     // save data first
1020     errCode = SaveData(context, data, curType,
1021         SingleVerDataSyncUtils::GetQueryFromDataRequest(*packet, *context, message->GetSessionId()));
1022     if (errCode != E_OK) {
1023         (void)SendDataAck(context, message, errCode, dataTime.endTime);
1024         return errCode;
1025     }
1026     SingleVerDataSyncUtils::UpdateSyncProcess(context, packet);
1027 
1028     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1029         pullEndWatermark = 0;
1030         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1031     } else {
1032         // if data is empty, we don't know the max timestap of this packet.
1033         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1034     }
1035     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1036         context->GetRequestSessionId());
1037     UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1038     context->RefreshSaveTime(packet->IsLastSequence());
1039     if (errCode != E_OK) {
1040         return errCode;
1041     }
1042     if (packet->GetSendCode() == SEND_FINISHED) {
1043         return -E_RECV_FINISHED;
1044     }
1045     return errCode;
1046 }
1047 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)1048 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
1049     WaterMark &pullEndWatermark)
1050 {
1051     int errCode = DataRequestRecvPre(context, message);
1052     if (errCode != E_OK) {
1053         return errCode;
1054     }
1055     return DataRequestRecvInner(context, message, pullEndWatermark);
1056 }
1057 
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1058 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1059     SingleVerSyncTaskContext *context)
1060 {
1061     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1062     if (message == nullptr) {
1063         LOGE("[DataSync][SendDataPacket] new message error");
1064         delete packet;
1065         packet = nullptr;
1066         return -E_OUT_OF_MEMORY;
1067     }
1068     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1069     int errCode = message->SetExternalObject(packet);
1070     if (errCode != E_OK) {
1071         delete packet;
1072         packet = nullptr;
1073         delete message;
1074         message = nullptr;
1075         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1076         return errCode;
1077     }
1078     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1079         context->GetSequenceId(), context->GetRequestSessionId());
1080     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1081     if (performance != nullptr) {
1082         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1083     }
1084     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1085         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1086     };
1087     errCode = Send(context, message, handler, packetLen);
1088     if (errCode != E_OK) {
1089         delete message;
1090         message = nullptr;
1091     }
1092 
1093     return errCode;
1094 }
1095 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1096 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1097     SingleVerSyncTaskContext *context)
1098 {
1099     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1100     if (packet == nullptr) {
1101         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1102         return -E_OUT_OF_MEMORY;
1103     }
1104     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1105     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1106 
1107     if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1108         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1109         uint32_t total = 0u;
1110         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1111         LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1112         packet->SetTotalDataCount(total);
1113     }
1114 
1115     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1116     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1117     if (message == nullptr) {
1118         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1119         delete packet;
1120         packet = nullptr;
1121         return -E_OUT_OF_MEMORY;
1122     }
1123     int errCode = message->SetExternalObject(packet);
1124     if (errCode != E_OK) {
1125         delete packet;
1126         packet = nullptr;
1127         delete message;
1128         message = nullptr;
1129         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1130         return errCode;
1131     }
1132     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1133         context->GetSequenceId(), context->GetResponseSessionId());
1134     SendResetWatchDogPacket(context, packetLen);
1135     errCode = Send(context, message, nullptr, packetLen);
1136     if (errCode != E_OK) {
1137         delete message;
1138         message = nullptr;
1139     }
1140     return errCode;
1141 }
1142 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1143 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1144 {
1145     (void)SendDataAck(context, message, E_OK, 0);
1146 }
1147 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1148 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1149     WaterMark maxSendDataTime)
1150 {
1151     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1152     if (packet == nullptr) {
1153         return -E_INVALID_ARGS;
1154     }
1155     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1156     if (ackMessage == nullptr) {
1157         LOGE("[DataSync][SendDataAck] new message error");
1158         return -E_OUT_OF_MEMORY;
1159     }
1160     DataAckPacket ack;
1161     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1162     int errCode = ackMessage->SetCopiedObject(ack);
1163     if (errCode != E_OK) {
1164         delete ackMessage;
1165         ackMessage = nullptr;
1166         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1167         return errCode;
1168     }
1169     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1170         message->GetSequenceId(), message->GetSessionId());
1171 
1172     errCode = Send(context, ackMessage, nullptr, 0);
1173     if (errCode != E_OK) {
1174         delete ackMessage;
1175         ackMessage = nullptr;
1176     }
1177     return errCode;
1178 }
1179 
AckPacketIdCheck(const Message * message)1180 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1181 {
1182     if (message == nullptr) {
1183         LOGE("[DataSync] AckRecv message nullptr");
1184         return false;
1185     }
1186     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1187         return true;
1188     }
1189     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1190     if (packet == nullptr) {
1191         return false;
1192     }
1193     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1194     std::lock_guard<std::mutex> lock(lock_);
1195     uint32_t sequenceId = message->GetSequenceId();
1196     if (reSendMap_.count(sequenceId) != 0) {
1197         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1198         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1199             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1200                 originalPacketId);
1201             return false;
1202         }
1203     }
1204     return true;
1205 }
1206 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1207 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1208 {
1209     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1210     if (errCode != E_OK) {
1211         return errCode;
1212     }
1213     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1214     if (packet == nullptr) {
1215         return -E_INVALID_ARGS;
1216     }
1217     int32_t recvCode = packet->GetRecvCode();
1218     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1219         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1220     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1221         LOGE("[DataSync][AckRecv] Version mismatch");
1222         return -E_VERSION_NOT_SUPPORT;
1223     }
1224 
1225     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1226         // we should ReleaseContinueToken, avoid crash
1227         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1228             STR_MASK(GetDeviceId()));
1229         context->ReleaseContinueToken();
1230         return recvCode;
1231     }
1232     uint64_t data = packet->GetData();
1233     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1234         return DealWaterMarkException(context, data, packet->GetReserved());
1235     }
1236 
1237     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1238         // data only use low 32bit
1239         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1240         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1241             STR_MASK(GetDeviceId()));
1242     }
1243 
1244     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1245         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1246             label_.c_str(), STR_MASK(GetDeviceId()));
1247         return recvCode;
1248     }
1249 
1250     // Judge if send finished
1251     ContinueToken token;
1252     context->GetContinueToken(token);
1253     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1254         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1255         return -E_NO_DATA_SEND;
1256     }
1257 
1258     // send next data
1259     return -E_SEND_DATA;
1260 }
1261 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1262 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1263     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1264 {
1265     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1266         LOGE("[SingleVerDataSync] messageId not available.");
1267         return;
1268     }
1269     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1270     if (ackMessage == nullptr) {
1271         LOGE("[DataSync][SaveDataNotify] new message failed");
1272         return;
1273     }
1274 
1275     DataAckPacket ack;
1276     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1277     ack.SetVersion(pktVersion);
1278     int errCode = ackMessage->SetCopiedObject(ack);
1279     if (errCode != E_OK) {
1280         delete ackMessage;
1281         ackMessage = nullptr;
1282         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1283         return;
1284     }
1285     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1286 
1287     errCode = Send(context, ackMessage, nullptr, 0);
1288     if (errCode != E_OK) {
1289         delete ackMessage;
1290         ackMessage = nullptr;
1291     }
1292     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1293         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1294 }
1295 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1296 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1297     WaterMark &pullEndWatermark) const
1298 {
1299     if (packet == nullptr) {
1300         return;
1301     }
1302     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1303     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1304         WaterMark endMark = packet->GetEndWaterMark();
1305         TimeOffset offset;
1306         metadata_->GetTimeOffset(context->GetDeviceId(), context->GetTargetUserId(), offset);
1307         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1308         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1309             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1310     }
1311 }
1312 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1313 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1314     const std::vector<uint64_t> &reserved)
1315 {
1316     WaterMark deletedWaterMark = 0;
1317     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1318     if (curType == SyncType::QUERY_SYNC_TYPE) {
1319         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1320             LOGE("[DataSync] get packet reserve size failed");
1321             return -E_INVALID_ARGS;
1322         }
1323         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1324     }
1325     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1326         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1327     int errCode = SaveLocalWaterMark(curType, context,
1328         {0, 0, ackWaterMark, deletedWaterMark});
1329     if (errCode != E_OK) {
1330         return errCode;
1331     }
1332     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1333     context->IncNegotiationCount();
1334     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1335     if (!context->IsNeedClearRemoteStaleData()) {
1336         return -E_RE_SEND_DATA;
1337     }
1338     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1339     if (errCode != E_OK) {
1340         return errCode;
1341     }
1342     return -E_RE_SEND_DATA;
1343 }
1344 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1345 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1346     const DataRequestPacket *packet)
1347 {
1348     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1349     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1350     if (errCode != E_OK) {
1351         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1352             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1353         }
1354         return -E_NOT_PERMIT;
1355     }
1356     const std::vector<SendDataItem> &data = packet->GetData();
1357     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1358     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1359     auto securityOption = packet->GetSecurityOption();
1360     if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1361         securityOption.securityLabel != SecurityLabel::NOT_SET) {
1362         context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1363     }
1364     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1365         !context->GetReceivcPermitCheck()) {
1366         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1367         if (permitReceive) {
1368             context->SetReceivcPermitCheck(true);
1369         } else {
1370             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1371             return -E_SECURITY_OPTION_CHECK_ERROR;
1372         }
1373     }
1374     return errCode;
1375 }
1376 
1377 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1378 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1379 {
1380     // When mtu less than 30k, we send data with bluetooth
1381     // In order not to block the bluetooth channel, we don't send notify packet here
1382     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1383         return;
1384     }
1385     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1386 
1387     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1388     if (ackMessage == nullptr) {
1389         LOGE("[DataSync][ResetWatchDog] new message failed");
1390         return;
1391     }
1392 
1393     DataAckPacket ack;
1394     ack.SetData(data);
1395     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1396     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1397     int errCode = ackMessage->SetCopiedObject(ack);
1398     if (errCode != E_OK) {
1399         delete ackMessage;
1400         ackMessage = nullptr;
1401         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1402         return;
1403     }
1404     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1405         context->GetSequenceId(), context->GetResponseSessionId());
1406 
1407     errCode = Send(context, ackMessage, nullptr, 0);
1408     if (errCode != E_OK) {
1409         delete ackMessage;
1410         ackMessage = nullptr;
1411         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1412             STR_MASK(GetDeviceId()));
1413     } else {
1414         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1415             STR_MASK(GetDeviceId()));
1416     }
1417 }
1418 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1419 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1420 {
1421     if (context == nullptr) {
1422         return -E_INVALID_ARGS;
1423     }
1424     int errCode = RemoveDeviceDataIfNeed(context);
1425     if (errCode != E_OK) {
1426         context->SetTaskErrCode(errCode);
1427         return errCode;
1428     }
1429     SyncEntry syncData;
1430     errCode = GetReSendData(syncData, context, reSendInfo);
1431     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1432         return errCode;
1433     }
1434     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1435     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1436     if (packet == nullptr) {
1437         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1438         return -E_OUT_OF_MEMORY;
1439     }
1440     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1441     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1442     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1443         LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1444         return errCode;
1445     }
1446     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1447         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1448         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1449         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1450             dataTime.deleteEndTime += 1;
1451         }
1452         if (reSendInfo.end > reSendInfo.start) {
1453             dataTime.endTime += 1;
1454         }
1455         errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1456         if (errCode != E_OK) {
1457             LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1458         }
1459     }
1460     return errCode;
1461 }
1462 
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1463 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1464     uint32_t sessionId, uint32_t sequenceId)
1465 {
1466     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1467     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1468     if (message == nullptr) {
1469         LOGE("[DataSync][SendDataPacket] new message error");
1470         delete packet;
1471         packet = nullptr;
1472         return -E_OUT_OF_MEMORY;
1473     }
1474     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1475     int errCode = message->SetExternalObject(packet);
1476     if (errCode != E_OK) {
1477         delete packet;
1478         packet = nullptr;
1479         delete message;
1480         message = nullptr;
1481         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1482         return errCode;
1483     }
1484     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1485     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1486         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1487     };
1488     errCode = Send(context, message, handler, packetLen);
1489     if (errCode != E_OK) {
1490         delete message;
1491         message = nullptr;
1492     }
1493     return errCode;
1494 }
1495 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1496 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1497 {
1498     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1499     int mode = SyncOperation::TransferSyncMode(inMode);
1500     // for pull mode it just need to get data, no need to send data.
1501     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1502         return E_OK;
1503     }
1504     if (context->GetSendPermitCheck()) {
1505         return E_OK;
1506     }
1507     bool isPermitSync = true;
1508     std::string deviceId = context->GetDeviceId();
1509     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1510     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1511         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1512     }
1513     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1514         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1515     if (isPermitSync) {
1516         context->SetSendPermitCheck(true);
1517         return E_OK;
1518     }
1519     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1520         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1521         return -E_SECURITY_OPTION_CHECK_ERROR;
1522     }
1523     if (mode == SyncModeType::RESPONSE_PULL) {
1524         SyncEntry syncData;
1525         (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1526         return -E_SECURITY_OPTION_CHECK_ERROR;
1527     }
1528     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1529         return -E_SECURITY_OPTION_CHECK_ERROR;
1530     }
1531     return E_OK;
1532 }
1533 
GetLabel() const1534 std::string SingleVerDataSync::GetLabel() const
1535 {
1536     return label_;
1537 }
1538 
GetDeviceId() const1539 std::string SingleVerDataSync::GetDeviceId() const
1540 {
1541     return deviceId_;
1542 }
1543 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1544 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1545 {
1546     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1547     if (packet == nullptr) {
1548         LOGE("[WaterMarkErrHandle] get packet object failed");
1549         return -E_INVALID_ARGS;
1550     }
1551     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1552     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1553     WaterMark peerMark = 0;
1554     WaterMark deletedMark = 0;
1555     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), context->GetTargetUserId(), peerMark);
1556     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1557         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), deletedMark);
1558     }
1559     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1560         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1561         context->SetReceiveWaterMarkErr(true);
1562         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1563         return true;
1564     }
1565     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1566         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1567             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1568             peerMark);
1569         context->SetReceiveWaterMarkErr(true);
1570         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1571         return true;
1572     }
1573     return false;
1574 }
1575 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1576 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1577 {
1578     auto *packet = message->GetObject<DataRequestPacket>();
1579     if (packet == nullptr) {
1580         return -E_INVALID_ARGS;
1581     }
1582     if (metadata_->IsAbilitySyncFinish(deviceId_, context->GetTargetUserId())) {
1583         return E_OK;
1584     }
1585     auto query = packet->GetQuery();
1586     std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1587     if (!schemaSyncStatus.second) {
1588         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1589         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1590         return -E_NEED_ABILITY_SYNC;
1591     }
1592     if (!schemaSyncStatus.first) {
1593         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1594         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1595         return -E_SCHEMA_MISMATCH;
1596     }
1597     return E_OK;
1598 }
1599 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1600 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1601 {
1602     int mode = SyncOperation::TransferSyncMode(inMode);
1603     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1604         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1605         return;
1606     }
1607     if (storage_ == nullptr) {
1608         LOGE("RemotePushFinished fail, storage is nullptr.");
1609         return;
1610     }
1611 
1612     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1613         storage_->NotifyRemotePushFinished(deviceId_);
1614     }
1615 }
1616 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1617 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1618     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1619 {
1620     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1621     WaterMark localMark = 0;
1622     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1623     ackPacket.SetRecvCode(recvCode);
1624     WaterMark mark = 0;
1625     GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), context->GetTargetUserId(), mark);
1626     WaterMark deletedPeerMark = 0;
1627     GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), context->GetTargetUserId(), deletedPeerMark);
1628     SyncTimeRange dataTime = {.endTime = mark, .deleteEndTime = deletedPeerMark};
1629     bool isPacketWaterLower = false;
1630     // send ack packet
1631     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1632         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1633     } else if (recvCode != WATER_MARK_INVALID) {
1634         if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetLocalWaterMark() < mark) {
1635             LOGI("[DataSync][SetAckPacket] packetLocalMark=%" PRIu64 ",lockMark=%" PRIu64, packet->GetLocalWaterMark(),
1636                 mark);
1637             isPacketWaterLower = true;
1638             dataTime.endTime = packet->GetLocalWaterMark();
1639             mark = packet->GetLocalWaterMark();
1640         }
1641         ackPacket.SetData(mark);
1642     }
1643     std::vector<uint64_t> reserved {localMark};
1644     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1645     uint64_t packetId = 0;
1646     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1647         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1648     }
1649     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1650         reserved.push_back(packetId);
1651     }
1652     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1653     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1654         if (recvCode == -E_NEED_ABILITY_SYNC && packet->GetDeletedWaterMark() < deletedPeerMark) {
1655             LOGI("[DataSync][SetAckPacket] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64,
1656                 packet->GetDeletedWaterMark(), deletedPeerMark);
1657             isPacketWaterLower = true;
1658             dataTime.deleteEndTime = packet->GetDeletedWaterMark();
1659             deletedPeerMark = packet->GetDeletedWaterMark();
1660         }
1661         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1662     }
1663     ackPacket.SetReserved(reserved);
1664     ackPacket.SetVersion(version);
1665     UpdateWaterMark isUpdateWaterMark = {true, true};
1666     if (isPacketWaterLower) {
1667         UpdatePeerWaterMarkInner(*packet, dataTime, isUpdateWaterMark, context);
1668     }
1669 }
1670 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1671 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1672     DataSyncReSendInfo reSendInfo)
1673 {
1674     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1675     if (mode == SyncModeType::PULL) {
1676         return E_OK;
1677     }
1678     ContinueToken token = nullptr;
1679     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1680     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1681         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1682     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1683     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1684     int errCode;
1685     if (curType != SyncType::QUERY_SYNC_TYPE) {
1686         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1687             reSendDataSizeInfo);
1688     } else {
1689         QuerySyncObject queryObj = context->GetQuery();
1690         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1691             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1692     }
1693     if (token != nullptr) {
1694         storage_->ReleaseContinueToken(token);
1695     }
1696     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1697         context->SetTaskErrCode(errCode);
1698         return errCode;
1699     }
1700     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1701         return errCode;
1702     }
1703     SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1704         DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1705 
1706     int innerCode = InterceptData(syncData);
1707     if (innerCode != E_OK) {
1708         context->SetTaskErrCode(innerCode);
1709         return innerCode;
1710     }
1711 
1712     bool needCompressOnSync = false;
1713     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1714     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1715     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1716     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1717         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1718             { remoteAlgo, version });
1719         if (compressCode != E_OK) {
1720             return compressCode;
1721         }
1722     }
1723     return errCode;
1724 }
1725 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1726 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1727 {
1728     if (context == nullptr) {
1729         LOGE("[SingleVerDataSync][RemoveDeviceDataIfNeed] context is nullptr.");
1730         return -E_INVALID_ARGS;
1731     }
1732     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1733         return E_OK;
1734     }
1735     uint64_t clearRemoteDataMark = 0;
1736     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1737     metadata_->GetRemoveDataMark(context->GetDeviceId(), context->GetTargetUserId(), clearRemoteDataMark);
1738     if (clearRemoteDataMark == 0) {
1739         return E_OK;
1740     }
1741     int errCode = E_OK;
1742     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1743         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1744         if (errCode != E_OK) {
1745             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1746             return errCode;
1747         }
1748     }
1749     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1750         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId(), context->GetTargetUserId());
1751         if (errCode != E_OK) {
1752             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1753             return errCode;
1754         }
1755     }
1756     return E_OK;
1757 }
1758 
UpdateMtuSize()1759 void SingleVerDataSync::UpdateMtuSize()
1760 {
1761     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1762 }
1763 
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1764 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1765     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1766 {
1767     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1768     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1769     WaterMark peerMark = 0;
1770     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), context->GetTargetUserId(),
1771         peerMark);
1772     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1773     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1774     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1775     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1776     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1777         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1778         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1779         packet->SetLastSequence();
1780     }
1781     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1782         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1783         sendCode = SEND_FINISHED;
1784     }
1785     packet->SetData(syncData.entries);
1786     packet->SetCompressData(syncData.compressedEntries);
1787     packet->SetBasicInfo(sendCode, version, reSendMode);
1788     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1789     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1790     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1791         packet->SetEndWaterMark(context->GetEndMark());
1792         packet->SetQuery(context->GetQuery());
1793     }
1794     packet->SetQueryId(context->GetQuerySyncId());
1795     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1796         std::vector<uint64_t> reserved {reSendInfo.packetId};
1797         packet->SetReserved(reserved);
1798     }
1799     if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1800         // resend pull packet dont set compress type
1801         return;
1802     }
1803     bool needCompressOnSync = false;
1804     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1805     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1806     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1807     if (!packet->GetCompressData().empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1808         packet->SetCompressDataMark();
1809         packet->SetCompressAlgo(curAlgo);
1810     }
1811     FillRequestReSendPacketV2(context, packet);
1812 }
1813 
FillRequestReSendPacketV2(const SingleVerSyncTaskContext * context,DataRequestPacket * packet)1814 void SingleVerDataSync::FillRequestReSendPacketV2(const SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1815 {
1816     if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1817         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1818         uint32_t total = 0u;
1819         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1820         packet->SetTotalDataCount(total);
1821     }
1822 }
1823 
GetDataSizeSpecInfo(size_t packetSize)1824 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1825 {
1826     bool needCompressOnSync = false;
1827     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1828     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1829     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1830         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1831     return {blockSize, packetSize};
1832 }
1833 
InterceptData(SyncEntry & syncEntry)1834 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1835 {
1836     if (storage_ == nullptr) {
1837         LOGE("Invalid DB. Can not intercept data.");
1838         return -E_INVALID_DB;
1839     }
1840 
1841     // GetLocalDeviceName get local device ID.
1842     // GetDeviceId get remote device ID.
1843     // If intercept data fail, entries will be released.
1844     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1845 }
1846 
ControlCmdStart(SingleVerSyncTaskContext * context)1847 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1848 {
1849     if (context == nullptr) {
1850         return -E_INVALID_ARGS;
1851     }
1852     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1853     if (subManager == nullptr) {
1854         return -E_INVALID_ARGS;
1855     }
1856     int errCode = ControlCmdStartCheck(context);
1857     if (errCode != E_OK) {
1858         return errCode;
1859     }
1860     auto packet = new (std::nothrow) SubscribeRequest();
1861     if (packet == nullptr) {
1862         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1863         return -E_OUT_OF_MEMORY;
1864     }
1865     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1866         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1867         if (errCode != E_OK) {
1868             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1869             delete packet;
1870             packet = nullptr;
1871             return errCode;
1872         }
1873     }
1874     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1875     errCode = SendControlPacket(packet, context);
1876     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1877         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1878     }
1879     return errCode;
1880 }
1881 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1882 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1883 {
1884     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1885     if (packet == nullptr) {
1886         return -E_INVALID_ARGS;
1887     }
1888     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1889         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1890     int errCode = ControlCmdRequestRecvPre(context, message);
1891     if (errCode != E_OK) {
1892         return errCode;
1893     }
1894     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1895         errCode = SubscribeRequestRecv(context, message);
1896     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1897         errCode = UnsubscribeRequestRecv(context, message);
1898     }
1899     return errCode;
1900 }
1901 
UpdatePeerWaterMarkInner(const DataRequestPacket & packet,const SyncTimeRange & dataTime,const UpdateWaterMark & isUpdateWaterMark,const SingleVerSyncTaskContext * context)1902 void SingleVerDataSync::UpdatePeerWaterMarkInner(const DataRequestPacket &packet, const SyncTimeRange &dataTime,
1903     const UpdateWaterMark &isUpdateWaterMark, const SingleVerSyncTaskContext *context)
1904 {
1905     SyncType curType = SyncOperation::GetSyncType(packet.GetMode());
1906     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1907         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1908     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet.IsNeedUpdateWaterMark()) {
1909         UpdateQueryPeerWaterMark(curType, packet.GetQueryId(), dataTime, context, isUpdateWaterMark);
1910     }
1911 }
1912 } // namespace DistributedDB
1913