• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43     storage_ = nullptr;
44     communicateHandle_ = nullptr;
45     metadata_ = nullptr;
46 }
47 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49     std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52         return -E_INVALID_ARGS;
53     }
54     storage_ = static_cast<SyncGenericInterface *>(inStorage);
55     communicateHandle_ = inCommunicateHandle;
56     metadata_ = inMetadata;
57     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58     std::vector<uint8_t> label = inStorage->GetIdentifier();
59     label.resize(3); // only show 3 Bytes enough
60     label_ = DBCommon::VectorToHexString(label);
61     deviceId_ = deviceId;
62     msgSchedule_.Initialize(label_, deviceId_);
63     return E_OK;
64 }
65 
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68     std::lock_guard<std::mutex> lock(lock_);
69     if (sessionId_ != 0) { // auto sync timeout resend
70         return ReSendData(context);
71     }
72     ResetSyncStatus(mode, context);
73     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
74     int tmpMode = SyncOperation::TransferSyncMode(mode);
75     int errCode = E_OK;
76     if (tmpMode == SyncModeType::PUSH) {
77         errCode = PushStart(context);
78     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79         errCode = PushPullStart(context);
80     } else if (tmpMode == SyncModeType::PULL) {
81         errCode = PullRequestStart(context);
82     } else {
83         errCode = PullResponseStart(context);
84     }
85     if (context->IsSkipTimeoutError(errCode)) {
86         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
87         // just return to avoid higher pressure for send.
88         return E_OK;
89     }
90     if (errCode != E_OK) {
91         LOGE("[DataSync] SendStart errCode=%d", errCode);
92         return errCode;
93     }
94     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
95         LOGE("wait for recv finished for push and pull mode");
96         return -E_EKEYREVOKED;
97     }
98     return InnerSyncStart(context);
99 }
100 
InnerSyncStart(SingleVerSyncTaskContext * context)101 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
102 {
103     while (true) {
104         if (windowSize_ <= 0 || isAllDataHasSent_) {
105             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
106                 label_.c_str(), STR_MASK(deviceId_));
107             return E_OK;
108         }
109         int mode = SyncOperation::TransferSyncMode(mode_);
110         if (mode == SyncModeType::PULL) {
111             LOGE("[DataSync] unexpected error");
112             return -E_INVALID_ARGS;
113         }
114         int errCode;
115         context->IncSequenceId();
116         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
117             errCode = PushStart(context);
118         } else {
119             errCode = PullResponseStart(context);
120         }
121         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
122             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
123             isAllDataHasSent_ = true;
124             return -E_EKEYREVOKED;
125         }
126         if (context->IsSkipTimeoutError(errCode)) {
127             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
128             // just return to avoid higher pressure for send.
129             return E_OK;
130         }
131         if (errCode != E_OK) {
132             LOGE("[DataSync] InnerSend errCode=%d", errCode);
133             return errCode;
134         }
135     }
136     return E_OK;
137 }
138 
InnerClearSyncStatus()139 void SingleVerDataSync::InnerClearSyncStatus()
140 {
141     sessionId_ = 0;
142     reSendMap_.clear();
143     windowSize_ = 0;
144     maxSequenceIdHasSent_ = 0;
145     isAllDataHasSent_ = false;
146 }
147 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)148 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
149 {
150     if (message == nullptr) {
151         LOGE("[DataSync] AckRecv message nullptr");
152         return -E_INVALID_ARGS;
153     }
154     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
155     if (packet == nullptr) {
156         return -E_INVALID_ARGS;
157     }
158     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
159     uint32_t sessionId = message->GetSessionId();
160     uint32_t sequenceId = message->GetSequenceId();
161 
162     std::lock_guard<std::mutex> lock(lock_);
163     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
164         windowSize_, label_.c_str(), STR_MASK(deviceId_));
165     if (sessionId != sessionId_) {
166         LOGI("[DataSync] ignore ack,sessionId is different");
167         return E_OK;
168     }
169     Timestamp lastQueryTime = 0;
170     if (reSendMap_.count(sequenceId) != 0) {
171         lastQueryTime = reSendMap_[sequenceId].end;
172         reSendMap_.erase(sequenceId);
173         windowSize_++;
174     } else {
175         LOGI("[DataSync] ack seqId not in map");
176         return E_OK;
177     }
178     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
179         Timestamp dbLastQueryTime = 0;
180         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
181         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
182             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
183         }
184         if (errCode != E_OK) {
185             return errCode;
186         }
187     }
188     if (!isAllDataHasSent_) {
189         return InnerSyncStart(context);
190     } else if (reSendMap_.size() == 0) {
191         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
192         InnerClearSyncStatus();
193         return -E_FINISHED;
194     }
195     return E_OK;
196 }
197 
ClearSyncStatus()198 void SingleVerDataSync::ClearSyncStatus()
199 {
200     std::lock_guard<std::mutex> lock(lock_);
201     InnerClearSyncStatus();
202 }
203 
ReSendData(SingleVerSyncTaskContext * context)204 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
205 {
206     if (reSendMap_.empty()) {
207         LOGI("[DataSync] ReSend map empty");
208         return -E_INTERNAL_ERROR;
209     }
210     uint32_t sequenceId = reSendMap_.begin()->first;
211     ReSendInfo reSendInfo = reSendMap_.begin()->second;
212     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
213         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
214         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
215         windowSize_, label_.c_str(), STR_MASK(deviceId_));
216     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
217         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
218     return ReSend(context, dataReSendInfo);
219 }
220 
GetLocalDeviceName()221 std::string SingleVerDataSync::GetLocalDeviceName()
222 {
223     std::string deviceInfo;
224     if (communicateHandle_ != nullptr) {
225         communicateHandle_->GetLocalIdentity(deviceInfo);
226     }
227     return deviceInfo;
228 }
229 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)230 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
231     uint32_t packetLen)
232 {
233     bool startFeedDogRet = false;
234     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
235         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
236             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
237         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
238     }
239     SendConfig sendConfig;
240     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
241     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
242     if (errCode != E_OK) {
243         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
244         if (startFeedDogRet) {
245             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
246         }
247     }
248     return errCode;
249 }
250 
GetData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)251 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize)
252 {
253     int errCode;
254     UpdateMtuSize();
255     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
256         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
257         LOGI("[DataSync][GetData] resend data");
258         errCode = GetUnsyncData(context, outData, packetSize);
259     } else {
260         ContinueToken token;
261         context->GetContinueToken(token);
262         if (token == nullptr) {
263             errCode = GetUnsyncData(context, outData, packetSize);
264         } else {
265             LOGD("[DataSync][GetData] get data from token");
266             // if there is data to send, read out data, and update local watermark, send data
267             errCode = GetNextUnsyncData(context, outData, packetSize);
268         }
269     }
270     if (errCode == -E_UNFINISHED) {
271         LOGD("[DataSync][GetData] not finished.");
272     }
273     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
274         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
275         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
276     }
277     return errCode;
278 }
279 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)280 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
281 {
282     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
283     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
284         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
285     bool needCompressOnSync = false;
286     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
287     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
288     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
289     if (performance != nullptr) {
290         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
291     }
292     context->StartFeedDogForGetData(context->GetResponseSessionId());
293     int errCode = GetData(context, syncOutData.entries, packetSize);
294     context->StopFeedDogForGetData();
295     if (performance != nullptr) {
296         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
297     }
298     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
299         context->SetTaskErrCode(errCode);
300         return errCode;
301     }
302 
303     int innerCode = InterceptData(syncOutData);
304     if (innerCode != E_OK) {
305         context->SetTaskErrCode(innerCode);
306         return innerCode;
307     }
308 
309     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
310     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
311         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
312             { remoteAlgo, version });
313         if (compressCode != E_OK) {
314             return compressCode;
315         }
316     }
317     return errCode;
318 }
319 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)320 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
321     size_t packetSize)
322 {
323     int errCode;
324     WaterMark startMark = 0;
325     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
326     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
327     WaterMark endMark = MAX_TIMESTAMP;
328     if ((startMark > endMark)) {
329         return E_OK;
330     }
331     ContinueToken token = nullptr;
332     context->GetContinueToken(token);
333     if (token != nullptr) {
334         storage_->ReleaseContinueToken(token);
335     }
336     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
337     if (curType != SyncType::QUERY_SYNC_TYPE) {
338         errCode = storage_->GetSyncData(startMark, endMark, outData, token, syncDataSizeInfo);
339     } else {
340         WaterMark deletedStartMark = 0;
341         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
342         Timestamp lastQueryTimestamp = 0;
343         errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(),
344             context->GetDeviceId(), lastQueryTimestamp);
345         if (errCode == E_OK) {
346             QuerySyncObject queryObj = context->GetQuery();
347             errCode = storage_->GetSyncData(queryObj,
348                 SyncTimeRange{ startMark, deletedStartMark, endMark, endMark, lastQueryTimestamp},
349                 syncDataSizeInfo, token, outData);
350         }
351     }
352     context->SetContinueToken(token);
353     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
354         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
355     }
356     return errCode;
357 }
358 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)359 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
360     size_t packetSize)
361 {
362     ContinueToken token;
363     context->GetContinueToken(token);
364     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
365     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
366     context->SetContinueToken(token);
367     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
368         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
369     }
370     return errCode;
371 }
372 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)373 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
374     SyncType curType, const QuerySyncObject &query)
375 {
376     if (inData.empty()) {
377         return E_OK;
378     }
379     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
380     if (performance != nullptr) {
381         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
382     }
383 
384     const std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
385     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
386     int errCode = E_OK;
387     // query only support prefix key and don't have query in packet in 104 version
388     errCode = storage_->PutSyncDataWithQuery(query, inData, context->GetDeviceId());
389     if (performance != nullptr) {
390         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
391     }
392     if (errCode != E_OK) {
393         LOGE("[DataSync][SaveData] save sync data failed,errCode=%d", errCode);
394     }
395     return errCode;
396 }
397 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)398 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
399 {
400     mode_ = inMode;
401     maxSequenceIdHasSent_ = 0;
402     isAllDataHasSent_ = false;
403     context->ReSetSequenceId();
404     reSendMap_.clear();
405     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
406         windowSize_ = LOW_VERSION_WINDOW_SIZE;
407     } else {
408         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
409     }
410     int mode = SyncOperation::TransferSyncMode(inMode);
411     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
412         sessionId_ = context->GetRequestSessionId();
413     } else {
414         sessionId_ = context->GetResponseSessionId();
415     }
416 }
417 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)418 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
419     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
420 {
421     WaterMark localMark = 0;
422     WaterMark deleteMark = 0;
423     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
424     GetLocalDeleteSyncWaterMark(context, deleteMark);
425     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
426 }
427 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const428 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
429     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
430 {
431     WaterMark localMark = 0;
432     int errCode = E_OK;
433     const std::string &deviceId = context->GetDeviceId();
434     std::string queryId = context->GetQuerySyncId();
435     if (syncType != SyncType::QUERY_SYNC_TYPE) {
436         if (isCheckBeforUpdate) {
437             GetLocalWaterMark(syncType, queryId, context, localMark);
438             if (localMark >= dataTimeRange.endTime) {
439                 return E_OK;
440             }
441         }
442         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
443     } else {
444         bool isNeedUpdateMark = true;
445         bool isNeedUpdateDeleteMark = true;
446         if (isCheckBeforUpdate) {
447             WaterMark deleteDataWaterMark = 0;
448             GetLocalWaterMark(syncType, queryId, context, localMark);
449             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
450             if (localMark >= dataTimeRange.endTime) {
451                 isNeedUpdateMark = false;
452             }
453             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
454                 isNeedUpdateDeleteMark = false;
455             }
456         }
457         if (isNeedUpdateMark) {
458             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
459             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
460             if (errCode != E_OK) {
461                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
462                 return errCode;
463             }
464         }
465         if (isNeedUpdateDeleteMark) {
466             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
467                 dataTimeRange.deleteEndTime);
468             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
469         }
470     }
471     if (errCode != E_OK) {
472         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
473     }
474     return errCode;
475 }
476 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const477 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
478     const DeviceID &deviceId, WaterMark &waterMark) const
479 {
480     if (syncType != SyncType::QUERY_SYNC_TYPE) {
481         metadata_->GetPeerWaterMark(deviceId, waterMark);
482         return;
483     }
484     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
485 }
486 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)487 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
488 {
489     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
490 }
491 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const492 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
493     WaterMark &waterMark) const
494 {
495     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
496 }
497 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const498 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
499     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
500 {
501     if (syncType != SyncType::QUERY_SYNC_TYPE) {
502         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
503         return;
504     }
505     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
506         waterMark, context->IsAutoLiftWaterMark());
507 }
508 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)509 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
510     WaterMark maxSendDataTime)
511 {
512     bool isNeedClearRemoteData = false;
513     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
514     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
515         uint64_t clearDeviceDataMark = 0;
516         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
517         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
518     } else {
519         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
520         if (packet == nullptr) {
521             LOGE("[RemoveDeviceDataHandle] get packet object failed");
522             return -E_INVALID_ARGS;
523         }
524         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
525         WaterMark packetLocalMark = packet->GetLocalWaterMark();
526         WaterMark peerMark = 0;
527         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
528         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
529     }
530     if (!isNeedClearRemoteData) {
531         return E_OK;
532     }
533     int errCode = E_OK;
534     if (context->IsNeedClearRemoteStaleData()) {
535         // need to clear remote device history data
536         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
537         if (errCode != E_OK) {
538             (void)SendDataAck(context, message, errCode, maxSendDataTime);
539             return errCode;
540         }
541         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
542             // avoid repeat clear in ack
543             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
544         }
545     }
546     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
547         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
548         if (errCode != E_OK) {
549             (void)SendDataAck(context, message, errCode, maxSendDataTime);
550             return errCode;
551         }
552     }
553     return E_OK;
554 }
555 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)556 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
557     const std::vector<uint64_t> &reserved)
558 {
559     bool isNeedClearRemoteData = false;
560     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
561     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
562     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
563         uint64_t clearDeviceDataMark = 0;
564         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
565         isNeedClearRemoteData = (clearDeviceDataMark != 0);
566     } else if (reserved.empty()) {
567         WaterMark localMark = 0;
568         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
569         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
570     } else {
571         WaterMark peerMark = 0;
572         GetPeerWaterMark(curType, context->GetQuerySyncId(),
573             context->GetDeviceId(), peerMark);
574         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
575     }
576     if (!isNeedClearRemoteData) {
577         return E_OK;
578     }
579     // need to clear remote historydata
580     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
581         label_.c_str(), STR_MASK(GetDeviceId()));
582     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
583     if (errCode != E_OK) {
584         return errCode;
585     }
586     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
587         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
588     }
589     return errCode;
590 }
591 
SetSessionEndTimestamp(Timestamp end)592 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
593 {
594     sessionEndTimestamp_ = end;
595 }
596 
GetSessionEndTimestamp() const597 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
598 {
599     return sessionEndTimestamp_;
600 }
601 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)602 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
603 {
604     ReSendInfo reSendInfo;
605     reSendInfo.start = dataTimeRange.beginTime;
606     reSendInfo.end = dataTimeRange.endTime;
607     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
608     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
609     reSendInfo.packetId = context->GetPacketId();
610     maxSequenceIdHasSent_++;
611     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
612     windowSize_--;
613     ContinueToken token;
614     context->GetContinueToken(token);
615     if (token == nullptr) {
616         isAllDataHasSent_ = true;
617     }
618     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
619         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
620         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
621         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
622 }
623 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)624 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
625     SyncEntry &syncData, int sendCode, int mode)
626 {
627     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
628     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
629     WaterMark localMark = 0;
630     WaterMark peerMark = 0;
631     WaterMark deleteMark = 0;
632     bool needCompressOnSync = false;
633     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
634     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
635     std::string id = context->GetQuerySyncId();
636     GetLocalWaterMark(curType, id, context, localMark);
637     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
638     GetLocalDeleteSyncWaterMark(context, deleteMark);
639     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
640         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
641         packet->SetLastSequence();
642     }
643     int tmpMode = mode;
644     if (mode == SyncModeType::RESPONSE_PULL) {
645         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
646     }
647     packet->SetData(syncData.entries);
648     packet->SetCompressData(syncData.compressedEntries);
649     packet->SetBasicInfo(sendCode, version, tmpMode);
650     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
651     packet->SetWaterMark(localMark, peerMark, deleteMark);
652     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
653         packet->SetEndWaterMark(context->GetEndMark());
654         packet->SetSessionId(context->GetRequestSessionId());
655     }
656     packet->SetQuery(context->GetQuery());
657     packet->SetQueryId(context->GetQuerySyncId());
658     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
659     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
660         packet->SetCompressDataMark();
661         packet->SetCompressAlgo(curAlgo);
662     }
663     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
664     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
665         context->GetQuery().HasOrderBy())) {
666         packet->SetUpdateWaterMark();
667     }
668     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
669         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
670         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
671 }
672 
RequestStart(SingleVerSyncTaskContext * context,int mode)673 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
674 {
675     if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
676         context->SetTaskErrCode(-E_NOT_SUPPORT);
677         return -E_NOT_SUPPORT;
678     }
679     int errCode = RemoveDeviceDataIfNeed(context);
680     if (errCode != E_OK) {
681         context->SetTaskErrCode(errCode);
682         return errCode;
683     }
684     SyncEntry syncData;
685     // get data
686     errCode = GetDataWithPerformanceRecord(context, syncData);
687     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
688 
689     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
690         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
691         return errCode;
692     }
693 
694     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
695     if (packet == nullptr) {
696         LOGE("[DataSync][PushStart] new DataRequestPacket error");
697         return -E_OUT_OF_MEMORY;
698     }
699     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
700     UpdateWaterMark isUpdateWaterMark;
701     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
702     if (errCode == E_OK) {
703         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
704     }
705     FillDataRequestPacket(packet, context, syncData, errCode, mode);
706     errCode = SendDataPacket(curType, packet, context);
707     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
708     if (performance != nullptr) {
709         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
710     }
711     if (errCode == E_OK || errCode == -E_TIMEOUT) {
712         UpdateSendInfo(dataTime, context);
713     }
714     if (errCode == E_OK) {
715         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
716             context->GetQuery().HasOrderBy())) {
717             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
718             return E_OK;
719         }
720         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
721         SaveLocalWaterMark(curType, context, tmpDataTime);
722     }
723     return errCode;
724 }
725 
PushStart(SingleVerSyncTaskContext * context)726 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
727 {
728     if (context == nullptr) {
729         return -E_INVALID_ARGS;
730     }
731     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
732     return RequestStart(context,
733         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
734 }
735 
PushPullStart(SingleVerSyncTaskContext * context)736 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
737 {
738     if (context == nullptr) {
739         return -E_INVALID_ARGS;
740     }
741     return RequestStart(context, context->GetMode());
742 }
743 
PullRequestStart(SingleVerSyncTaskContext * context)744 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
745 {
746     if (context == nullptr) {
747         return -E_INVALID_ARGS;
748     }
749     if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
750         context->SetTaskErrCode(-E_NOT_SUPPORT);
751         return -E_NOT_SUPPORT;
752     }
753     int errCode = RemoveDeviceDataIfNeed(context);
754     if (errCode != E_OK) {
755         context->SetTaskErrCode(errCode);
756         return errCode;
757     }
758     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
759     if (packet == nullptr) {
760         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
761         return -E_OUT_OF_MEMORY;
762     }
763     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
764     WaterMark peerMark = 0;
765     WaterMark localMark = 0;
766     WaterMark deleteMark = 0;
767     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
768         context->GetDeviceId(), peerMark);
769     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
770     GetLocalDeleteSyncWaterMark(context, deleteMark);
771     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
772     WaterMark endMark = context->GetEndMark();
773     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
774 
775     packet->SetBasicInfo(E_OK, version, context->GetMode());
776     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
777     packet->SetWaterMark(localMark, peerMark, deleteMark);
778     packet->SetEndWaterMark(endMark);
779     packet->SetSessionId(context->GetRequestSessionId());
780     packet->SetQuery(context->GetQuery());
781     packet->SetQueryId(context->GetQuerySyncId());
782     packet->SetLastSequence();
783     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
784     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
785 
786     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
787         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark, label_.c_str(),
788         STR_MASK(GetDeviceId()));
789     UpdateSendInfo(dataTime, context);
790     return SendDataPacket(syncType, packet, context);
791 }
792 
PullResponseStart(SingleVerSyncTaskContext * context)793 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
794 {
795     if (context == nullptr) {
796         return -E_INVALID_ARGS;
797     }
798     SyncEntry syncData;
799     // get data
800     int errCode = GetDataWithPerformanceRecord(context, syncData);
801     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
802         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
803             SendPullResponseDataPkt(errCode, syncData, context);
804         }
805         return errCode;
806     }
807     // if send finished
808     int ackCode = E_OK;
809     ContinueToken token = nullptr;
810     context->GetContinueToken(token);
811     if (errCode == E_OK && token == nullptr) {
812         LOGD("[DataSync][PullResponse] send last frame end");
813         ackCode = SEND_FINISHED;
814     }
815     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
816     UpdateWaterMark isUpdateWaterMark;
817     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
818     if (errCode == E_OK) {
819         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
820     }
821     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
822     if (errCode == E_OK || errCode == -E_TIMEOUT) {
823         UpdateSendInfo(dataTime, context);
824     }
825     if (errCode == E_OK) {
826         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
827             context->GetQuery().HasOrderBy())) {
828             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
829             return E_OK;
830         }
831         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
832         SaveLocalWaterMark(curType, context, tmpDataTime);
833     }
834     return errCode;
835 }
836 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)837 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
838     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
839 {
840     WaterMark tmpPeerWatermark = dataTime.endTime;
841     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
842     if (isUpdateWaterMark.normalUpdateMark) {
843         tmpPeerWatermark++;
844     }
845     if (isUpdateWaterMark.deleteUpdateMark) {
846         tmpPeerDeletedWatermark++;
847     }
848     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
849 }
850 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)851 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
852     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
853 {
854     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
855         return;
856     }
857     int errCode = E_OK;
858     if (syncType != SyncType::QUERY_SYNC_TYPE) {
859         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
860     } else {
861         if (peerWatermark != 0) {
862             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
863             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
864             if (errCode != E_OK) {
865                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
866             }
867         }
868         if (peerDeletedWatermark != 0) {
869             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
870                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
871             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
872         }
873     }
874     if (errCode != E_OK) {
875         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
876     }
877 }
878 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)879 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
880 {
881     uint16_t remoteCommunicatorVersion = 0;
882     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
883         -E_NOT_FOUND) {
884         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
885         return -E_VERSION_NOT_SUPPORT;
886     }
887     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
888     if (remoteCommunicatorVersion == 0) {
889         LOGI("[DataSync] set remote version 0");
890         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
891         return E_OK;
892     } else {
893         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
894         if (isControlMsg) {
895             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
896         } else {
897             SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
898         }
899         return -E_WAIT_NEXT_MESSAGE;
900     }
901 }
902 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)903 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
904 {
905     if (context == nullptr || message == nullptr) {
906         return -E_INVALID_ARGS;
907     }
908     auto *packet = message->GetObject<DataRequestPacket>();
909     if (packet == nullptr) {
910         return -E_INVALID_ARGS;
911     }
912     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
913         return DoAbilitySyncIfNeed(context, message);
914     }
915     int32_t sendCode = packet->GetSendCode();
916     if (sendCode == -E_VERSION_NOT_SUPPORT) {
917         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
918         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
919         return -E_WAIT_NEXT_MESSAGE;
920     }
921     // only deal with pull response packet errCode
922     if (sendCode != E_OK && sendCode != SEND_FINISHED &&
923         message->GetSessionId() == context->GetRequestSessionId()) {
924         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
925         return sendCode;
926     }
927     int errCode = RunPermissionCheck(context, message, packet);
928     if (errCode != E_OK) {
929         return errCode;
930     }
931     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
932         errCode = CheckSchemaStrategy(context, message);
933     }
934     if (errCode == E_OK) {
935         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
936     }
937     if (errCode != E_OK) {
938         (void)SendDataAck(context, message, errCode, 0);
939     }
940     return errCode;
941 }
942 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)943 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
944     WaterMark &pullEndWatermark)
945 {
946     int errCode = DataRequestRecvPre(context, message);
947     if (errCode != E_OK) {
948         return errCode;
949     }
950     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
951     const std::vector<SendDataItem> &data = packet->GetData();
952     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
953     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
954         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
955         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
956     context->SetReceiveWaterMarkErr(false);
957     UpdateWaterMark isUpdateWaterMark;
958     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
959     errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
960     if (errCode != E_OK) {
961         return errCode;
962     }
963     if (WaterMarkErrHandle(curType, context, message)) {
964         return E_OK;
965     }
966     GetPullEndWatermark(context, packet, pullEndWatermark);
967     // save data first
968     errCode = SaveData(context, data, curType, packet->GetQuery());
969     if (errCode != E_OK) {
970         (void)SendDataAck(context, message, errCode, dataTime.endTime);
971         return errCode;
972     }
973     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
974         pullEndWatermark = 0;
975         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
976     } else {
977         // if data is empty, we don't know the max timestap of this packet.
978         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
979     }
980     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
981         context->GetRequestSessionId());
982     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
983         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
984     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
985         UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
986     }
987     if (errCode != E_OK) {
988         return errCode;
989     }
990     if (packet->GetSendCode() == SEND_FINISHED) {
991         return -E_RECV_FINISHED;
992     }
993     return errCode;
994 }
995 
SendDataPacket(SyncType syncType,const DataRequestPacket * packet,SingleVerSyncTaskContext * context)996 int SingleVerDataSync::SendDataPacket(SyncType syncType, const DataRequestPacket *packet,
997     SingleVerSyncTaskContext *context)
998 {
999     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1000     if (message == nullptr) {
1001         LOGE("[DataSync][SendDataPacket] new message error");
1002         delete packet;
1003         packet = nullptr;
1004         return -E_OUT_OF_MEMORY;
1005     }
1006     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1007     int errCode = message->SetExternalObject(packet);
1008     if (errCode != E_OK) {
1009         delete packet;
1010         packet = nullptr;
1011         delete message;
1012         message = nullptr;
1013         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1014         return errCode;
1015     }
1016     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1017         context->GetSequenceId(), context->GetRequestSessionId());
1018     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1019     if (performance != nullptr) {
1020         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1021     }
1022     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1023         context, message->GetSessionId());
1024     errCode = Send(context, message, handler, packetLen);
1025     if (errCode != E_OK) {
1026         delete message;
1027         message = nullptr;
1028     }
1029 
1030     return errCode;
1031 }
1032 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1033 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1034     SingleVerSyncTaskContext *context)
1035 {
1036     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1037     if (packet == nullptr) {
1038         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1039         return -E_OUT_OF_MEMORY;
1040     }
1041     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1042     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1043     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1044     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1045     if (message == nullptr) {
1046         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1047         delete packet;
1048         packet = nullptr;
1049         return -E_OUT_OF_MEMORY;
1050     }
1051     int errCode = message->SetExternalObject(packet);
1052     if (errCode != E_OK) {
1053         delete packet;
1054         packet = nullptr;
1055         delete message;
1056         message = nullptr;
1057         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1058         return errCode;
1059     }
1060     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1061         context->GetSequenceId(), context->GetResponseSessionId());
1062     SendResetWatchDogPacket(context, packetLen);
1063     errCode = Send(context, message, nullptr, packetLen);
1064     if (errCode != E_OK) {
1065         delete message;
1066         message = nullptr;
1067     }
1068     return errCode;
1069 }
1070 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1071 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1072 {
1073     SendDataAck(context, message, E_OK, 0);
1074 }
1075 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1076 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1077     WaterMark maxSendDataTime)
1078 {
1079     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1080     if (packet == nullptr) {
1081         return -E_INVALID_ARGS;
1082     }
1083     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1084     if (ackMessage == nullptr) {
1085         LOGE("[DataSync][SendDataAck] new message error");
1086         return -E_OUT_OF_MEMORY;
1087     }
1088     DataAckPacket ack;
1089     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1090     int errCode = ackMessage->SetCopiedObject(ack);
1091     if (errCode != E_OK) {
1092         delete ackMessage;
1093         ackMessage = nullptr;
1094         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1095         return errCode;
1096     }
1097     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1098         message->GetSequenceId(), message->GetSessionId());
1099 
1100     errCode = Send(context, ackMessage, nullptr, 0);
1101     if (errCode != E_OK) {
1102         delete ackMessage;
1103         ackMessage = nullptr;
1104     }
1105     return errCode;
1106 }
1107 
AckPacketIdCheck(const Message * message)1108 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1109 {
1110     if (message == nullptr) {
1111         LOGE("[DataSync] AckRecv message nullptr");
1112         return false;
1113     }
1114     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1115         return true;
1116     }
1117     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1118     if (packet == nullptr) {
1119         return false;
1120     }
1121     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1122     std::lock_guard<std::mutex> lock(lock_);
1123     uint32_t sequenceId = message->GetSequenceId();
1124     if (reSendMap_.count(sequenceId) != 0) {
1125         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1126         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1127             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1128                 originalPacketId);
1129             return false;
1130         }
1131     }
1132     return true;
1133 }
1134 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1135 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1136 {
1137     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1138     if (errCode != E_OK) {
1139         return errCode;
1140     }
1141     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1142     if (packet == nullptr) {
1143         return -E_INVALID_ARGS;
1144     }
1145     int32_t recvCode = packet->GetRecvCode();
1146     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1147         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1148     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1149         LOGE("[DataSync][AckRecv] Version mismatch");
1150         return -E_VERSION_NOT_SUPPORT;
1151     }
1152 
1153     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT) {
1154         // we should ReleaseContinueToken, avoid crash
1155         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1156             STR_MASK(GetDeviceId()));
1157         context->ReleaseContinueToken();
1158         return recvCode;
1159     }
1160     uint64_t data = packet->GetData();
1161     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1162         return DealWaterMarkException(context, data, packet->GetReserved());
1163     }
1164 
1165     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1166         // data only use low 32bit
1167         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1168         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1169             STR_MASK(GetDeviceId()));
1170     }
1171 
1172     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1173         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1174             label_.c_str(), STR_MASK(GetDeviceId()));
1175         return recvCode;
1176     }
1177 
1178     // Judge if send finished
1179     ContinueToken token;
1180     context->GetContinueToken(token);
1181     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1182         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1183         return -E_NO_DATA_SEND;
1184     }
1185 
1186     // send next data
1187     return -E_SEND_DATA;
1188 }
1189 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1190 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1191     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1192 {
1193     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1194         LOGE("[SingleVerDataSync] messageId not available.");
1195         return;
1196     }
1197     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1198     if (ackMessage == nullptr) {
1199         LOGE("[DataSync][SaveDataNotify] new message failed");
1200         return;
1201     }
1202 
1203     DataAckPacket ack;
1204     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1205     ack.SetVersion(pktVersion);
1206     int errCode = ackMessage->SetCopiedObject(ack);
1207     if (errCode != E_OK) {
1208         delete ackMessage;
1209         ackMessage = nullptr;
1210         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1211         return;
1212     }
1213     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1214 
1215     errCode = Send(context, ackMessage, nullptr, 0);
1216     if (errCode != E_OK) {
1217         delete ackMessage;
1218         ackMessage = nullptr;
1219     }
1220     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1221         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1222 }
1223 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1224 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1225     WaterMark &pullEndWatermark) const
1226 {
1227     if (packet == nullptr) {
1228         return;
1229     }
1230     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1231     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1232         WaterMark endMark = packet->GetEndWaterMark();
1233         TimeOffset offset;
1234         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1235         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1236         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1237             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1238     }
1239 }
1240 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1241 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1242     const std::vector<uint64_t> &reserved)
1243 {
1244     WaterMark deletedWaterMark = 0;
1245     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1246     if (curType == SyncType::QUERY_SYNC_TYPE) {
1247         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1248             LOGE("[DataSync] get packet reserve size failed");
1249             return -E_INVALID_ARGS;
1250         }
1251         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1252     }
1253     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1254         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1255     int errCode = SaveLocalWaterMark(curType, context,
1256         {0, 0, ackWaterMark, deletedWaterMark});
1257     if (errCode != E_OK) {
1258         return errCode;
1259     }
1260     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1261     context->IncNegotiationCount();
1262     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1263     if (!context->IsNeedClearRemoteStaleData()) {
1264         return -E_RE_SEND_DATA;
1265     }
1266     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1267     if (errCode != E_OK) {
1268         return errCode;
1269     }
1270     return -E_RE_SEND_DATA;
1271 }
1272 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1273 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1274     const DataRequestPacket *packet)
1275 {
1276     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1277     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1278     if (errCode != E_OK) {
1279         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1280             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1281         }
1282         return -E_NOT_PERMIT;
1283     }
1284     const std::vector<SendDataItem> &data = packet->GetData();
1285     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1286     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1287     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1288         !context->GetReceivcPermitCheck()) {
1289         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1290         if (permitReceive) {
1291             context->SetReceivcPermitCheck(true);
1292         } else {
1293             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1294             return -E_SECURITY_OPTION_CHECK_ERROR;
1295         }
1296     }
1297     return errCode;
1298 }
1299 
1300 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1301 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1302 {
1303     // When mtu less than 30k, we send data with bluetooth
1304     // In order not to block the bluetooth channel, we don't send notify packet here
1305     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1306         return;
1307     }
1308     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1309 
1310     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1311     if (ackMessage == nullptr) {
1312         LOGE("[DataSync][ResetWatchDog] new message failed");
1313         return;
1314     }
1315 
1316     DataAckPacket ack;
1317     ack.SetData(data);
1318     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1319     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1320     int errCode = ackMessage->SetCopiedObject(ack);
1321     if (errCode != E_OK) {
1322         delete ackMessage;
1323         ackMessage = nullptr;
1324         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1325         return;
1326     }
1327     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1328         context->GetSequenceId(), context->GetResponseSessionId());
1329 
1330     errCode = Send(context, ackMessage, nullptr, 0);
1331     if (errCode != E_OK) {
1332         delete ackMessage;
1333         ackMessage = nullptr;
1334         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1335             STR_MASK(GetDeviceId()));
1336     } else {
1337         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1338             STR_MASK(GetDeviceId()));
1339     }
1340 }
1341 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1342 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1343 {
1344     if (context == nullptr) {
1345         return -E_INVALID_ARGS;
1346     }
1347     SyncEntry syncData;
1348     int errCode = GetReSendData(syncData, context, reSendInfo);
1349     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1350         return errCode;
1351     }
1352     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1353     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1354     if (packet == nullptr) {
1355         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1356         return -E_OUT_OF_MEMORY;
1357     }
1358     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1359     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1360     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1361         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1362         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1363         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1364             dataTime.deleteEndTime += 1;
1365         }
1366         if (reSendInfo.end > reSendInfo.start) {
1367             dataTime.endTime += 1;
1368         }
1369         SaveLocalWaterMark(curType, context, dataTime, true);
1370     }
1371     return errCode;
1372 }
1373 
SendReSendPacket(const DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1374 int SingleVerDataSync::SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1375     uint32_t sessionId, uint32_t sequenceId)
1376 {
1377     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1378     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1379     if (message == nullptr) {
1380         LOGE("[DataSync][SendDataPacket] new message error");
1381         delete packet;
1382         packet = nullptr;
1383         return -E_OUT_OF_MEMORY;
1384     }
1385     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1386     int errCode = message->SetExternalObject(packet);
1387     if (errCode != E_OK) {
1388         delete packet;
1389         packet = nullptr;
1390         delete message;
1391         message = nullptr;
1392         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1393         return errCode;
1394     }
1395     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1396     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1397         context, message->GetSessionId());
1398     errCode = Send(context, message, handler, packetLen);
1399     if (errCode != E_OK) {
1400         delete message;
1401         message = nullptr;
1402     }
1403     return errCode;
1404 }
1405 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1406 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1407 {
1408     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1409     int mode = SyncOperation::TransferSyncMode(inMode);
1410     // for pull mode it just need to get data, no need to send data.
1411     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1412         return E_OK;
1413     }
1414     if (context->GetSendPermitCheck()) {
1415         return E_OK;
1416     }
1417     bool isPermitSync = true;
1418     std::string deviceId = context->GetDeviceId();
1419     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1420     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1421         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1422     }
1423     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1424         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1425     if (isPermitSync) {
1426         context->SetSendPermitCheck(true);
1427         return E_OK;
1428     }
1429     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1430         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1431         return -E_SECURITY_OPTION_CHECK_ERROR;
1432     }
1433     if (mode == SyncModeType::RESPONSE_PULL) {
1434         SyncEntry syncData;
1435         SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1436         return -E_SECURITY_OPTION_CHECK_ERROR;
1437     }
1438     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1439         return -E_SECURITY_OPTION_CHECK_ERROR;
1440     }
1441     return E_OK;
1442 }
1443 
GetLabel() const1444 std::string SingleVerDataSync::GetLabel() const
1445 {
1446     return label_;
1447 }
1448 
GetDeviceId() const1449 std::string SingleVerDataSync::GetDeviceId() const
1450 {
1451     return deviceId_;
1452 }
1453 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1454 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1455 {
1456     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1457     if (packet == nullptr) {
1458         LOGE("[WaterMarkErrHandle] get packet object failed");
1459         return -E_INVALID_ARGS;
1460     }
1461     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1462     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1463     WaterMark peerMark = 0;
1464     WaterMark deletedMark = 0;
1465     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1466     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1467         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1468     }
1469     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1470         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1471         context->SetReceiveWaterMarkErr(true);
1472         SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1473         return true;
1474     }
1475     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1476         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1477             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1478             peerMark);
1479         context->SetReceiveWaterMarkErr(true);
1480         SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1481         return true;
1482     }
1483     return false;
1484 }
1485 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1486 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1487 {
1488     auto *packet = message->GetObject<DataRequestPacket>();
1489     if (packet == nullptr) {
1490         return -E_INVALID_ARGS;
1491     }
1492     auto query = packet->GetQuery();
1493     SyncStrategy localStrategy = context->GetSyncStrategy(query);
1494     if (!context->GetIsSchemaSync()) {
1495         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", context->GetIsSchemaSync());
1496         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1497         return -E_NEED_ABILITY_SYNC;
1498     }
1499     if (!localStrategy.permitSync) {
1500         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", localStrategy.permitSync);
1501         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1502         return -E_SCHEMA_MISMATCH;
1503     }
1504     return E_OK;
1505 }
1506 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1507 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1508 {
1509     int mode = SyncOperation::TransferSyncMode(inMode);
1510     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1511         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1512         return;
1513     }
1514 
1515     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1516         storage_->NotifyRemotePushFinished(deviceId_);
1517     }
1518 }
1519 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1520 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1521     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1522 {
1523     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1524     WaterMark localMark = 0;
1525     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1526     ackPacket.SetRecvCode(recvCode);
1527     // send ack packet
1528     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1529         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1530     } else if (recvCode != WATER_MARK_INVALID) {
1531         WaterMark mark = 0;
1532         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1533         ackPacket.SetData(mark);
1534     }
1535     std::vector<uint64_t> reserved {localMark};
1536     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1537     uint64_t packetId = 0;
1538     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1539         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1540     }
1541     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1542         reserved.push_back(packetId);
1543     }
1544     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1545     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1546         WaterMark deletedPeerMark;
1547         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1548         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1549     }
1550     ackPacket.SetReserved(reserved);
1551     ackPacket.SetVersion(version);
1552 }
1553 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1554 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1555     DataSyncReSendInfo reSendInfo)
1556 {
1557     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1558     if (mode == SyncModeType::PULL) {
1559         return E_OK;
1560     }
1561     ContinueToken token = nullptr;
1562     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1563     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1564         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1565     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1566     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1567     int errCode;
1568     if (curType != SyncType::QUERY_SYNC_TYPE) {
1569         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1570             reSendDataSizeInfo);
1571     } else {
1572         QuerySyncObject queryObj = context->GetQuery();
1573         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1574             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1575     }
1576     if (token != nullptr) {
1577         storage_->ReleaseContinueToken(token);
1578     }
1579     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1580         context->SetTaskErrCode(errCode);
1581         return errCode;
1582     }
1583     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1584         return errCode;
1585     }
1586 
1587     int innerCode = InterceptData(syncData);
1588     if (innerCode != E_OK) {
1589         context->SetTaskErrCode(innerCode);
1590         return innerCode;
1591     }
1592 
1593     bool needCompressOnSync = false;
1594     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1595     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1596     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1597     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1598         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1599             { remoteAlgo, version });
1600         if (compressCode != E_OK) {
1601             return compressCode;
1602         }
1603     }
1604     return errCode;
1605 }
1606 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1607 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1608 {
1609     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1610         return E_OK;
1611     }
1612     uint64_t clearRemoteDataMark = 0;
1613     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1614     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1615     if (clearRemoteDataMark == 0) {
1616         return E_OK;
1617     }
1618     int errCode = E_OK;
1619     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1620         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1621         if (errCode != E_OK) {
1622             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1623             return errCode;
1624         }
1625     }
1626     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1627         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1628         if (errCode != E_OK) {
1629             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1630             return errCode;
1631         }
1632     }
1633     return E_OK;
1634 }
1635 
UpdateMtuSize()1636 void SingleVerDataSync::UpdateMtuSize()
1637 {
1638     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1639 }
1640 
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1641 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1642     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1643 {
1644     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1645     WaterMark peerMark = 0;
1646     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1647         peerMark);
1648     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1649     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1650     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1651     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1652     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1653         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1654         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1655         packet->SetLastSequence();
1656     }
1657     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1658         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1659         sendCode = SEND_FINISHED;
1660     }
1661     packet->SetData(syncData.entries);
1662     packet->SetCompressData(syncData.compressedEntries);
1663     packet->SetBasicInfo(sendCode, version, reSendMode);
1664     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1665     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1666     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH) {
1667         packet->SetEndWaterMark(context->GetEndMark());
1668         packet->SetQuery(context->GetQuery());
1669     }
1670     packet->SetQueryId(context->GetQuerySyncId());
1671     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1672         std::vector<uint64_t> reserved {reSendInfo.packetId};
1673         packet->SetReserved(reserved);
1674     }
1675     bool needCompressOnSync = false;
1676     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1677     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1678     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1679     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1680         packet->SetCompressDataMark();
1681         packet->SetCompressAlgo(curAlgo);
1682     }
1683 }
1684 
GetDataSizeSpecInfo(size_t packetSize)1685 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1686 {
1687     bool needCompressOnSync = false;
1688     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1689     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1690     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1691         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1692     return {blockSize, packetSize};
1693 }
1694 
InterceptData(SyncEntry & syncEntry)1695 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1696 {
1697     if (storage_ == nullptr) {
1698         LOGE("Invalid DB. Can not intercept data.");
1699         return -E_INVALID_DB;
1700     }
1701 
1702     // GetLocalDeviceName get local device ID.
1703     // GetDeviceId get remote device ID.
1704     // If intercept data fail, entries will be released.
1705     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId());
1706 }
1707 
ControlCmdStart(SingleVerSyncTaskContext * context)1708 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1709 {
1710     if (context == nullptr) {
1711         return -E_INVALID_ARGS;
1712     }
1713     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1714     if (subManager == nullptr) {
1715         return -E_INVALID_ARGS;
1716     }
1717     int errCode = ControlCmdStartCheck(context);
1718     if (errCode != E_OK) {
1719         return errCode;
1720     }
1721     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1722     if (packet == nullptr) {
1723         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1724         return -E_OUT_OF_MEMORY;
1725     }
1726     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1727         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1728         if (errCode != E_OK) {
1729             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1730             delete packet;
1731             packet = nullptr;
1732             return errCode;
1733         }
1734     }
1735     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1736     errCode = SendControlPacket(packet, context);
1737     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1738         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1739     }
1740     return errCode;
1741 }
1742 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1743 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1744 {
1745     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1746     if (packet == nullptr) {
1747         return -E_INVALID_ARGS;
1748     }
1749     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1750         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1751     int errCode = ControlCmdRequestRecvPre(context, message);
1752     if (errCode != E_OK) {
1753         return errCode;
1754     }
1755     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1756         errCode = SubscribeRequestRecv(context, message);
1757     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1758         errCode = UnsubscribeRequestRecv(context, message);
1759     }
1760     return errCode;
1761 }
1762 
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1763 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1764 {
1765     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1766     if (subManager == nullptr) {
1767         return -E_INVALID_ARGS;
1768     }
1769     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1770     if (errCode != E_OK) {
1771         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1772         return errCode;
1773     }
1774     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1775     if (packet == nullptr) {
1776         return -E_INVALID_ARGS;
1777     }
1778     int32_t recvCode = packet->GetRecvCode();
1779     uint32_t cmdType = packet->GetcontrolCmdType();
1780     if (recvCode != E_OK) {
1781         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1782             STR_MASK(GetDeviceId()), cmdType);
1783         // for unsubscribe no need to do something
1784         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1785         return recvCode;
1786     }
1787     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1788         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1789     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1790         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1791     }
1792     if (errCode != E_OK) {
1793         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1794         return errCode;
1795     }
1796     return -E_NO_DATA_SEND; // means control msg send finished
1797 }
1798 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1799 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1800 {
1801     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1802         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1803         LOGE("[ControlCmdStartCheck] not support controlCmd");
1804         return -E_INVALID_ARGS;
1805     }
1806     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1807         context->GetQuery().HasInKeys() &&
1808         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1809         return -E_NOT_SUPPORT;
1810     }
1811     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1812         return E_OK;
1813     }
1814     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1815     if (permitReceive) {
1816         context->SetReceivcPermitCheck(true);
1817     } else {
1818         return -E_SECURITY_OPTION_CHECK_ERROR;
1819     }
1820     return E_OK;
1821 }
1822 
SendControlPacket(const ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1823 int SingleVerDataSync::SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1824 {
1825     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1826     if (message == nullptr) {
1827         LOGE("[DataSync][SendControlPacket] new message error");
1828         delete packet;
1829         packet = nullptr;
1830         return -E_OUT_OF_MEMORY;
1831     }
1832     uint32_t packetLen = packet->CalculateLen();
1833     int errCode = message->SetExternalObject(packet);
1834     if (errCode != E_OK) {
1835         delete packet;
1836         packet = nullptr;
1837         delete message;
1838         message = nullptr;
1839         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1840         return errCode;
1841     }
1842     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1843         context->GetSequenceId(), context->GetRequestSessionId());
1844     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1845         context, message->GetSessionId());
1846     errCode = Send(context, message, handler, packetLen);
1847     if (errCode != E_OK) {
1848         delete message;
1849         message = nullptr;
1850     }
1851     return errCode;
1852 }
1853 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1854 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1855     uint32_t controlCmdType, const CommErrHandler &handler)
1856 {
1857     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1858     if (ackMessage == nullptr) {
1859         LOGE("[DataSync][SendControlAck] new message error");
1860         return -E_OUT_OF_MEMORY;
1861     }
1862     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1863     ControlAckPacket ack;
1864     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1865     int errCode = ackMessage->SetCopiedObject(ack);
1866     if (errCode != E_OK) {
1867         delete ackMessage;
1868         ackMessage = nullptr;
1869         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1870         return errCode;
1871     }
1872     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1873         message->GetSequenceId(), message->GetSessionId());
1874     errCode = Send(context, ackMessage, handler, 0);
1875     if (errCode != E_OK) {
1876         delete ackMessage;
1877         ackMessage = nullptr;
1878     }
1879     return errCode;
1880 }
1881 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1882 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1883 {
1884     if (context == nullptr || message == nullptr) {
1885         return -E_INVALID_ARGS;
1886     }
1887     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1888     if (packet == nullptr) {
1889         return -E_INVALID_ARGS;
1890     }
1891     uint32_t controlCmdType = packet->GetcontrolCmdType();
1892     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1893         return DoAbilitySyncIfNeed(context, message, true);
1894     }
1895     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1896         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1897         return -E_WAIT_NEXT_MESSAGE;
1898     }
1899     return E_OK;
1900 }
1901 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1902 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1903     const Message *message)
1904 {
1905     uint32_t controlCmdType = packet->GetcontrolCmdType();
1906     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1907         return E_OK;
1908     }
1909     QuerySyncObject syncQuery = packet->GetQuery();
1910     int errCode;
1911     if (!packet->IsAutoSubscribe()) {
1912         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1913         if (errCode != E_OK) {
1914             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1915             SendControlAck(context, message, errCode, controlCmdType);
1916             return -E_WAIT_NEXT_MESSAGE;
1917         }
1918     }
1919     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1920         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1921     if (mode >= SyncModeType::INVALID_MODE) {
1922         LOGE("[SingleVerDataSync] invalid mode");
1923         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1924         return -E_WAIT_NEXT_MESSAGE;
1925     }
1926     errCode = CheckPermitSendData(mode, context);
1927     if (errCode != E_OK) {
1928         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1929         SendControlAck(context, message, errCode, controlCmdType);
1930     }
1931     return errCode;
1932 }
1933 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1934 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1935 {
1936     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1937     if (packet == nullptr) {
1938         return -E_INVALID_ARGS;
1939     }
1940     int errCode = SubscribeRequestRecvPre(context, packet, message);
1941     if (errCode != E_OK) {
1942         return errCode;
1943     }
1944     uint32_t controlCmdType = packet->GetcontrolCmdType();
1945     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1946     if (subscribeManager == nullptr) {
1947         LOGE("[SingleVerDataSync] subscribeManager check failed");
1948         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1949         return -E_INVALID_ARGS;
1950     }
1951     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
1952     if (errCode != E_OK) {
1953         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1954             STR_MASK(GetDeviceId()));
1955         SendControlAck(context, message, errCode, controlCmdType);
1956         return errCode;
1957     }
1958     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1959     if (errCode != E_OK) {
1960         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1961             STR_MASK(GetDeviceId()));
1962         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1963         SendControlAck(context, message, errCode, controlCmdType);
1964         return errCode;
1965     }
1966     errCode = SendControlAck(context, message, E_OK, controlCmdType);
1967     if (errCode != E_OK) {
1968         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1969         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1970         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1971             STR_MASK(GetDeviceId()));
1972         return errCode;
1973     }
1974     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1975     return errCode;
1976 }
1977 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1978 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1979 {
1980     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1981     if (packet == nullptr) {
1982         return -E_INVALID_ARGS;
1983     }
1984     uint32_t controlCmdType = packet->GetcontrolCmdType();
1985     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1986     if (subscribeManager == nullptr) {
1987         LOGE("[SingleVerDataSync] subscribeManager check failed");
1988         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1989         return -E_INVALID_ARGS;
1990     }
1991     int errCode;
1992     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
1993     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
1994         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
1995         if (errCode != E_OK) {
1996             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1997                 STR_MASK(GetDeviceId()));
1998             SendControlAck(context, message, errCode, controlCmdType);
1999             return errCode;
2000         }
2001     }
2002     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2003     if (errCode != E_OK) {
2004         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2005             STR_MASK(GetDeviceId()));
2006         return errCode;
2007     }
2008     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2009     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2010     return errCode;
2011 }
2012 
PutDataMsg(Message * message)2013 void SingleVerDataSync::PutDataMsg(Message *message)
2014 {
2015     return msgSchedule_.PutMsg(message);
2016 }
2017 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2018 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2019     bool &isNeedContinue)
2020 {
2021     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2022 }
2023 
IsNeedReloadQueue()2024 bool SingleVerDataSync::IsNeedReloadQueue()
2025 {
2026     return msgSchedule_.IsNeedReloadQueue();
2027 }
2028 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2029 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2030 {
2031     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2032 }
2033 
ClearDataMsg()2034 void SingleVerDataSync::ClearDataMsg()
2035 {
2036     msgSchedule_.ClearMsg();
2037 }
2038 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2039 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2040     const std::shared_ptr<SubscribeManager> &subscribeManager)
2041 {
2042     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2043         storage_->RemoveSubscribe(queryId);
2044     }
2045 }
2046 } // namespace DistributedDB
2047