• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43     storage_ = nullptr;
44     communicateHandle_ = nullptr;
45     metadata_ = nullptr;
46 }
47 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52         return -E_INVALID_ARGS;
53     }
54     storage_ = static_cast<SyncGenericInterface *>(inStorage);
55     communicateHandle_ = inCommunicateHandle;
56     metadata_ = inMetadata;
57     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58     std::vector<uint8_t> label = inStorage->GetIdentifier();
59     label.resize(3); // only show 3 Bytes enough
60     label_ = DBCommon::VectorToHexString(label);
61     deviceId_ = deviceId;
62     msgSchedule_.Initialize(label_, deviceId_);
63     return E_OK;
64 }
65 
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68     std::lock_guard<std::mutex> lock(lock_);
69     int errCode = CheckPermitSendData(mode, context);
70     if (errCode != E_OK) {
71         return errCode;
72     }
73     if (sessionId_ != 0) { // auto sync timeout resend
74         return ReSendData(context);
75     }
76     ResetSyncStatus(mode, context);
77     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78     int tmpMode = SyncOperation::TransferSyncMode(mode);
79     if (tmpMode == SyncModeType::PUSH) {
80         errCode = PushStart(context);
81     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82         errCode = PushPullStart(context);
83     } else if (tmpMode == SyncModeType::PULL) {
84         errCode = PullRequestStart(context);
85     } else {
86         SingleVerDataSyncUtils::CacheInitWaterMark(context, this);
87         errCode = PullResponseStart(context);
88     }
89     if (context->IsSkipTimeoutError(errCode)) {
90         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
91         // just return to avoid higher pressure for send.
92         return E_OK;
93     }
94     if (errCode != E_OK) {
95         LOGE("[DataSync] SendStart errCode=%d", errCode);
96         return errCode;
97     }
98     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
99         LOGE("wait for recv finished for push and pull mode");
100         return -E_EKEYREVOKED;
101     }
102     return InnerSyncStart(context);
103 }
104 
InnerSyncStart(SingleVerSyncTaskContext * context)105 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
106 {
107     int errCode = CheckPermitSendData(mode_, context);
108     if (errCode != E_OK) {
109         return errCode;
110     }
111     while (true) {
112         if (windowSize_ <= 0 || isAllDataHasSent_) {
113             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
114                 label_.c_str(), STR_MASK(deviceId_));
115             return E_OK;
116         }
117         int mode = SyncOperation::TransferSyncMode(mode_);
118         if (mode == SyncModeType::PULL) {
119             LOGE("[DataSync] unexpected error");
120             return -E_INVALID_ARGS;
121         }
122         context->IncSequenceId();
123         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
124             errCode = PushStart(context);
125         } else {
126             errCode = PullResponseStart(context);
127         }
128         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
129             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
130             isAllDataHasSent_ = true;
131             return -E_EKEYREVOKED;
132         }
133         if (context->IsSkipTimeoutError(errCode)) {
134             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
135             // just return to avoid higher pressure for send.
136             return E_OK;
137         }
138         if (errCode != E_OK) {
139             LOGE("[DataSync] InnerSend errCode=%d", errCode);
140             return errCode;
141         }
142     }
143     return E_OK;
144 }
145 
InnerClearSyncStatus()146 void SingleVerDataSync::InnerClearSyncStatus()
147 {
148     sessionId_ = 0;
149     reSendMap_.clear();
150     windowSize_ = 0;
151     maxSequenceIdHasSent_ = 0;
152     isAllDataHasSent_ = false;
153 }
154 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)155 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
156 {
157     if (message == nullptr) {
158         LOGE("[DataSync] AckRecv message nullptr");
159         return -E_INVALID_ARGS;
160     }
161     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
162     if (packet == nullptr) {
163         return -E_INVALID_ARGS;
164     }
165     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
166     uint32_t sessionId = message->GetSessionId();
167     uint32_t sequenceId = message->GetSequenceId();
168 
169     std::lock_guard<std::mutex> lock(lock_);
170     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
171         windowSize_, label_.c_str(), STR_MASK(deviceId_));
172     if (sessionId != sessionId_) {
173         LOGI("[DataSync] ignore ack,sessionId is different");
174         return E_OK;
175     }
176     Timestamp lastQueryTime = 0;
177     if (reSendMap_.count(sequenceId) != 0) {
178         lastQueryTime = reSendMap_[sequenceId].end;
179         reSendMap_.erase(sequenceId);
180         windowSize_++;
181     } else {
182         LOGI("[DataSync] ack seqId not in map");
183         return E_OK;
184     }
185     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
186         Timestamp dbLastQueryTime = 0;
187         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
188         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
189             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
190         }
191         if (errCode != E_OK) {
192             return errCode;
193         }
194     }
195     if (!isAllDataHasSent_) {
196         return InnerSyncStart(context);
197     } else if (reSendMap_.empty()) {
198         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
199         InnerClearSyncStatus();
200         return -E_FINISHED;
201     }
202     return E_OK;
203 }
204 
ClearSyncStatus()205 void SingleVerDataSync::ClearSyncStatus()
206 {
207     std::lock_guard<std::mutex> lock(lock_);
208     InnerClearSyncStatus();
209 }
210 
ReSendData(SingleVerSyncTaskContext * context)211 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
212 {
213     if (reSendMap_.empty()) {
214         LOGI("[DataSync] ReSend map empty");
215         return -E_INTERNAL_ERROR;
216     }
217     uint32_t sequenceId = reSendMap_.begin()->first;
218     ReSendInfo reSendInfo = reSendMap_.begin()->second;
219     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
220         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
221         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
222         windowSize_, label_.c_str(), STR_MASK(deviceId_));
223     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
224         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
225     return ReSend(context, dataReSendInfo);
226 }
227 
GetLocalDeviceName()228 std::string SingleVerDataSync::GetLocalDeviceName()
229 {
230     std::string deviceInfo;
231     if (communicateHandle_ != nullptr) {
232         communicateHandle_->GetLocalIdentity(deviceInfo);
233     }
234     return deviceInfo;
235 }
236 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)237 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
238     uint32_t packetLen)
239 {
240     bool startFeedDogRet = false;
241     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
242         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
243             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
244         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
245     }
246     SendConfig sendConfig;
247     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
248     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
249     if (errCode != E_OK) {
250         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
251         if (startFeedDogRet) {
252             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
253         }
254     }
255     return errCode;
256 }
257 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)258 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
259     std::vector<SendDataItem> &outData, size_t packetSize)
260 {
261     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
262     if (performance != nullptr) {
263         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
264     }
265     // start a watch dog before get data
266     // it will send ack util get data finished
267     context->StartFeedDogForGetData(context->GetResponseSessionId());
268     int errCode = GetData(context, packetSize, outData);
269     context->StopFeedDogForGetData();
270     if (performance != nullptr) {
271         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
272     }
273     if (!outData.empty()) {
274         SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
275     }
276     return errCode;
277 }
278 
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)279 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
280 {
281     int errCode;
282     UpdateMtuSize();
283     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
284         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
285         LOGI("[DataSync][GetData] resend data");
286         errCode = GetUnsyncData(context, outData, packetSize);
287     } else {
288         ContinueToken token;
289         context->GetContinueToken(token);
290         if (token == nullptr) {
291             errCode = GetUnsyncData(context, outData, packetSize);
292         } else {
293             LOGD("[DataSync][GetData] get data from token");
294             // if there is data to send, read out data, and update local watermark, send data
295             errCode = GetNextUnsyncData(context, outData, packetSize);
296         }
297     }
298     if (errCode == -E_UNFINISHED) {
299         LOGD("[DataSync][GetData] not finished.");
300     }
301     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
303         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
304     }
305     return errCode;
306 }
307 
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)308 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
309 {
310     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
311     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
312         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
313     bool needCompressOnSync = false;
314     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
315     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
316     int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
317     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
318         context->SetTaskErrCode(errCode);
319         return errCode;
320     }
321 
322     int innerCode = InterceptData(syncOutData);
323     if (innerCode != E_OK) {
324         context->SetTaskErrCode(innerCode);
325         return innerCode;
326     }
327 
328     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
329     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
330         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
331             { remoteAlgo, version });
332         if (compressCode != E_OK) {
333             return compressCode;
334         }
335     }
336     return errCode;
337 }
338 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)339 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
340     size_t packetSize)
341 {
342     SyncTimeRange waterRange;
343     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
344     WaterMark startMark = 0;
345     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
346     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
347     if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
348         return E_OK;
349     }
350     waterRange.beginTime = startMark;
351     if (curType == SyncType::QUERY_SYNC_TYPE) {
352         WaterMark deletedStartMark = 0;
353         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
354         Timestamp lastQueryTimestamp = 0;
355         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
356             lastQueryTimestamp);
357         if (errCode != E_OK) {
358             return errCode;
359         }
360         waterRange.deleteBeginTime = deletedStartMark;
361         waterRange.lastQueryTime = lastQueryTimestamp;
362     }
363     return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
364 }
365 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)366 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
367     DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
368 {
369     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
370     ContinueToken token = nullptr;
371     context->GetContinueToken(token);
372     if (token != nullptr) {
373         storage_->ReleaseContinueToken(token);
374     }
375     int errCode;
376     if (curType != SyncType::QUERY_SYNC_TYPE) {
377         errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
378             syncDataSizeInfo);
379     } else {
380         QuerySyncObject queryObj = context->GetQuery();
381         errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
382     }
383     context->SetContinueToken(token);
384     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
385         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
386     }
387     return errCode;
388 }
389 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)390 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
391     size_t packetSize)
392 {
393     ContinueToken token;
394     context->GetContinueToken(token);
395     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
396     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
397     context->SetContinueToken(token);
398     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
399         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
400     }
401     return errCode;
402 }
403 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)404 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
405     SyncType curType, const QuerySyncObject &query)
406 {
407     if (inData.empty()) {
408         return E_OK;
409     }
410     SingleVerDataSyncUtils::RecordClientId(*context, *storage_, metadata_);
411     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
412     if (performance != nullptr) {
413         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
414     }
415     const auto localDeviceName = GetLocalDeviceName();
416     const std::string localHashName = DBCommon::TransferHashString(localDeviceName);
417     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
418     std::vector<SendDataItem> copyData = inData;
419     int errCode = storage_->InterceptData(copyData, GetDeviceId(), localDeviceName, false);
420     if (errCode != E_OK) {
421         LOGE("[DataSync][SaveData] intercept data failed, errCode=%d", errCode);
422         return errCode;
423     }
424     // query only support prefix key and don't have query in packet in 104 version
425     errCode = storage_->PutSyncDataWithQuery(query, copyData, context->GetDeviceId());
426     if (performance != nullptr) {
427         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428     }
429     if (errCode != E_OK) {
430         LOGE("[DataSync][SaveData] save sync data failed, errCode=%d", errCode);
431     }
432     return errCode;
433 }
434 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437     mode_ = inMode;
438     maxSequenceIdHasSent_ = 0;
439     isAllDataHasSent_ = false;
440     context->ReSetSequenceId();
441     reSendMap_.clear();
442     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443         windowSize_ = LOW_VERSION_WINDOW_SIZE;
444     } else {
445         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446     }
447     int mode = SyncOperation::TransferSyncMode(inMode);
448     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449         sessionId_ = context->GetRequestSessionId();
450     } else {
451         sessionId_ = context->GetResponseSessionId();
452     }
453 }
454 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458     WaterMark localMark = 0;
459     WaterMark deleteMark = 0;
460     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461     GetLocalDeleteSyncWaterMark(context, deleteMark);
462     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468     WaterMark localMark = 0;
469     int errCode = E_OK;
470     const std::string &deviceId = context->GetDeviceId();
471     std::string queryId = context->GetQuerySyncId();
472     if (syncType != SyncType::QUERY_SYNC_TYPE) {
473         if (isCheckBeforUpdate) {
474             GetLocalWaterMark(syncType, queryId, context, localMark);
475             if (localMark >= dataTimeRange.endTime) {
476                 return E_OK;
477             }
478         }
479         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
480     } else {
481         bool isNeedUpdateMark = true;
482         bool isNeedUpdateDeleteMark = true;
483         if (isCheckBeforUpdate) {
484             WaterMark deleteDataWaterMark = 0;
485             GetLocalWaterMark(syncType, queryId, context, localMark);
486             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487             if (localMark >= dataTimeRange.endTime) {
488                 isNeedUpdateMark = false;
489             }
490             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491                 isNeedUpdateDeleteMark = false;
492             }
493         }
494         if (isNeedUpdateMark) {
495             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
497             if (errCode != E_OK) {
498                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
499                 return errCode;
500             }
501         }
502         if (isNeedUpdateDeleteMark) {
503             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
504                 dataTimeRange.deleteEndTime);
505             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
506         }
507     }
508     if (errCode != E_OK) {
509         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
510     }
511     return errCode;
512 }
513 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const514 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
515     const DeviceID &deviceId, WaterMark &waterMark) const
516 {
517     if (syncType != SyncType::QUERY_SYNC_TYPE) {
518         metadata_->GetPeerWaterMark(deviceId, waterMark);
519         return;
520     }
521     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
522 }
523 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)524 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
525 {
526     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
527 }
528 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const529 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
530     WaterMark &waterMark) const
531 {
532     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
533 }
534 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const535 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
536     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
537 {
538     if (syncType != SyncType::QUERY_SYNC_TYPE) {
539         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
540         return;
541     }
542     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
543         waterMark, context->IsAutoLiftWaterMark());
544 }
545 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)546 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
547     WaterMark maxSendDataTime)
548 {
549     bool isNeedClearRemoteData = false;
550     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
551     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
552         uint64_t clearDeviceDataMark = 0;
553         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
554         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
555     } else {
556         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
557         if (packet == nullptr) {
558             LOGE("[RemoveDeviceDataHandle] get packet object failed");
559             return -E_INVALID_ARGS;
560         }
561         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
562         WaterMark packetLocalMark = packet->GetLocalWaterMark();
563         WaterMark peerMark = 0;
564         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
565         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
566     }
567     if (!isNeedClearRemoteData) {
568         return E_OK;
569     }
570     int errCode = E_OK;
571     if (context->IsNeedClearRemoteStaleData()) {
572         // need to clear remote device history data
573         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
574         if (errCode != E_OK) {
575             (void)SendDataAck(context, message, errCode, maxSendDataTime);
576             return errCode;
577         }
578         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
579             // avoid repeat clear in ack
580             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
581         }
582     }
583     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
584         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
585         if (errCode != E_OK) {
586             (void)SendDataAck(context, message, errCode, maxSendDataTime);
587             return errCode;
588         }
589     }
590     return E_OK;
591 }
592 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)593 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
594     const std::vector<uint64_t> &reserved)
595 {
596     bool isNeedClearRemoteData = false;
597     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
598     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
599     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
600         uint64_t clearDeviceDataMark = 0;
601         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
602         isNeedClearRemoteData = (clearDeviceDataMark != 0);
603     } else if (reserved.empty()) {
604         WaterMark localMark = 0;
605         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
606         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
607     } else {
608         WaterMark peerMark = 0;
609         GetPeerWaterMark(curType, context->GetQuerySyncId(),
610             context->GetDeviceId(), peerMark);
611         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
612     }
613     if (!isNeedClearRemoteData) {
614         return E_OK;
615     }
616     // need to clear remote historydata
617     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
618         label_.c_str(), STR_MASK(GetDeviceId()));
619     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
620     if (errCode != E_OK) {
621         return errCode;
622     }
623     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
624         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
625     }
626     return errCode;
627 }
628 
SetSessionEndTimestamp(Timestamp end)629 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
630 {
631     sessionEndTimestamp_ = end;
632 }
633 
GetSessionEndTimestamp() const634 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
635 {
636     return sessionEndTimestamp_;
637 }
638 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)639 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
640 {
641     ReSendInfo reSendInfo;
642     reSendInfo.start = dataTimeRange.beginTime;
643     reSendInfo.end = dataTimeRange.endTime;
644     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
645     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
646     reSendInfo.packetId = context->GetPacketId();
647     maxSequenceIdHasSent_++;
648     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
649     windowSize_--;
650     ContinueToken token;
651     context->GetContinueToken(token);
652     if (token == nullptr) {
653         isAllDataHasSent_ = true;
654     }
655     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
656         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
657         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
658         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
659 }
660 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)661 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
662     SyncEntry &syncData, int sendCode, int mode)
663 {
664     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
665     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
666     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
667     WaterMark localMark = 0;
668     WaterMark peerMark = 0;
669     WaterMark deleteMark = 0;
670     bool needCompressOnSync = false;
671     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
672     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
673     std::string id = context->GetQuerySyncId();
674     GetLocalWaterMark(curType, id, context, localMark);
675     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
676     GetLocalDeleteSyncWaterMark(context, deleteMark);
677     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
678         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
679         packet->SetLastSequence();
680     }
681     int tmpMode = mode;
682     if (mode == SyncModeType::RESPONSE_PULL) {
683         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
684     }
685     packet->SetData(syncData.entries);
686     packet->SetCompressData(syncData.compressedEntries);
687     packet->SetBasicInfo(sendCode, version, tmpMode);
688     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
689     packet->SetWaterMark(localMark, peerMark, deleteMark);
690     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
691         packet->SetEndWaterMark(context->GetEndMark());
692         packet->SetSessionId(context->GetRequestSessionId());
693     }
694     packet->SetQuery(context->GetQuery());
695     packet->SetQueryId(context->GetQuerySyncId());
696     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
697     // empty compress data should not mark compress
698     if (!syncData.compressedEntries.empty() && needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
699         packet->SetCompressDataMark();
700         packet->SetCompressAlgo(curAlgo);
701     }
702     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
703     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
704         context->GetQuery().HasOrderBy())) {
705         packet->SetUpdateWaterMark();
706     }
707     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
708         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
709         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
710 }
711 
RequestStart(SingleVerSyncTaskContext * context,int mode)712 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
713 {
714     int errCode = QuerySyncCheck(context);
715     if (errCode != E_OK) {
716         LOGE("[DataSync][PushStart] check query failed, errCode=%d", errCode);
717         return errCode;
718     }
719     errCode = RemoveDeviceDataIfNeed(context);
720     if (errCode != E_OK) {
721         context->SetTaskErrCode(errCode);
722         return errCode;
723     }
724     SyncEntry syncData;
725     // get data
726     errCode = GetMatchData(context, syncData);
727     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
728 
729     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
730         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
731         return errCode;
732     }
733 
734     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
735     if (packet == nullptr) {
736         LOGE("[DataSync][PushStart] new DataRequestPacket error");
737         return -E_OUT_OF_MEMORY;
738     }
739     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
740     UpdateWaterMark isUpdateWaterMark;
741     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
742     if (errCode == E_OK) {
743         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
744     }
745     FillDataRequestPacket(packet, context, syncData, errCode, mode);
746     errCode = SendDataPacket(curType, packet, context);
747     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
748     if (performance != nullptr) {
749         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
750     }
751     if (errCode == E_OK || errCode == -E_TIMEOUT) {
752         UpdateSendInfo(dataTime, context);
753     }
754     if (errCode == E_OK) {
755         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
756             context->GetQuery().HasOrderBy())) {
757             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
758             return E_OK;
759         }
760         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
761         SaveLocalWaterMark(curType, context, tmpDataTime);
762     }
763     return errCode;
764 }
765 
PushStart(SingleVerSyncTaskContext * context)766 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
767 {
768     if (context == nullptr) {
769         return -E_INVALID_ARGS;
770     }
771     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
772     return RequestStart(context,
773         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
774 }
775 
PushPullStart(SingleVerSyncTaskContext * context)776 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
777 {
778     if (context == nullptr) {
779         return -E_INVALID_ARGS;
780     }
781     return RequestStart(context, context->GetMode());
782 }
783 
PullRequestStart(SingleVerSyncTaskContext * context)784 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
785 {
786     if (context == nullptr) {
787         return -E_INVALID_ARGS;
788     }
789     int errCode = QuerySyncCheck(context);
790     if (errCode != E_OK) {
791         return errCode;
792     }
793     errCode = RemoveDeviceDataIfNeed(context);
794     if (errCode != E_OK) {
795         context->SetTaskErrCode(errCode);
796         return errCode;
797     }
798     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
799     if (packet == nullptr) {
800         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
801         return -E_OUT_OF_MEMORY;
802     }
803     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
804     WaterMark peerMark = 0;
805     WaterMark localMark = 0;
806     WaterMark deleteMark = 0;
807     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
808         context->GetDeviceId(), peerMark);
809     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
810     GetLocalDeleteSyncWaterMark(context, deleteMark);
811     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
812     WaterMark endMark = context->GetEndMark();
813     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
814     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
815     packet->SetBasicInfo(E_OK, version, context->GetMode());
816     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
817     packet->SetWaterMark(localMark, peerMark, deleteMark);
818     packet->SetEndWaterMark(endMark);
819     packet->SetSessionId(context->GetRequestSessionId());
820     packet->SetQuery(context->GetQuery());
821     packet->SetQueryId(context->GetQuerySyncId());
822     packet->SetLastSequence();
823     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
824     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
825     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
826         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark,
827         label_.c_str(), STR_MASK(GetDeviceId()));
828     UpdateSendInfo(dataTime, context);
829     return SendDataPacket(syncType, packet, context);
830 }
831 
PullResponseStart(SingleVerSyncTaskContext * context)832 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
833 {
834     if (context == nullptr) {
835         return -E_INVALID_ARGS;
836     }
837     SyncEntry syncData;
838     // get data
839     int errCode = GetMatchData(context, syncData);
840     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
841         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
842             (void)SendPullResponseDataPkt(errCode, syncData, context);
843         }
844         return errCode;
845     }
846     // if send finished
847     int ackCode = E_OK;
848     ContinueToken token = nullptr;
849     context->GetContinueToken(token);
850     if (errCode == E_OK && token == nullptr) {
851         LOGD("[DataSync][PullResponse] send last frame end");
852         ackCode = SEND_FINISHED;
853     }
854     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
855     UpdateWaterMark isUpdateWaterMark;
856     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
857     if (errCode == E_OK) {
858         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
859     }
860     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
861     if (errCode == E_OK || errCode == -E_TIMEOUT) {
862         UpdateSendInfo(dataTime, context);
863     }
864     if (errCode == E_OK) {
865         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
866             context->GetQuery().HasOrderBy())) {
867             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
868             return E_OK;
869         }
870         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
871         SaveLocalWaterMark(curType, context, tmpDataTime);
872     }
873     return errCode;
874 }
875 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)876 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
877     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
878 {
879     WaterMark tmpPeerWatermark = dataTime.endTime;
880     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
881     if (isUpdateWaterMark.normalUpdateMark) {
882         tmpPeerWatermark++;
883     }
884     if (isUpdateWaterMark.deleteUpdateMark) {
885         tmpPeerDeletedWatermark++;
886     }
887     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
888 }
889 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)890 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
891     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
892 {
893     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
894         return;
895     }
896     int errCode = E_OK;
897     if (syncType != SyncType::QUERY_SYNC_TYPE) {
898         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
899     } else {
900         if (peerWatermark != 0) {
901             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
902             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
903             if (errCode != E_OK) {
904                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
905             }
906         }
907         if (peerDeletedWatermark != 0) {
908             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
909                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
910             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
911         }
912     }
913     if (errCode != E_OK) {
914         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
915     }
916 }
917 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)918 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
919 {
920     uint16_t remoteCommunicatorVersion = 0;
921     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
922         -E_NOT_FOUND) {
923         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
924         return -E_VERSION_NOT_SUPPORT;
925     }
926     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
927     if (remoteCommunicatorVersion == 0) {
928         LOGI("[DataSync] set remote version 0");
929         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
930         return E_OK;
931     } else {
932         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
933         if (isControlMsg) {
934             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
935         } else {
936             (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
937         }
938         return -E_NEED_ABILITY_SYNC;
939     }
940 }
941 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)942 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
943 {
944     if (context == nullptr || message == nullptr) {
945         return -E_INVALID_ARGS;
946     }
947     auto *packet = message->GetObject<DataRequestPacket>();
948     if (packet == nullptr) {
949         return -E_INVALID_ARGS;
950     }
951     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
952         return DoAbilitySyncIfNeed(context, message);
953     }
954     int32_t sendCode = packet->GetSendCode();
955     if (sendCode == -E_VERSION_NOT_SUPPORT) {
956         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
957         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
958         return -E_WAIT_NEXT_MESSAGE;
959     }
960     // only deal with pull response packet errCode
961     if (sendCode != E_OK && sendCode != SEND_FINISHED && sendCode != -E_UNFINISHED &&
962         message->GetSessionId() == context->GetRequestSessionId()) {
963         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
964         return sendCode;
965     }
966     int errCode = RunPermissionCheck(context, message, packet);
967     if (errCode != E_OK) {
968         return errCode;
969     }
970     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
971         errCode = CheckSchemaStrategy(context, message);
972     }
973     if (errCode == E_OK) {
974         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
975     }
976     if (errCode != E_OK) {
977         (void)SendDataAck(context, message, errCode, 0);
978         return errCode;
979     }
980     errCode = SingleVerDataSyncUtils::SchemaVersionMatchCheck(*context, *packet, metadata_);
981     if (errCode != E_OK) {
982         (void)SendDataAck(context, message, errCode, 0);
983     }
984     return errCode;
985 }
986 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)987 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
988     WaterMark &pullEndWatermark)
989 {
990     int errCode = DataRequestRecvPre(context, message);
991     if (errCode != E_OK) {
992         return errCode;
993     }
994     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
995     const std::vector<SendDataItem> &data = packet->GetData();
996     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
997     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
998         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
999         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
1000     context->SetReceiveWaterMarkErr(false);
1001     UpdateWaterMark isUpdateWaterMark;
1002     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
1003     errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
1004     if (errCode != E_OK) {
1005         return errCode;
1006     }
1007     Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1008     if (WaterMarkErrHandle(curType, context, message)) {
1009         return E_OK;
1010     }
1011     GetPullEndWatermark(context, packet, pullEndWatermark);
1012     // save data first
1013     errCode = SaveData(context, data, curType, packet->GetQuery());
1014     if (errCode != E_OK) {
1015         (void)SendDataAck(context, message, errCode, dataTime.endTime);
1016         return errCode;
1017     }
1018     SingleVerDataSyncUtils::UpdateSyncProcess(context, packet, data.size());
1019 
1020     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1021         pullEndWatermark = 0;
1022         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1023     } else {
1024         // if data is empty, we don't know the max timestap of this packet.
1025         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1026     }
1027     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1028         context->GetRequestSessionId());
1029     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1030         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1031     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1032         UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1033     }
1034     if (errCode != E_OK) {
1035         return errCode;
1036     }
1037     if (packet->GetSendCode() == SEND_FINISHED) {
1038         return -E_RECV_FINISHED;
1039     }
1040     return errCode;
1041 }
1042 
SendDataPacket(SyncType syncType,DataRequestPacket * packet,SingleVerSyncTaskContext * context)1043 int SingleVerDataSync::SendDataPacket(SyncType syncType, DataRequestPacket *packet,
1044     SingleVerSyncTaskContext *context)
1045 {
1046     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1047     if (message == nullptr) {
1048         LOGE("[DataSync][SendDataPacket] new message error");
1049         delete packet;
1050         packet = nullptr;
1051         return -E_OUT_OF_MEMORY;
1052     }
1053     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1054     int errCode = message->SetExternalObject(packet);
1055     if (errCode != E_OK) {
1056         delete packet;
1057         packet = nullptr;
1058         delete message;
1059         message = nullptr;
1060         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1061         return errCode;
1062     }
1063     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1064         context->GetSequenceId(), context->GetRequestSessionId());
1065     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1066     if (performance != nullptr) {
1067         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1068     }
1069     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1070         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1071     };
1072     errCode = Send(context, message, handler, packetLen);
1073     if (errCode != E_OK) {
1074         delete message;
1075         message = nullptr;
1076     }
1077 
1078     return errCode;
1079 }
1080 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1081 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1082     SingleVerSyncTaskContext *context)
1083 {
1084     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1085     if (packet == nullptr) {
1086         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1087         return -E_OUT_OF_MEMORY;
1088     }
1089     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1090     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1091 
1092     if ((ackCode == E_OK || ackCode == SEND_FINISHED) &&
1093         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1094         uint32_t total = 0u;
1095         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1096         LOGD("[SendPullResponseDataPkt] GetUnsyncTotal total=%u", total);
1097         packet->SetTotalDataCount(total);
1098     }
1099     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1100     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1101     if (message == nullptr) {
1102         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1103         delete packet;
1104         packet = nullptr;
1105         return -E_OUT_OF_MEMORY;
1106     }
1107     int errCode = message->SetExternalObject(packet);
1108     if (errCode != E_OK) {
1109         delete packet;
1110         packet = nullptr;
1111         delete message;
1112         message = nullptr;
1113         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1114         return errCode;
1115     }
1116     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1117         context->GetSequenceId(), context->GetResponseSessionId());
1118     SendResetWatchDogPacket(context, packetLen);
1119     errCode = Send(context, message, nullptr, packetLen);
1120     if (errCode != E_OK) {
1121         delete message;
1122         message = nullptr;
1123     }
1124     return errCode;
1125 }
1126 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1127 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1128 {
1129     (void)SendDataAck(context, message, E_OK, 0);
1130 }
1131 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1132 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1133     WaterMark maxSendDataTime)
1134 {
1135     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1136     if (packet == nullptr) {
1137         return -E_INVALID_ARGS;
1138     }
1139     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1140     if (ackMessage == nullptr) {
1141         LOGE("[DataSync][SendDataAck] new message error");
1142         return -E_OUT_OF_MEMORY;
1143     }
1144     DataAckPacket ack;
1145     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1146     int errCode = ackMessage->SetCopiedObject(ack);
1147     if (errCode != E_OK) {
1148         delete ackMessage;
1149         ackMessage = nullptr;
1150         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1151         return errCode;
1152     }
1153     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1154         message->GetSequenceId(), message->GetSessionId());
1155 
1156     errCode = Send(context, ackMessage, nullptr, 0);
1157     if (errCode != E_OK) {
1158         delete ackMessage;
1159         ackMessage = nullptr;
1160     }
1161     return errCode;
1162 }
1163 
AckPacketIdCheck(const Message * message)1164 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1165 {
1166     if (message == nullptr) {
1167         LOGE("[DataSync] AckRecv message nullptr");
1168         return false;
1169     }
1170     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1171         return true;
1172     }
1173     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1174     if (packet == nullptr) {
1175         return false;
1176     }
1177     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1178     std::lock_guard<std::mutex> lock(lock_);
1179     uint32_t sequenceId = message->GetSequenceId();
1180     if (reSendMap_.count(sequenceId) != 0) {
1181         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1182         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1183             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1184                 originalPacketId);
1185             return false;
1186         }
1187     }
1188     return true;
1189 }
1190 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1191 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1192 {
1193     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1194     if (errCode != E_OK) {
1195         return errCode;
1196     }
1197     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1198     if (packet == nullptr) {
1199         return -E_INVALID_ARGS;
1200     }
1201     int32_t recvCode = packet->GetRecvCode();
1202     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1203         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1204     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1205         LOGE("[DataSync][AckRecv] Version mismatch");
1206         return -E_VERSION_NOT_SUPPORT;
1207     }
1208 
1209     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT || recvCode == -E_NEED_TIME_SYNC) {
1210         // we should ReleaseContinueToken, avoid crash
1211         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1212             STR_MASK(GetDeviceId()));
1213         context->ReleaseContinueToken();
1214         return recvCode;
1215     }
1216     uint64_t data = packet->GetData();
1217     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1218         return DealWaterMarkException(context, data, packet->GetReserved());
1219     }
1220 
1221     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1222         // data only use low 32bit
1223         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1224         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1225             STR_MASK(GetDeviceId()));
1226     }
1227 
1228     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1229         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1230             label_.c_str(), STR_MASK(GetDeviceId()));
1231         return recvCode;
1232     }
1233 
1234     // Judge if send finished
1235     ContinueToken token;
1236     context->GetContinueToken(token);
1237     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1238         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1239         return -E_NO_DATA_SEND;
1240     }
1241 
1242     // send next data
1243     return -E_SEND_DATA;
1244 }
1245 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1246 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1247     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1248 {
1249     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1250         LOGE("[SingleVerDataSync] messageId not available.");
1251         return;
1252     }
1253     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1254     if (ackMessage == nullptr) {
1255         LOGE("[DataSync][SaveDataNotify] new message failed");
1256         return;
1257     }
1258 
1259     DataAckPacket ack;
1260     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1261     ack.SetVersion(pktVersion);
1262     int errCode = ackMessage->SetCopiedObject(ack);
1263     if (errCode != E_OK) {
1264         delete ackMessage;
1265         ackMessage = nullptr;
1266         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1267         return;
1268     }
1269     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1270 
1271     errCode = Send(context, ackMessage, nullptr, 0);
1272     if (errCode != E_OK) {
1273         delete ackMessage;
1274         ackMessage = nullptr;
1275     }
1276     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1277         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1278 }
1279 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1280 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1281     WaterMark &pullEndWatermark) const
1282 {
1283     if (packet == nullptr) {
1284         return;
1285     }
1286     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1287     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1288         WaterMark endMark = packet->GetEndWaterMark();
1289         TimeOffset offset;
1290         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1291         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1292         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1293             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1294     }
1295 }
1296 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1297 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1298     const std::vector<uint64_t> &reserved)
1299 {
1300     WaterMark deletedWaterMark = 0;
1301     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1302     if (curType == SyncType::QUERY_SYNC_TYPE) {
1303         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1304             LOGE("[DataSync] get packet reserve size failed");
1305             return -E_INVALID_ARGS;
1306         }
1307         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1308     }
1309     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1310         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1311     int errCode = SaveLocalWaterMark(curType, context,
1312         {0, 0, ackWaterMark, deletedWaterMark});
1313     if (errCode != E_OK) {
1314         return errCode;
1315     }
1316     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1317     context->IncNegotiationCount();
1318     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1319     if (!context->IsNeedClearRemoteStaleData()) {
1320         return -E_RE_SEND_DATA;
1321     }
1322     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1323     if (errCode != E_OK) {
1324         return errCode;
1325     }
1326     return -E_RE_SEND_DATA;
1327 }
1328 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1329 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1330     const DataRequestPacket *packet)
1331 {
1332     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1333     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1334     if (errCode != E_OK) {
1335         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1336             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1337         }
1338         return -E_NOT_PERMIT;
1339     }
1340     const std::vector<SendDataItem> &data = packet->GetData();
1341     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1342     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1343     auto securityOption = packet->GetSecurityOption();
1344     if (context->GetRemoteSeccurityOption().securityLabel == SecurityLabel::NOT_SET &&
1345         securityOption.securityLabel != SecurityLabel::NOT_SET) {
1346         context->SetRemoteSeccurityOption(packet->GetSecurityOption());
1347     }
1348     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1349         !context->GetReceivcPermitCheck()) {
1350         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1351         if (permitReceive) {
1352             context->SetReceivcPermitCheck(true);
1353         } else {
1354             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1355             return -E_SECURITY_OPTION_CHECK_ERROR;
1356         }
1357     }
1358     return errCode;
1359 }
1360 
1361 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1362 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1363 {
1364     // When mtu less than 30k, we send data with bluetooth
1365     // In order not to block the bluetooth channel, we don't send notify packet here
1366     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1367         return;
1368     }
1369     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1370 
1371     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1372     if (ackMessage == nullptr) {
1373         LOGE("[DataSync][ResetWatchDog] new message failed");
1374         return;
1375     }
1376 
1377     DataAckPacket ack;
1378     ack.SetData(data);
1379     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1380     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1381     int errCode = ackMessage->SetCopiedObject(ack);
1382     if (errCode != E_OK) {
1383         delete ackMessage;
1384         ackMessage = nullptr;
1385         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1386         return;
1387     }
1388     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1389         context->GetSequenceId(), context->GetResponseSessionId());
1390 
1391     errCode = Send(context, ackMessage, nullptr, 0);
1392     if (errCode != E_OK) {
1393         delete ackMessage;
1394         ackMessage = nullptr;
1395         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1396             STR_MASK(GetDeviceId()));
1397     } else {
1398         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1399             STR_MASK(GetDeviceId()));
1400     }
1401 }
1402 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1403 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1404 {
1405     if (context == nullptr) {
1406         return -E_INVALID_ARGS;
1407     }
1408     int errCode = RemoveDeviceDataIfNeed(context);
1409     if (errCode != E_OK) {
1410         context->SetTaskErrCode(errCode);
1411         return errCode;
1412     }
1413     SyncEntry syncData;
1414     errCode = GetReSendData(syncData, context, reSendInfo);
1415     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1416         return errCode;
1417     }
1418     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1419     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1420     if (packet == nullptr) {
1421         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1422         return -E_OUT_OF_MEMORY;
1423     }
1424     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1425     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1426     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() || context->GetQuery().HasOrderBy())) {
1427         LOGI("[DataSync][ReSend] query contain limit/offset/orderby, no need to update watermark.");
1428         return errCode;
1429     }
1430     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1431         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1432         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1433         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1434             dataTime.deleteEndTime += 1;
1435         }
1436         if (reSendInfo.end > reSendInfo.start) {
1437             dataTime.endTime += 1;
1438         }
1439         errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1440         if (errCode != E_OK) {
1441             LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1442         }
1443     }
1444     return errCode;
1445 }
1446 
SendReSendPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1447 int SingleVerDataSync::SendReSendPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1448     uint32_t sessionId, uint32_t sequenceId)
1449 {
1450     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1451     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1452     if (message == nullptr) {
1453         LOGE("[DataSync][SendDataPacket] new message error");
1454         delete packet;
1455         packet = nullptr;
1456         return -E_OUT_OF_MEMORY;
1457     }
1458     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1459     int errCode = message->SetExternalObject(packet);
1460     if (errCode != E_OK) {
1461         delete packet;
1462         packet = nullptr;
1463         delete message;
1464         message = nullptr;
1465         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1466         return errCode;
1467     }
1468     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1469     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1470         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1471     };
1472     errCode = Send(context, message, handler, packetLen);
1473     if (errCode != E_OK) {
1474         delete message;
1475         message = nullptr;
1476     }
1477     return errCode;
1478 }
1479 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1480 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1481 {
1482     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1483     int mode = SyncOperation::TransferSyncMode(inMode);
1484     // for pull mode it just need to get data, no need to send data.
1485     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1486         return E_OK;
1487     }
1488     if (context->GetSendPermitCheck()) {
1489         return E_OK;
1490     }
1491     bool isPermitSync = true;
1492     std::string deviceId = context->GetDeviceId();
1493     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1494     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1495         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1496     }
1497     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1498         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1499     if (isPermitSync) {
1500         context->SetSendPermitCheck(true);
1501         return E_OK;
1502     }
1503     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1504         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1505         return -E_SECURITY_OPTION_CHECK_ERROR;
1506     }
1507     if (mode == SyncModeType::RESPONSE_PULL) {
1508         SyncEntry syncData;
1509         (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1510         return -E_SECURITY_OPTION_CHECK_ERROR;
1511     }
1512     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1513         return -E_SECURITY_OPTION_CHECK_ERROR;
1514     }
1515     return E_OK;
1516 }
1517 
GetLabel() const1518 std::string SingleVerDataSync::GetLabel() const
1519 {
1520     return label_;
1521 }
1522 
GetDeviceId() const1523 std::string SingleVerDataSync::GetDeviceId() const
1524 {
1525     return deviceId_;
1526 }
1527 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1528 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1529 {
1530     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1531     if (packet == nullptr) {
1532         LOGE("[WaterMarkErrHandle] get packet object failed");
1533         return -E_INVALID_ARGS;
1534     }
1535     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1536     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1537     WaterMark peerMark = 0;
1538     WaterMark deletedMark = 0;
1539     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1540     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1541         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1542     }
1543     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1544         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1545         context->SetReceiveWaterMarkErr(true);
1546         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1547         return true;
1548     }
1549     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1550         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1551             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1552             peerMark);
1553         context->SetReceiveWaterMarkErr(true);
1554         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1555         return true;
1556     }
1557     return false;
1558 }
1559 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1560 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1561 {
1562     auto *packet = message->GetObject<DataRequestPacket>();
1563     if (packet == nullptr) {
1564         return -E_INVALID_ARGS;
1565     }
1566     if (metadata_->IsAbilitySyncFinish(deviceId_)) {
1567         return E_OK;
1568     }
1569     auto query = packet->GetQuery();
1570     std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1571     if (!schemaSyncStatus.second) {
1572         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1573         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1574         return -E_NEED_ABILITY_SYNC;
1575     }
1576     if (!schemaSyncStatus.first) {
1577         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1578         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1579         return -E_SCHEMA_MISMATCH;
1580     }
1581     return E_OK;
1582 }
1583 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1584 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1585 {
1586     int mode = SyncOperation::TransferSyncMode(inMode);
1587     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1588         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1589         return;
1590     }
1591 
1592     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1593         storage_->NotifyRemotePushFinished(deviceId_);
1594     }
1595 }
1596 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1597 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1598     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1599 {
1600     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1601     WaterMark localMark = 0;
1602     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1603     ackPacket.SetRecvCode(recvCode);
1604     // send ack packet
1605     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1606         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1607     } else if (recvCode != WATER_MARK_INVALID) {
1608         WaterMark mark = 0;
1609         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1610         ackPacket.SetData(mark);
1611     }
1612     std::vector<uint64_t> reserved {localMark};
1613     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1614     uint64_t packetId = 0;
1615     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1616         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1617     }
1618     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1619         reserved.push_back(packetId);
1620     }
1621     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1622     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1623         WaterMark deletedPeerMark;
1624         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1625         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1626     }
1627     ackPacket.SetReserved(reserved);
1628     ackPacket.SetVersion(version);
1629 }
1630 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1631 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1632     DataSyncReSendInfo reSendInfo)
1633 {
1634     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1635     if (mode == SyncModeType::PULL) {
1636         return E_OK;
1637     }
1638     ContinueToken token = nullptr;
1639     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1640     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1641         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1642     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1643     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1644     int errCode;
1645     if (curType != SyncType::QUERY_SYNC_TYPE) {
1646         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1647             reSendDataSizeInfo);
1648     } else {
1649         QuerySyncObject queryObj = context->GetQuery();
1650         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1651             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1652     }
1653     if (token != nullptr) {
1654         storage_->ReleaseContinueToken(token);
1655     }
1656     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1657         context->SetTaskErrCode(errCode);
1658         return errCode;
1659     }
1660     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1661         return errCode;
1662     }
1663     SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(
1664         DBCommon::TransferHashString(GetLocalDeviceName()), syncData.entries);
1665 
1666     int innerCode = InterceptData(syncData);
1667     if (innerCode != E_OK) {
1668         context->SetTaskErrCode(innerCode);
1669         return innerCode;
1670     }
1671 
1672     bool needCompressOnSync = false;
1673     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1674     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1675     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1676     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1677         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1678             { remoteAlgo, version });
1679         if (compressCode != E_OK) {
1680             return compressCode;
1681         }
1682     }
1683     return errCode;
1684 }
1685 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1686 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1687 {
1688     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1689         return E_OK;
1690     }
1691     uint64_t clearRemoteDataMark = 0;
1692     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1693     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1694     if (clearRemoteDataMark == 0) {
1695         return E_OK;
1696     }
1697     int errCode = E_OK;
1698     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1699         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1700         if (errCode != E_OK) {
1701             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1702             return errCode;
1703         }
1704     }
1705     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1706         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1707         if (errCode != E_OK) {
1708             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1709             return errCode;
1710         }
1711     }
1712     return E_OK;
1713 }
1714 
UpdateMtuSize()1715 void SingleVerDataSync::UpdateMtuSize()
1716 {
1717     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1718 }
1719 
FillRequestReSendPacket(SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1720 void SingleVerDataSync::FillRequestReSendPacket(SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1721     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1722 {
1723     SingleVerDataSyncUtils::SetDataRequestCommonInfo(*context, *storage_, *packet, metadata_);
1724     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1725     WaterMark peerMark = 0;
1726     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1727         peerMark);
1728     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1729     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1730     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1731     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1732     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1733         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1734         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1735         packet->SetLastSequence();
1736     }
1737     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1738         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1739         sendCode = SEND_FINISHED;
1740     }
1741     packet->SetData(syncData.entries);
1742     packet->SetCompressData(syncData.compressedEntries);
1743     packet->SetBasicInfo(sendCode, version, reSendMode);
1744     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1745     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1746     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH || context->IsQuerySync()) {
1747         packet->SetEndWaterMark(context->GetEndMark());
1748         packet->SetQuery(context->GetQuery());
1749     }
1750     packet->SetQueryId(context->GetQuerySyncId());
1751     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1752         std::vector<uint64_t> reserved {reSendInfo.packetId};
1753         packet->SetReserved(reserved);
1754     }
1755     if (reSendMode == SyncModeType::PULL || reSendMode == SyncModeType::QUERY_PULL) {
1756         // resend pull packet dont set compress type
1757         return;
1758     }
1759     bool needCompressOnSync = false;
1760     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1761     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1762     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1763     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1764         packet->SetCompressDataMark();
1765         packet->SetCompressAlgo(curAlgo);
1766     }
1767     FillRequestReSendPacketV2(context, packet);
1768 }
1769 
FillRequestReSendPacketV2(SingleVerSyncTaskContext * context,DataRequestPacket * packet)1770 void SingleVerDataSync::FillRequestReSendPacketV2(SingleVerSyncTaskContext *context, DataRequestPacket *packet)
1771 {
1772     if (context->GetMode() == SyncModeType::RESPONSE_PULL &&
1773         SingleVerDataSyncUtils::IsSupportRequestTotal(packet->GetVersion())) {
1774         uint32_t total = 0u;
1775         (void)SingleVerDataSyncUtils::GetUnsyncTotal(context, storage_, total);
1776         packet->SetTotalDataCount(total);
1777     }
1778 }
1779 
GetDataSizeSpecInfo(size_t packetSize)1780 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1781 {
1782     bool needCompressOnSync = false;
1783     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1784     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1785     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1786         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1787     return {blockSize, packetSize};
1788 }
1789 
InterceptData(SyncEntry & syncEntry)1790 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1791 {
1792     if (storage_ == nullptr) {
1793         LOGE("Invalid DB. Can not intercept data.");
1794         return -E_INVALID_DB;
1795     }
1796 
1797     // GetLocalDeviceName get local device ID.
1798     // GetDeviceId get remote device ID.
1799     // If intercept data fail, entries will be released.
1800     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId(), true);
1801 }
1802 
ControlCmdStart(SingleVerSyncTaskContext * context)1803 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1804 {
1805     if (context == nullptr) {
1806         return -E_INVALID_ARGS;
1807     }
1808     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1809     if (subManager == nullptr) {
1810         return -E_INVALID_ARGS;
1811     }
1812     int errCode = ControlCmdStartCheck(context);
1813     if (errCode != E_OK) {
1814         return errCode;
1815     }
1816     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1817     if (packet == nullptr) {
1818         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1819         return -E_OUT_OF_MEMORY;
1820     }
1821     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1822         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1823         if (errCode != E_OK) {
1824             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1825             delete packet;
1826             packet = nullptr;
1827             return errCode;
1828         }
1829     }
1830     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1831     errCode = SendControlPacket(packet, context);
1832     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1833         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1834     }
1835     return errCode;
1836 }
1837 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1838 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1839 {
1840     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1841     if (packet == nullptr) {
1842         return -E_INVALID_ARGS;
1843     }
1844     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1845         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1846     int errCode = ControlCmdRequestRecvPre(context, message);
1847     if (errCode != E_OK) {
1848         return errCode;
1849     }
1850     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1851         errCode = SubscribeRequestRecv(context, message);
1852     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1853         errCode = UnsubscribeRequestRecv(context, message);
1854     }
1855     return errCode;
1856 }
1857 
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1858 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1859 {
1860     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1861     if (subManager == nullptr) {
1862         return -E_INVALID_ARGS;
1863     }
1864     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1865     if (errCode != E_OK) {
1866         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1867         return errCode;
1868     }
1869     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1870     if (packet == nullptr) {
1871         return -E_INVALID_ARGS;
1872     }
1873     int32_t recvCode = packet->GetRecvCode();
1874     uint32_t cmdType = packet->GetcontrolCmdType();
1875     if (recvCode != E_OK) {
1876         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1877             STR_MASK(GetDeviceId()), cmdType);
1878         // for unsubscribe no need to do something
1879         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1880         return recvCode;
1881     }
1882     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1883         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1884     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1885         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1886     }
1887     if (errCode != E_OK) {
1888         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1889         return errCode;
1890     }
1891     return -E_NO_DATA_SEND; // means control msg send finished
1892 }
1893 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1894 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1895 {
1896     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1897         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1898         LOGE("[ControlCmdStartCheck] not support controlCmd");
1899         return -E_INVALID_ARGS;
1900     }
1901     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1902         context->GetQuery().HasInKeys() &&
1903         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1904         return -E_NOT_SUPPORT;
1905     }
1906     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1907         return E_OK;
1908     }
1909     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1910     if (permitReceive) {
1911         context->SetReceivcPermitCheck(true);
1912     } else {
1913         return -E_SECURITY_OPTION_CHECK_ERROR;
1914     }
1915     return E_OK;
1916 }
1917 
SendControlPacket(ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1918 int SingleVerDataSync::SendControlPacket(ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1919 {
1920     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1921     if (message == nullptr) {
1922         LOGE("[DataSync][SendControlPacket] new message error");
1923         delete packet;
1924         packet = nullptr;
1925         return -E_OUT_OF_MEMORY;
1926     }
1927     uint32_t packetLen = packet->CalculateLen();
1928     int errCode = message->SetExternalObject(packet);
1929     if (errCode != E_OK) {
1930         delete packet;
1931         packet = nullptr;
1932         delete message;
1933         message = nullptr;
1934         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1935         return errCode;
1936     }
1937     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1938         context->GetSequenceId(), context->GetRequestSessionId());
1939     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
1940         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
1941     };
1942     errCode = Send(context, message, handler, packetLen);
1943     if (errCode != E_OK) {
1944         delete message;
1945         message = nullptr;
1946     }
1947     return errCode;
1948 }
1949 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1950 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1951     uint32_t controlCmdType, const CommErrHandler &handler)
1952 {
1953     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1954     if (ackMessage == nullptr) {
1955         LOGE("[DataSync][SendControlAck] new message error");
1956         return -E_OUT_OF_MEMORY;
1957     }
1958     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1959     ControlAckPacket ack;
1960     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1961     int errCode = ackMessage->SetCopiedObject(ack);
1962     if (errCode != E_OK) {
1963         delete ackMessage;
1964         ackMessage = nullptr;
1965         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1966         return errCode;
1967     }
1968     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1969         message->GetSequenceId(), message->GetSessionId());
1970     errCode = Send(context, ackMessage, handler, 0);
1971     if (errCode != E_OK) {
1972         delete ackMessage;
1973         ackMessage = nullptr;
1974     }
1975     return errCode;
1976 }
1977 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1978 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1979 {
1980     if (context == nullptr || message == nullptr) {
1981         return -E_INVALID_ARGS;
1982     }
1983     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1984     if (packet == nullptr) {
1985         return -E_INVALID_ARGS;
1986     }
1987     uint32_t controlCmdType = packet->GetcontrolCmdType();
1988     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1989         return DoAbilitySyncIfNeed(context, message, true);
1990     }
1991     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1992         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1993         return -E_WAIT_NEXT_MESSAGE;
1994     }
1995     return E_OK;
1996 }
1997 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1998 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1999     const Message *message)
2000 {
2001     uint32_t controlCmdType = packet->GetcontrolCmdType();
2002     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
2003         return E_OK;
2004     }
2005     QuerySyncObject syncQuery = packet->GetQuery();
2006     int errCode;
2007     if (!packet->IsAutoSubscribe()) {
2008         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
2009         if (errCode != E_OK) {
2010             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2011             SendControlAck(context, message, errCode, controlCmdType);
2012             return -E_WAIT_NEXT_MESSAGE;
2013         }
2014     }
2015     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
2016         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
2017     if (mode >= SyncModeType::INVALID_MODE) {
2018         LOGE("[SingleVerDataSync] invalid mode");
2019         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
2020         return -E_WAIT_NEXT_MESSAGE;
2021     }
2022     errCode = CheckPermitSendData(mode, context);
2023     if (errCode != E_OK) {
2024         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
2025         SendControlAck(context, message, errCode, controlCmdType);
2026     }
2027     return errCode;
2028 }
2029 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2030 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2031 {
2032     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2033     if (packet == nullptr) {
2034         return -E_INVALID_ARGS;
2035     }
2036     int errCode = SubscribeRequestRecvPre(context, packet, message);
2037     if (errCode != E_OK) {
2038         return errCode;
2039     }
2040     uint32_t controlCmdType = packet->GetcontrolCmdType();
2041     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2042     if (subscribeManager == nullptr) {
2043         LOGE("[SingleVerDataSync] subscribeManager check failed");
2044         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2045         return -E_INVALID_ARGS;
2046     }
2047     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
2048     if (errCode != E_OK) {
2049         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2050             STR_MASK(GetDeviceId()));
2051         SendControlAck(context, message, errCode, controlCmdType);
2052         return errCode;
2053     }
2054     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2055     if (errCode != E_OK) {
2056         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2057             STR_MASK(GetDeviceId()));
2058         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2059         SendControlAck(context, message, errCode, controlCmdType);
2060         return errCode;
2061     }
2062     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2063     if (errCode != E_OK) {
2064         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2065         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2066         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2067             STR_MASK(GetDeviceId()));
2068         return errCode;
2069     }
2070     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2071     DBInfo dbInfo;
2072     storage_->GetDBInfo(dbInfo);
2073     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2074     return errCode;
2075 }
2076 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2077 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2078 {
2079     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2080     if (packet == nullptr) {
2081         return -E_INVALID_ARGS;
2082     }
2083     uint32_t controlCmdType = packet->GetcontrolCmdType();
2084     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2085     if (subscribeManager == nullptr) {
2086         LOGE("[SingleVerDataSync] subscribeManager check failed");
2087         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2088         return -E_INVALID_ARGS;
2089     }
2090     int errCode;
2091     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2092     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2093         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2094         if (errCode != E_OK) {
2095             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2096                 STR_MASK(GetDeviceId()));
2097             SendControlAck(context, message, errCode, controlCmdType);
2098             return errCode;
2099         }
2100     }
2101     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2102     if (errCode != E_OK) {
2103         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2104             STR_MASK(GetDeviceId()));
2105         return errCode;
2106     }
2107     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2108     DBInfo dbInfo;
2109     storage_->GetDBInfo(dbInfo);
2110     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
2111     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2112     return errCode;
2113 }
2114 
PutDataMsg(Message * message)2115 void SingleVerDataSync::PutDataMsg(Message *message)
2116 {
2117     return msgSchedule_.PutMsg(message);
2118 }
2119 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2120 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2121     bool &isNeedContinue)
2122 {
2123     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2124 }
2125 
IsNeedReloadQueue()2126 bool SingleVerDataSync::IsNeedReloadQueue()
2127 {
2128     return msgSchedule_.IsNeedReloadQueue();
2129 }
2130 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2131 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2132 {
2133     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2134 }
2135 
ClearDataMsg()2136 void SingleVerDataSync::ClearDataMsg()
2137 {
2138     msgSchedule_.ClearMsg();
2139 }
2140 
QuerySyncCheck(SingleVerSyncTaskContext * context)2141 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2142 {
2143     if (context == nullptr) {
2144         return -E_INVALID_ARGS;
2145     }
2146     bool isCheckStatus = false;
2147     int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2148     if (errCode != E_OK) {
2149         return errCode;
2150     }
2151     if (!isCheckStatus) {
2152         context->SetTaskErrCode(-E_NOT_SUPPORT);
2153         return -E_NOT_SUPPORT;
2154     }
2155     return E_OK;
2156 }
2157 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2158 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2159     const std::shared_ptr<SubscribeManager> &subscribeManager)
2160 {
2161     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2162         storage_->RemoveSubscribe(queryId);
2163     }
2164 }
2165 } // namespace DistributedDB
2166