• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43     storage_ = nullptr;
44     communicateHandle_ = nullptr;
45     metadata_ = nullptr;
46 }
47 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49     const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52         return -E_INVALID_ARGS;
53     }
54     storage_ = static_cast<SyncGenericInterface *>(inStorage);
55     communicateHandle_ = inCommunicateHandle;
56     metadata_ = inMetadata;
57     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58     std::vector<uint8_t> label = inStorage->GetIdentifier();
59     label.resize(3); // only show 3 Bytes enough
60     label_ = DBCommon::VectorToHexString(label);
61     deviceId_ = deviceId;
62     msgSchedule_.Initialize(label_, deviceId_);
63     return E_OK;
64 }
65 
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68     std::lock_guard<std::mutex> lock(lock_);
69     int errCode = CheckPermitSendData(mode, context);
70     if (errCode != E_OK) {
71         return errCode;
72     }
73     if (sessionId_ != 0) { // auto sync timeout resend
74         return ReSendData(context);
75     }
76     ResetSyncStatus(mode, context);
77     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78     int tmpMode = SyncOperation::TransferSyncMode(mode);
79     if (tmpMode == SyncModeType::PUSH) {
80         errCode = PushStart(context);
81     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82         errCode = PushPullStart(context);
83     } else if (tmpMode == SyncModeType::PULL) {
84         errCode = PullRequestStart(context);
85     } else {
86         errCode = PullResponseStart(context);
87     }
88     if (context->IsSkipTimeoutError(errCode)) {
89         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
90         // just return to avoid higher pressure for send.
91         return E_OK;
92     }
93     if (errCode != E_OK) {
94         LOGE("[DataSync] SendStart errCode=%d", errCode);
95         return errCode;
96     }
97     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
98         LOGE("wait for recv finished for push and pull mode");
99         return -E_EKEYREVOKED;
100     }
101     return InnerSyncStart(context);
102 }
103 
InnerSyncStart(SingleVerSyncTaskContext * context)104 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
105 {
106     int errCode = CheckPermitSendData(mode_, context);
107     if (errCode != E_OK) {
108         return errCode;
109     }
110     while (true) {
111         if (windowSize_ <= 0 || isAllDataHasSent_) {
112             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
113                 label_.c_str(), STR_MASK(deviceId_));
114             return E_OK;
115         }
116         int mode = SyncOperation::TransferSyncMode(mode_);
117         if (mode == SyncModeType::PULL) {
118             LOGE("[DataSync] unexpected error");
119             return -E_INVALID_ARGS;
120         }
121         context->IncSequenceId();
122         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
123             errCode = PushStart(context);
124         } else {
125             errCode = PullResponseStart(context);
126         }
127         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
128             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
129             isAllDataHasSent_ = true;
130             return -E_EKEYREVOKED;
131         }
132         if (context->IsSkipTimeoutError(errCode)) {
133             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
134             // just return to avoid higher pressure for send.
135             return E_OK;
136         }
137         if (errCode != E_OK) {
138             LOGE("[DataSync] InnerSend errCode=%d", errCode);
139             return errCode;
140         }
141     }
142     return E_OK;
143 }
144 
InnerClearSyncStatus()145 void SingleVerDataSync::InnerClearSyncStatus()
146 {
147     sessionId_ = 0;
148     reSendMap_.clear();
149     windowSize_ = 0;
150     maxSequenceIdHasSent_ = 0;
151     isAllDataHasSent_ = false;
152 }
153 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)154 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
155 {
156     if (message == nullptr) {
157         LOGE("[DataSync] AckRecv message nullptr");
158         return -E_INVALID_ARGS;
159     }
160     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
161     if (packet == nullptr) {
162         return -E_INVALID_ARGS;
163     }
164     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
165     uint32_t sessionId = message->GetSessionId();
166     uint32_t sequenceId = message->GetSequenceId();
167 
168     std::lock_guard<std::mutex> lock(lock_);
169     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
170         windowSize_, label_.c_str(), STR_MASK(deviceId_));
171     if (sessionId != sessionId_) {
172         LOGI("[DataSync] ignore ack,sessionId is different");
173         return E_OK;
174     }
175     Timestamp lastQueryTime = 0;
176     if (reSendMap_.count(sequenceId) != 0) {
177         lastQueryTime = reSendMap_[sequenceId].end;
178         reSendMap_.erase(sequenceId);
179         windowSize_++;
180     } else {
181         LOGI("[DataSync] ack seqId not in map");
182         return E_OK;
183     }
184     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
185         Timestamp dbLastQueryTime = 0;
186         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
187         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
188             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
189         }
190         if (errCode != E_OK) {
191             return errCode;
192         }
193     }
194     if (!isAllDataHasSent_) {
195         return InnerSyncStart(context);
196     } else if (reSendMap_.empty()) {
197         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198         InnerClearSyncStatus();
199         return -E_FINISHED;
200     }
201     return E_OK;
202 }
203 
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206     std::lock_guard<std::mutex> lock(lock_);
207     InnerClearSyncStatus();
208 }
209 
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212     if (reSendMap_.empty()) {
213         LOGI("[DataSync] ReSend map empty");
214         return -E_INTERNAL_ERROR;
215     }
216     uint32_t sequenceId = reSendMap_.begin()->first;
217     ReSendInfo reSendInfo = reSendMap_.begin()->second;
218     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221         windowSize_, label_.c_str(), STR_MASK(deviceId_));
222     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224     return ReSend(context, dataReSendInfo);
225 }
226 
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229     std::string deviceInfo;
230     if (communicateHandle_ != nullptr) {
231         communicateHandle_->GetLocalIdentity(deviceInfo);
232     }
233     return deviceInfo;
234 }
235 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237     uint32_t packetLen)
238 {
239     bool startFeedDogRet = false;
240     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244     }
245     SendConfig sendConfig;
246     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
248     if (errCode != E_OK) {
249         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
250         if (startFeedDogRet) {
251             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
252         }
253     }
254     return errCode;
255 }
256 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)257 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
258     std::vector<SendDataItem> &outData, size_t packetSize)
259 {
260     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
261     if (performance != nullptr) {
262         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
263     }
264     context->StartFeedDogForGetData(context->GetResponseSessionId());
265     int errCode = GetData(context, packetSize, outData);
266     context->StopFeedDogForGetData();
267     if (performance != nullptr) {
268         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
269     }
270     return errCode;
271 }
272 
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)273 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
274 {
275     int errCode;
276     UpdateMtuSize();
277     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
278         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
279         LOGI("[DataSync][GetData] resend data");
280         errCode = GetUnsyncData(context, outData, packetSize);
281     } else {
282         ContinueToken token;
283         context->GetContinueToken(token);
284         if (token == nullptr) {
285             errCode = GetUnsyncData(context, outData, packetSize);
286         } else {
287             LOGD("[DataSync][GetData] get data from token");
288             // if there is data to send, read out data, and update local watermark, send data
289             errCode = GetNextUnsyncData(context, outData, packetSize);
290         }
291     }
292     if (errCode == -E_UNFINISHED) {
293         LOGD("[DataSync][GetData] not finished.");
294     }
295     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
296         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
297         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
298     }
299     return errCode;
300 }
301 
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)302 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
303 {
304     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
305     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
306         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
307     bool needCompressOnSync = false;
308     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
309     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
310     int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
311     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
312         context->SetTaskErrCode(errCode);
313         return errCode;
314     }
315 
316     int innerCode = InterceptData(syncOutData);
317     if (innerCode != E_OK) {
318         context->SetTaskErrCode(innerCode);
319         return innerCode;
320     }
321 
322     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
323     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
324         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
325             { remoteAlgo, version });
326         if (compressCode != E_OK) {
327             return compressCode;
328         }
329     }
330     return errCode;
331 }
332 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)333 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
334     size_t packetSize)
335 {
336     SyncTimeRange waterRange;
337     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
338     WaterMark startMark = 0;
339     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
340     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
341     if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
342         return E_OK;
343     }
344     waterRange.beginTime = startMark;
345     if (curType == SyncType::QUERY_SYNC_TYPE) {
346         WaterMark deletedStartMark = 0;
347         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
348         Timestamp lastQueryTimestamp = 0;
349         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
350             lastQueryTimestamp);
351         if (errCode != E_OK) {
352             return errCode;
353         }
354         waterRange.deleteBeginTime = deletedStartMark;
355         waterRange.lastQueryTime = lastQueryTimestamp;
356     }
357     return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
358 }
359 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)360 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
361     DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
362 {
363     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
364     ContinueToken token = nullptr;
365     context->GetContinueToken(token);
366     if (token != nullptr) {
367         storage_->ReleaseContinueToken(token);
368     }
369     int errCode;
370     if (curType != SyncType::QUERY_SYNC_TYPE) {
371         errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
372             syncDataSizeInfo);
373     } else {
374         QuerySyncObject queryObj = context->GetQuery();
375         errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
376     }
377     context->SetContinueToken(token);
378     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
379         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
380     }
381     return errCode;
382 }
383 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)384 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
385     size_t packetSize)
386 {
387     ContinueToken token;
388     context->GetContinueToken(token);
389     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
390     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
391     context->SetContinueToken(token);
392     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
393         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
394     }
395     return errCode;
396 }
397 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)398 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
399     SyncType curType, const QuerySyncObject &query)
400 {
401     if (inData.empty()) {
402         return E_OK;
403     }
404     StoreInfo info = {
405         storage_->GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
406         storage_->GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
407         storage_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
408     };
409     std::string clientId;
410     int errCode = E_OK;
411     if (RuntimeContext::GetInstance()->TranslateDeviceId(context->GetDeviceId(), info, clientId) == E_OK) {
412         errCode = metadata_->SaveClientId(context->GetDeviceId(), clientId);
413         if (errCode != E_OK) {
414             LOGW("[DataSync] record clientId failed %d", errCode);
415         }
416     }
417     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
418     if (performance != nullptr) {
419         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
420     }
421 
422     const std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
423     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
424     // query only support prefix key and don't have query in packet in 104 version
425     errCode = storage_->PutSyncDataWithQuery(query, inData, context->GetDeviceId());
426     if (performance != nullptr) {
427         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428     }
429     if (errCode != E_OK) {
430         LOGE("[DataSync][SaveData] save sync data failed,errCode=%d", errCode);
431     }
432     return errCode;
433 }
434 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437     mode_ = inMode;
438     maxSequenceIdHasSent_ = 0;
439     isAllDataHasSent_ = false;
440     context->ReSetSequenceId();
441     reSendMap_.clear();
442     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443         windowSize_ = LOW_VERSION_WINDOW_SIZE;
444     } else {
445         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446     }
447     int mode = SyncOperation::TransferSyncMode(inMode);
448     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449         sessionId_ = context->GetRequestSessionId();
450     } else {
451         sessionId_ = context->GetResponseSessionId();
452     }
453 }
454 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458     WaterMark localMark = 0;
459     WaterMark deleteMark = 0;
460     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461     GetLocalDeleteSyncWaterMark(context, deleteMark);
462     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468     WaterMark localMark = 0;
469     int errCode = E_OK;
470     const std::string &deviceId = context->GetDeviceId();
471     std::string queryId = context->GetQuerySyncId();
472     if (syncType != SyncType::QUERY_SYNC_TYPE) {
473         if (isCheckBeforUpdate) {
474             GetLocalWaterMark(syncType, queryId, context, localMark);
475             if (localMark >= dataTimeRange.endTime) {
476                 return E_OK;
477             }
478         }
479         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
480     } else {
481         bool isNeedUpdateMark = true;
482         bool isNeedUpdateDeleteMark = true;
483         if (isCheckBeforUpdate) {
484             WaterMark deleteDataWaterMark = 0;
485             GetLocalWaterMark(syncType, queryId, context, localMark);
486             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487             if (localMark >= dataTimeRange.endTime) {
488                 isNeedUpdateMark = false;
489             }
490             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491                 isNeedUpdateDeleteMark = false;
492             }
493         }
494         if (isNeedUpdateMark) {
495             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
497             if (errCode != E_OK) {
498                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
499                 return errCode;
500             }
501         }
502         if (isNeedUpdateDeleteMark) {
503             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
504                 dataTimeRange.deleteEndTime);
505             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
506         }
507     }
508     if (errCode != E_OK) {
509         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
510     }
511     return errCode;
512 }
513 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const514 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
515     const DeviceID &deviceId, WaterMark &waterMark) const
516 {
517     if (syncType != SyncType::QUERY_SYNC_TYPE) {
518         metadata_->GetPeerWaterMark(deviceId, waterMark);
519         return;
520     }
521     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
522 }
523 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)524 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
525 {
526     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
527 }
528 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const529 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
530     WaterMark &waterMark) const
531 {
532     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
533 }
534 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const535 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
536     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
537 {
538     if (syncType != SyncType::QUERY_SYNC_TYPE) {
539         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
540         return;
541     }
542     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
543         waterMark, context->IsAutoLiftWaterMark());
544 }
545 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)546 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
547     WaterMark maxSendDataTime)
548 {
549     bool isNeedClearRemoteData = false;
550     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
551     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
552         uint64_t clearDeviceDataMark = 0;
553         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
554         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
555     } else {
556         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
557         if (packet == nullptr) {
558             LOGE("[RemoveDeviceDataHandle] get packet object failed");
559             return -E_INVALID_ARGS;
560         }
561         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
562         WaterMark packetLocalMark = packet->GetLocalWaterMark();
563         WaterMark peerMark = 0;
564         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
565         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
566     }
567     if (!isNeedClearRemoteData) {
568         return E_OK;
569     }
570     int errCode = E_OK;
571     if (context->IsNeedClearRemoteStaleData()) {
572         // need to clear remote device history data
573         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
574         if (errCode != E_OK) {
575             (void)SendDataAck(context, message, errCode, maxSendDataTime);
576             return errCode;
577         }
578         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
579             // avoid repeat clear in ack
580             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
581         }
582     }
583     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
584         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
585         if (errCode != E_OK) {
586             (void)SendDataAck(context, message, errCode, maxSendDataTime);
587             return errCode;
588         }
589     }
590     return E_OK;
591 }
592 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)593 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
594     const std::vector<uint64_t> &reserved)
595 {
596     bool isNeedClearRemoteData = false;
597     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
598     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
599     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
600         uint64_t clearDeviceDataMark = 0;
601         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
602         isNeedClearRemoteData = (clearDeviceDataMark != 0);
603     } else if (reserved.empty()) {
604         WaterMark localMark = 0;
605         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
606         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
607     } else {
608         WaterMark peerMark = 0;
609         GetPeerWaterMark(curType, context->GetQuerySyncId(),
610             context->GetDeviceId(), peerMark);
611         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
612     }
613     if (!isNeedClearRemoteData) {
614         return E_OK;
615     }
616     // need to clear remote historydata
617     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
618         label_.c_str(), STR_MASK(GetDeviceId()));
619     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
620     if (errCode != E_OK) {
621         return errCode;
622     }
623     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
624         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
625     }
626     return errCode;
627 }
628 
SetSessionEndTimestamp(Timestamp end)629 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
630 {
631     sessionEndTimestamp_ = end;
632 }
633 
GetSessionEndTimestamp() const634 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
635 {
636     return sessionEndTimestamp_;
637 }
638 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)639 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
640 {
641     ReSendInfo reSendInfo;
642     reSendInfo.start = dataTimeRange.beginTime;
643     reSendInfo.end = dataTimeRange.endTime;
644     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
645     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
646     reSendInfo.packetId = context->GetPacketId();
647     maxSequenceIdHasSent_++;
648     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
649     windowSize_--;
650     ContinueToken token;
651     context->GetContinueToken(token);
652     if (token == nullptr) {
653         isAllDataHasSent_ = true;
654     }
655     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
656         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
657         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
658         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
659 }
660 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)661 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
662     SyncEntry &syncData, int sendCode, int mode)
663 {
664     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
665     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
666     WaterMark localMark = 0;
667     WaterMark peerMark = 0;
668     WaterMark deleteMark = 0;
669     bool needCompressOnSync = false;
670     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
671     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
672     std::string id = context->GetQuerySyncId();
673     GetLocalWaterMark(curType, id, context, localMark);
674     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
675     GetLocalDeleteSyncWaterMark(context, deleteMark);
676     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
677         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
678         packet->SetLastSequence();
679     }
680     int tmpMode = mode;
681     if (mode == SyncModeType::RESPONSE_PULL) {
682         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
683     }
684     packet->SetData(syncData.entries);
685     packet->SetCompressData(syncData.compressedEntries);
686     packet->SetBasicInfo(sendCode, version, tmpMode);
687     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
688     packet->SetWaterMark(localMark, peerMark, deleteMark);
689     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
690         packet->SetEndWaterMark(context->GetEndMark());
691         packet->SetSessionId(context->GetRequestSessionId());
692     }
693     packet->SetQuery(context->GetQuery());
694     packet->SetQueryId(context->GetQuerySyncId());
695     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
696     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
697         packet->SetCompressDataMark();
698         packet->SetCompressAlgo(curAlgo);
699     }
700     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
701     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
702         context->GetQuery().HasOrderBy())) {
703         packet->SetUpdateWaterMark();
704     }
705     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
706         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
707         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
708 }
709 
RequestStart(SingleVerSyncTaskContext * context,int mode)710 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
711 {
712     int errCode = QuerySyncCheck(context);
713     if (errCode != E_OK) {
714         return errCode;
715     }
716     errCode = RemoveDeviceDataIfNeed(context);
717     if (errCode != E_OK) {
718         context->SetTaskErrCode(errCode);
719         return errCode;
720     }
721     SyncEntry syncData;
722     // get data
723     errCode = GetMatchData(context, syncData);
724     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
725 
726     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
727         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
728         return errCode;
729     }
730 
731     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
732     if (packet == nullptr) {
733         LOGE("[DataSync][PushStart] new DataRequestPacket error");
734         return -E_OUT_OF_MEMORY;
735     }
736     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
737     UpdateWaterMark isUpdateWaterMark;
738     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
739     if (errCode == E_OK) {
740         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
741     }
742     FillDataRequestPacket(packet, context, syncData, errCode, mode);
743     errCode = SendDataPacket(curType, packet, context);
744     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
745     if (performance != nullptr) {
746         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
747     }
748     if (errCode == E_OK || errCode == -E_TIMEOUT) {
749         UpdateSendInfo(dataTime, context);
750     }
751     if (errCode == E_OK) {
752         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
753             context->GetQuery().HasOrderBy())) {
754             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
755             return E_OK;
756         }
757         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
758         SaveLocalWaterMark(curType, context, tmpDataTime);
759     }
760     return errCode;
761 }
762 
PushStart(SingleVerSyncTaskContext * context)763 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
764 {
765     if (context == nullptr) {
766         return -E_INVALID_ARGS;
767     }
768     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
769     return RequestStart(context,
770         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
771 }
772 
PushPullStart(SingleVerSyncTaskContext * context)773 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
774 {
775     if (context == nullptr) {
776         return -E_INVALID_ARGS;
777     }
778     return RequestStart(context, context->GetMode());
779 }
780 
PullRequestStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
782 {
783     if (context == nullptr) {
784         return -E_INVALID_ARGS;
785     }
786     int errCode = QuerySyncCheck(context);
787     if (errCode != E_OK) {
788         return errCode;
789     }
790     errCode = RemoveDeviceDataIfNeed(context);
791     if (errCode != E_OK) {
792         context->SetTaskErrCode(errCode);
793         return errCode;
794     }
795     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
796     if (packet == nullptr) {
797         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
798         return -E_OUT_OF_MEMORY;
799     }
800     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
801     WaterMark peerMark = 0;
802     WaterMark localMark = 0;
803     WaterMark deleteMark = 0;
804     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
805         context->GetDeviceId(), peerMark);
806     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
807     GetLocalDeleteSyncWaterMark(context, deleteMark);
808     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
809     WaterMark endMark = context->GetEndMark();
810     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
811 
812     packet->SetBasicInfo(E_OK, version, context->GetMode());
813     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
814     packet->SetWaterMark(localMark, peerMark, deleteMark);
815     packet->SetEndWaterMark(endMark);
816     packet->SetSessionId(context->GetRequestSessionId());
817     packet->SetQuery(context->GetQuery());
818     packet->SetQueryId(context->GetQuerySyncId());
819     packet->SetLastSequence();
820     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
821     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
822 
823     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
824         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark, label_.c_str(),
825         STR_MASK(GetDeviceId()));
826     UpdateSendInfo(dataTime, context);
827     return SendDataPacket(syncType, packet, context);
828 }
829 
PullResponseStart(SingleVerSyncTaskContext * context)830 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
831 {
832     if (context == nullptr) {
833         return -E_INVALID_ARGS;
834     }
835     SyncEntry syncData;
836     // get data
837     int errCode = GetMatchData(context, syncData);
838     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
839         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
840             (void)SendPullResponseDataPkt(errCode, syncData, context);
841         }
842         return errCode;
843     }
844     // if send finished
845     int ackCode = E_OK;
846     ContinueToken token = nullptr;
847     context->GetContinueToken(token);
848     if (errCode == E_OK && token == nullptr) {
849         LOGD("[DataSync][PullResponse] send last frame end");
850         ackCode = SEND_FINISHED;
851     }
852     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
853     UpdateWaterMark isUpdateWaterMark;
854     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
855     if (errCode == E_OK) {
856         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
857     }
858     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
859     if (errCode == E_OK || errCode == -E_TIMEOUT) {
860         UpdateSendInfo(dataTime, context);
861     }
862     if (errCode == E_OK) {
863         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
864             context->GetQuery().HasOrderBy())) {
865             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
866             return E_OK;
867         }
868         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
869         SaveLocalWaterMark(curType, context, tmpDataTime);
870     }
871     return errCode;
872 }
873 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)874 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
875     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
876 {
877     WaterMark tmpPeerWatermark = dataTime.endTime;
878     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
879     if (isUpdateWaterMark.normalUpdateMark) {
880         tmpPeerWatermark++;
881     }
882     if (isUpdateWaterMark.deleteUpdateMark) {
883         tmpPeerDeletedWatermark++;
884     }
885     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
886 }
887 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)888 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
889     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
890 {
891     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
892         return;
893     }
894     int errCode = E_OK;
895     if (syncType != SyncType::QUERY_SYNC_TYPE) {
896         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
897     } else {
898         if (peerWatermark != 0) {
899             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
900             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
901             if (errCode != E_OK) {
902                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
903             }
904         }
905         if (peerDeletedWatermark != 0) {
906             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
907                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
908             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
909         }
910     }
911     if (errCode != E_OK) {
912         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
913     }
914 }
915 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)916 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
917 {
918     uint16_t remoteCommunicatorVersion = 0;
919     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
920         -E_NOT_FOUND) {
921         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
922         return -E_VERSION_NOT_SUPPORT;
923     }
924     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
925     if (remoteCommunicatorVersion == 0) {
926         LOGI("[DataSync] set remote version 0");
927         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
928         return E_OK;
929     } else {
930         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
931         if (isControlMsg) {
932             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
933         } else {
934             (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
935         }
936         return -E_NEED_ABILITY_SYNC;
937     }
938 }
939 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)940 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
941 {
942     if (context == nullptr || message == nullptr) {
943         return -E_INVALID_ARGS;
944     }
945     auto *packet = message->GetObject<DataRequestPacket>();
946     if (packet == nullptr) {
947         return -E_INVALID_ARGS;
948     }
949     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
950         return DoAbilitySyncIfNeed(context, message);
951     }
952     int32_t sendCode = packet->GetSendCode();
953     if (sendCode == -E_VERSION_NOT_SUPPORT) {
954         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
955         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
956         return -E_WAIT_NEXT_MESSAGE;
957     }
958     // only deal with pull response packet errCode
959     if (sendCode != E_OK && sendCode != SEND_FINISHED &&
960         message->GetSessionId() == context->GetRequestSessionId()) {
961         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
962         return sendCode;
963     }
964     int errCode = RunPermissionCheck(context, message, packet);
965     if (errCode != E_OK) {
966         return errCode;
967     }
968     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
969         errCode = CheckSchemaStrategy(context, message);
970     }
971     if (errCode == E_OK) {
972         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
973     }
974     if (errCode != E_OK) {
975         (void)SendDataAck(context, message, errCode, 0);
976     }
977     return errCode;
978 }
979 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)980 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
981     WaterMark &pullEndWatermark)
982 {
983     int errCode = DataRequestRecvPre(context, message);
984     if (errCode != E_OK) {
985         return errCode;
986     }
987     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
988     const std::vector<SendDataItem> &data = packet->GetData();
989     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
990     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
991         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
992         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
993     context->SetReceiveWaterMarkErr(false);
994     UpdateWaterMark isUpdateWaterMark;
995     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
996     errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
997     if (errCode != E_OK) {
998         return errCode;
999     }
1000     Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1001     if (WaterMarkErrHandle(curType, context, message)) {
1002         return E_OK;
1003     }
1004     GetPullEndWatermark(context, packet, pullEndWatermark);
1005     // save data first
1006     errCode = SaveData(context, data, curType, packet->GetQuery());
1007     if (errCode != E_OK) {
1008         (void)SendDataAck(context, message, errCode, dataTime.endTime);
1009         return errCode;
1010     }
1011     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1012         pullEndWatermark = 0;
1013         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1014     } else {
1015         // if data is empty, we don't know the max timestap of this packet.
1016         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1017     }
1018     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1019         context->GetRequestSessionId());
1020     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1021         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1022     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1023         UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1024     }
1025     if (errCode != E_OK) {
1026         return errCode;
1027     }
1028     if (packet->GetSendCode() == SEND_FINISHED) {
1029         return -E_RECV_FINISHED;
1030     }
1031     return errCode;
1032 }
1033 
SendDataPacket(SyncType syncType,const DataRequestPacket * packet,SingleVerSyncTaskContext * context)1034 int SingleVerDataSync::SendDataPacket(SyncType syncType, const DataRequestPacket *packet,
1035     SingleVerSyncTaskContext *context)
1036 {
1037     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1038     if (message == nullptr) {
1039         LOGE("[DataSync][SendDataPacket] new message error");
1040         delete packet;
1041         packet = nullptr;
1042         return -E_OUT_OF_MEMORY;
1043     }
1044     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1045     int errCode = message->SetExternalObject(packet);
1046     if (errCode != E_OK) {
1047         delete packet;
1048         packet = nullptr;
1049         delete message;
1050         message = nullptr;
1051         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1052         return errCode;
1053     }
1054     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1055         context->GetSequenceId(), context->GetRequestSessionId());
1056     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1057     if (performance != nullptr) {
1058         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1059     }
1060     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1061         context, message->GetSessionId());
1062     errCode = Send(context, message, handler, packetLen);
1063     if (errCode != E_OK) {
1064         delete message;
1065         message = nullptr;
1066     }
1067 
1068     return errCode;
1069 }
1070 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1071 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1072     SingleVerSyncTaskContext *context)
1073 {
1074     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1075     if (packet == nullptr) {
1076         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1077         return -E_OUT_OF_MEMORY;
1078     }
1079     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1080     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1081     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1082     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1083     if (message == nullptr) {
1084         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1085         delete packet;
1086         packet = nullptr;
1087         return -E_OUT_OF_MEMORY;
1088     }
1089     int errCode = message->SetExternalObject(packet);
1090     if (errCode != E_OK) {
1091         delete packet;
1092         packet = nullptr;
1093         delete message;
1094         message = nullptr;
1095         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1096         return errCode;
1097     }
1098     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1099         context->GetSequenceId(), context->GetResponseSessionId());
1100     SendResetWatchDogPacket(context, packetLen);
1101     errCode = Send(context, message, nullptr, packetLen);
1102     if (errCode != E_OK) {
1103         delete message;
1104         message = nullptr;
1105     }
1106     return errCode;
1107 }
1108 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1109 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1110 {
1111     (void)SendDataAck(context, message, E_OK, 0);
1112 }
1113 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1114 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1115     WaterMark maxSendDataTime)
1116 {
1117     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1118     if (packet == nullptr) {
1119         return -E_INVALID_ARGS;
1120     }
1121     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1122     if (ackMessage == nullptr) {
1123         LOGE("[DataSync][SendDataAck] new message error");
1124         return -E_OUT_OF_MEMORY;
1125     }
1126     DataAckPacket ack;
1127     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1128     int errCode = ackMessage->SetCopiedObject(ack);
1129     if (errCode != E_OK) {
1130         delete ackMessage;
1131         ackMessage = nullptr;
1132         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1133         return errCode;
1134     }
1135     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1136         message->GetSequenceId(), message->GetSessionId());
1137 
1138     errCode = Send(context, ackMessage, nullptr, 0);
1139     if (errCode != E_OK) {
1140         delete ackMessage;
1141         ackMessage = nullptr;
1142     }
1143     return errCode;
1144 }
1145 
AckPacketIdCheck(const Message * message)1146 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1147 {
1148     if (message == nullptr) {
1149         LOGE("[DataSync] AckRecv message nullptr");
1150         return false;
1151     }
1152     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1153         return true;
1154     }
1155     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1156     if (packet == nullptr) {
1157         return false;
1158     }
1159     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1160     std::lock_guard<std::mutex> lock(lock_);
1161     uint32_t sequenceId = message->GetSequenceId();
1162     if (reSendMap_.count(sequenceId) != 0) {
1163         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1164         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1165             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1166                 originalPacketId);
1167             return false;
1168         }
1169     }
1170     return true;
1171 }
1172 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1173 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1174 {
1175     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1176     if (errCode != E_OK) {
1177         return errCode;
1178     }
1179     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1180     if (packet == nullptr) {
1181         return -E_INVALID_ARGS;
1182     }
1183     int32_t recvCode = packet->GetRecvCode();
1184     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1185         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1186     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1187         LOGE("[DataSync][AckRecv] Version mismatch");
1188         return -E_VERSION_NOT_SUPPORT;
1189     }
1190 
1191     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT) {
1192         // we should ReleaseContinueToken, avoid crash
1193         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1194             STR_MASK(GetDeviceId()));
1195         context->ReleaseContinueToken();
1196         return recvCode;
1197     }
1198     uint64_t data = packet->GetData();
1199     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1200         return DealWaterMarkException(context, data, packet->GetReserved());
1201     }
1202 
1203     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1204         // data only use low 32bit
1205         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1206         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1207             STR_MASK(GetDeviceId()));
1208     }
1209 
1210     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1211         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1212             label_.c_str(), STR_MASK(GetDeviceId()));
1213         return recvCode;
1214     }
1215 
1216     // Judge if send finished
1217     ContinueToken token;
1218     context->GetContinueToken(token);
1219     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1220         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1221         return -E_NO_DATA_SEND;
1222     }
1223 
1224     // send next data
1225     return -E_SEND_DATA;
1226 }
1227 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1228 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1229     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1230 {
1231     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1232         LOGE("[SingleVerDataSync] messageId not available.");
1233         return;
1234     }
1235     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1236     if (ackMessage == nullptr) {
1237         LOGE("[DataSync][SaveDataNotify] new message failed");
1238         return;
1239     }
1240 
1241     DataAckPacket ack;
1242     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1243     ack.SetVersion(pktVersion);
1244     int errCode = ackMessage->SetCopiedObject(ack);
1245     if (errCode != E_OK) {
1246         delete ackMessage;
1247         ackMessage = nullptr;
1248         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1249         return;
1250     }
1251     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1252 
1253     errCode = Send(context, ackMessage, nullptr, 0);
1254     if (errCode != E_OK) {
1255         delete ackMessage;
1256         ackMessage = nullptr;
1257     }
1258     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1259         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1260 }
1261 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1262 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1263     WaterMark &pullEndWatermark) const
1264 {
1265     if (packet == nullptr) {
1266         return;
1267     }
1268     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1269     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1270         WaterMark endMark = packet->GetEndWaterMark();
1271         TimeOffset offset;
1272         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1273         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1274         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1275             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1276     }
1277 }
1278 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1279 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1280     const std::vector<uint64_t> &reserved)
1281 {
1282     WaterMark deletedWaterMark = 0;
1283     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1284     if (curType == SyncType::QUERY_SYNC_TYPE) {
1285         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1286             LOGE("[DataSync] get packet reserve size failed");
1287             return -E_INVALID_ARGS;
1288         }
1289         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1290     }
1291     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1292         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1293     int errCode = SaveLocalWaterMark(curType, context,
1294         {0, 0, ackWaterMark, deletedWaterMark});
1295     if (errCode != E_OK) {
1296         return errCode;
1297     }
1298     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1299     context->IncNegotiationCount();
1300     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1301     if (!context->IsNeedClearRemoteStaleData()) {
1302         return -E_RE_SEND_DATA;
1303     }
1304     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1305     if (errCode != E_OK) {
1306         return errCode;
1307     }
1308     return -E_RE_SEND_DATA;
1309 }
1310 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1311 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1312     const DataRequestPacket *packet)
1313 {
1314     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1315     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1316     if (errCode != E_OK) {
1317         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1318             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1319         }
1320         return -E_NOT_PERMIT;
1321     }
1322     const std::vector<SendDataItem> &data = packet->GetData();
1323     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1324     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1325     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1326         !context->GetReceivcPermitCheck()) {
1327         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1328         if (permitReceive) {
1329             context->SetReceivcPermitCheck(true);
1330         } else {
1331             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1332             return -E_SECURITY_OPTION_CHECK_ERROR;
1333         }
1334     }
1335     return errCode;
1336 }
1337 
1338 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1339 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1340 {
1341     // When mtu less than 30k, we send data with bluetooth
1342     // In order not to block the bluetooth channel, we don't send notify packet here
1343     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1344         return;
1345     }
1346     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1347 
1348     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1349     if (ackMessage == nullptr) {
1350         LOGE("[DataSync][ResetWatchDog] new message failed");
1351         return;
1352     }
1353 
1354     DataAckPacket ack;
1355     ack.SetData(data);
1356     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1357     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1358     int errCode = ackMessage->SetCopiedObject(ack);
1359     if (errCode != E_OK) {
1360         delete ackMessage;
1361         ackMessage = nullptr;
1362         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1363         return;
1364     }
1365     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1366         context->GetSequenceId(), context->GetResponseSessionId());
1367 
1368     errCode = Send(context, ackMessage, nullptr, 0);
1369     if (errCode != E_OK) {
1370         delete ackMessage;
1371         ackMessage = nullptr;
1372         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1373             STR_MASK(GetDeviceId()));
1374     } else {
1375         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1376             STR_MASK(GetDeviceId()));
1377     }
1378 }
1379 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1380 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1381 {
1382     if (context == nullptr) {
1383         return -E_INVALID_ARGS;
1384     }
1385     SyncEntry syncData;
1386     int errCode = GetReSendData(syncData, context, reSendInfo);
1387     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1388         return errCode;
1389     }
1390     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1391     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1392     if (packet == nullptr) {
1393         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1394         return -E_OUT_OF_MEMORY;
1395     }
1396     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1397     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1398     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1399         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1400         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1401         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1402             dataTime.deleteEndTime += 1;
1403         }
1404         if (reSendInfo.end > reSendInfo.start) {
1405             dataTime.endTime += 1;
1406         }
1407         errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1408         if (errCode != E_OK) {
1409             LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1410         }
1411     }
1412     return errCode;
1413 }
1414 
SendReSendPacket(const DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1415 int SingleVerDataSync::SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1416     uint32_t sessionId, uint32_t sequenceId)
1417 {
1418     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1419     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1420     if (message == nullptr) {
1421         LOGE("[DataSync][SendDataPacket] new message error");
1422         delete packet;
1423         packet = nullptr;
1424         return -E_OUT_OF_MEMORY;
1425     }
1426     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1427     int errCode = message->SetExternalObject(packet);
1428     if (errCode != E_OK) {
1429         delete packet;
1430         packet = nullptr;
1431         delete message;
1432         message = nullptr;
1433         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1434         return errCode;
1435     }
1436     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1437     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1438         context, message->GetSessionId());
1439     errCode = Send(context, message, handler, packetLen);
1440     if (errCode != E_OK) {
1441         delete message;
1442         message = nullptr;
1443     }
1444     return errCode;
1445 }
1446 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1447 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1448 {
1449     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1450     int mode = SyncOperation::TransferSyncMode(inMode);
1451     // for pull mode it just need to get data, no need to send data.
1452     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1453         return E_OK;
1454     }
1455     if (context->GetSendPermitCheck()) {
1456         return E_OK;
1457     }
1458     bool isPermitSync = true;
1459     std::string deviceId = context->GetDeviceId();
1460     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1461     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1462         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1463     }
1464     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1465         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1466     if (isPermitSync) {
1467         context->SetSendPermitCheck(true);
1468         return E_OK;
1469     }
1470     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1471         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1472         return -E_SECURITY_OPTION_CHECK_ERROR;
1473     }
1474     if (mode == SyncModeType::RESPONSE_PULL) {
1475         SyncEntry syncData;
1476         (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1477         return -E_SECURITY_OPTION_CHECK_ERROR;
1478     }
1479     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1480         return -E_SECURITY_OPTION_CHECK_ERROR;
1481     }
1482     return E_OK;
1483 }
1484 
GetLabel() const1485 std::string SingleVerDataSync::GetLabel() const
1486 {
1487     return label_;
1488 }
1489 
GetDeviceId() const1490 std::string SingleVerDataSync::GetDeviceId() const
1491 {
1492     return deviceId_;
1493 }
1494 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1495 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1496 {
1497     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1498     if (packet == nullptr) {
1499         LOGE("[WaterMarkErrHandle] get packet object failed");
1500         return -E_INVALID_ARGS;
1501     }
1502     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1503     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1504     WaterMark peerMark = 0;
1505     WaterMark deletedMark = 0;
1506     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1507     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1508         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1509     }
1510     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1511         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1512         context->SetReceiveWaterMarkErr(true);
1513         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1514         return true;
1515     }
1516     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1517         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1518             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1519             peerMark);
1520         context->SetReceiveWaterMarkErr(true);
1521         (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1522         return true;
1523     }
1524     return false;
1525 }
1526 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1527 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1528 {
1529     auto *packet = message->GetObject<DataRequestPacket>();
1530     if (packet == nullptr) {
1531         return -E_INVALID_ARGS;
1532     }
1533     auto query = packet->GetQuery();
1534     std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1535     if (!schemaSyncStatus.second) {
1536         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1537         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1538         return -E_NEED_ABILITY_SYNC;
1539     }
1540     if (!schemaSyncStatus.first) {
1541         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1542         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1543         return -E_SCHEMA_MISMATCH;
1544     }
1545     return E_OK;
1546 }
1547 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1548 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1549 {
1550     int mode = SyncOperation::TransferSyncMode(inMode);
1551     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1552         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1553         return;
1554     }
1555 
1556     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1557         storage_->NotifyRemotePushFinished(deviceId_);
1558     }
1559 }
1560 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1561 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1562     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1563 {
1564     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1565     WaterMark localMark = 0;
1566     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1567     ackPacket.SetRecvCode(recvCode);
1568     // send ack packet
1569     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1570         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1571     } else if (recvCode != WATER_MARK_INVALID) {
1572         WaterMark mark = 0;
1573         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1574         ackPacket.SetData(mark);
1575     }
1576     std::vector<uint64_t> reserved {localMark};
1577     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1578     uint64_t packetId = 0;
1579     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1580         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1581     }
1582     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1583         reserved.push_back(packetId);
1584     }
1585     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1586     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1587         WaterMark deletedPeerMark;
1588         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1589         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1590     }
1591     ackPacket.SetReserved(reserved);
1592     ackPacket.SetVersion(version);
1593 }
1594 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1595 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1596     DataSyncReSendInfo reSendInfo)
1597 {
1598     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1599     if (mode == SyncModeType::PULL) {
1600         return E_OK;
1601     }
1602     ContinueToken token = nullptr;
1603     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1604     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1605         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1606     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1607     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1608     int errCode;
1609     if (curType != SyncType::QUERY_SYNC_TYPE) {
1610         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1611             reSendDataSizeInfo);
1612     } else {
1613         QuerySyncObject queryObj = context->GetQuery();
1614         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1615             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1616     }
1617     if (token != nullptr) {
1618         storage_->ReleaseContinueToken(token);
1619     }
1620     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1621         context->SetTaskErrCode(errCode);
1622         return errCode;
1623     }
1624     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1625         return errCode;
1626     }
1627 
1628     int innerCode = InterceptData(syncData);
1629     if (innerCode != E_OK) {
1630         context->SetTaskErrCode(innerCode);
1631         return innerCode;
1632     }
1633 
1634     bool needCompressOnSync = false;
1635     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1636     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1637     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1638     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1639         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1640             { remoteAlgo, version });
1641         if (compressCode != E_OK) {
1642             return compressCode;
1643         }
1644     }
1645     return errCode;
1646 }
1647 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1648 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1649 {
1650     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1651         return E_OK;
1652     }
1653     uint64_t clearRemoteDataMark = 0;
1654     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1655     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1656     if (clearRemoteDataMark == 0) {
1657         return E_OK;
1658     }
1659     int errCode = E_OK;
1660     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1661         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1662         if (errCode != E_OK) {
1663             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1664             return errCode;
1665         }
1666     }
1667     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1668         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1669         if (errCode != E_OK) {
1670             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1671             return errCode;
1672         }
1673     }
1674     return E_OK;
1675 }
1676 
UpdateMtuSize()1677 void SingleVerDataSync::UpdateMtuSize()
1678 {
1679     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1680 }
1681 
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1682 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1683     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1684 {
1685     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1686     WaterMark peerMark = 0;
1687     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1688         peerMark);
1689     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1690     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1691     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1692     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1693     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1694         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1695         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1696         packet->SetLastSequence();
1697     }
1698     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1699         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1700         sendCode = SEND_FINISHED;
1701     }
1702     packet->SetData(syncData.entries);
1703     packet->SetCompressData(syncData.compressedEntries);
1704     packet->SetBasicInfo(sendCode, version, reSendMode);
1705     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1706     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1707     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH) {
1708         packet->SetEndWaterMark(context->GetEndMark());
1709         packet->SetQuery(context->GetQuery());
1710     }
1711     packet->SetQueryId(context->GetQuerySyncId());
1712     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1713         std::vector<uint64_t> reserved {reSendInfo.packetId};
1714         packet->SetReserved(reserved);
1715     }
1716     if (reSendMode == SyncModeType::PULL) {
1717         // resend pull packet dont set compress type
1718         return;
1719     }
1720     bool needCompressOnSync = false;
1721     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1722     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1723     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1724     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1725         packet->SetCompressDataMark();
1726         packet->SetCompressAlgo(curAlgo);
1727     }
1728 }
1729 
GetDataSizeSpecInfo(size_t packetSize)1730 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1731 {
1732     bool needCompressOnSync = false;
1733     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1734     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1735     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1736         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1737     return {blockSize, packetSize};
1738 }
1739 
InterceptData(SyncEntry & syncEntry)1740 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1741 {
1742     if (storage_ == nullptr) {
1743         LOGE("Invalid DB. Can not intercept data.");
1744         return -E_INVALID_DB;
1745     }
1746 
1747     // GetLocalDeviceName get local device ID.
1748     // GetDeviceId get remote device ID.
1749     // If intercept data fail, entries will be released.
1750     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId());
1751 }
1752 
ControlCmdStart(SingleVerSyncTaskContext * context)1753 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1754 {
1755     if (context == nullptr) {
1756         return -E_INVALID_ARGS;
1757     }
1758     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1759     if (subManager == nullptr) {
1760         return -E_INVALID_ARGS;
1761     }
1762     int errCode = ControlCmdStartCheck(context);
1763     if (errCode != E_OK) {
1764         return errCode;
1765     }
1766     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1767     if (packet == nullptr) {
1768         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1769         return -E_OUT_OF_MEMORY;
1770     }
1771     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1772         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1773         if (errCode != E_OK) {
1774             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1775             delete packet;
1776             packet = nullptr;
1777             return errCode;
1778         }
1779     }
1780     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1781     errCode = SendControlPacket(packet, context);
1782     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1783         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1784     }
1785     return errCode;
1786 }
1787 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1788 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1789 {
1790     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1791     if (packet == nullptr) {
1792         return -E_INVALID_ARGS;
1793     }
1794     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1795         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1796     int errCode = ControlCmdRequestRecvPre(context, message);
1797     if (errCode != E_OK) {
1798         return errCode;
1799     }
1800     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1801         errCode = SubscribeRequestRecv(context, message);
1802     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1803         errCode = UnsubscribeRequestRecv(context, message);
1804     }
1805     return errCode;
1806 }
1807 
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1808 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1809 {
1810     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1811     if (subManager == nullptr) {
1812         return -E_INVALID_ARGS;
1813     }
1814     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1815     if (errCode != E_OK) {
1816         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1817         return errCode;
1818     }
1819     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1820     if (packet == nullptr) {
1821         return -E_INVALID_ARGS;
1822     }
1823     int32_t recvCode = packet->GetRecvCode();
1824     uint32_t cmdType = packet->GetcontrolCmdType();
1825     if (recvCode != E_OK) {
1826         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1827             STR_MASK(GetDeviceId()), cmdType);
1828         // for unsubscribe no need to do something
1829         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1830         return recvCode;
1831     }
1832     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1833         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1834     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1835         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1836     }
1837     if (errCode != E_OK) {
1838         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1839         return errCode;
1840     }
1841     return -E_NO_DATA_SEND; // means control msg send finished
1842 }
1843 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1844 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1845 {
1846     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1847         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1848         LOGE("[ControlCmdStartCheck] not support controlCmd");
1849         return -E_INVALID_ARGS;
1850     }
1851     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1852         context->GetQuery().HasInKeys() &&
1853         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1854         return -E_NOT_SUPPORT;
1855     }
1856     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1857         return E_OK;
1858     }
1859     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1860     if (permitReceive) {
1861         context->SetReceivcPermitCheck(true);
1862     } else {
1863         return -E_SECURITY_OPTION_CHECK_ERROR;
1864     }
1865     return E_OK;
1866 }
1867 
SendControlPacket(const ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1868 int SingleVerDataSync::SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1869 {
1870     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1871     if (message == nullptr) {
1872         LOGE("[DataSync][SendControlPacket] new message error");
1873         delete packet;
1874         packet = nullptr;
1875         return -E_OUT_OF_MEMORY;
1876     }
1877     uint32_t packetLen = packet->CalculateLen();
1878     int errCode = message->SetExternalObject(packet);
1879     if (errCode != E_OK) {
1880         delete packet;
1881         packet = nullptr;
1882         delete message;
1883         message = nullptr;
1884         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1885         return errCode;
1886     }
1887     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1888         context->GetSequenceId(), context->GetRequestSessionId());
1889     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1890         context, message->GetSessionId());
1891     errCode = Send(context, message, handler, packetLen);
1892     if (errCode != E_OK) {
1893         delete message;
1894         message = nullptr;
1895     }
1896     return errCode;
1897 }
1898 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1899 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1900     uint32_t controlCmdType, const CommErrHandler &handler)
1901 {
1902     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1903     if (ackMessage == nullptr) {
1904         LOGE("[DataSync][SendControlAck] new message error");
1905         return -E_OUT_OF_MEMORY;
1906     }
1907     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1908     ControlAckPacket ack;
1909     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1910     int errCode = ackMessage->SetCopiedObject(ack);
1911     if (errCode != E_OK) {
1912         delete ackMessage;
1913         ackMessage = nullptr;
1914         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1915         return errCode;
1916     }
1917     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1918         message->GetSequenceId(), message->GetSessionId());
1919     errCode = Send(context, ackMessage, handler, 0);
1920     if (errCode != E_OK) {
1921         delete ackMessage;
1922         ackMessage = nullptr;
1923     }
1924     return errCode;
1925 }
1926 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1927 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1928 {
1929     if (context == nullptr || message == nullptr) {
1930         return -E_INVALID_ARGS;
1931     }
1932     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1933     if (packet == nullptr) {
1934         return -E_INVALID_ARGS;
1935     }
1936     uint32_t controlCmdType = packet->GetcontrolCmdType();
1937     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1938         return DoAbilitySyncIfNeed(context, message, true);
1939     }
1940     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1941         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1942         return -E_WAIT_NEXT_MESSAGE;
1943     }
1944     return E_OK;
1945 }
1946 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1947 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1948     const Message *message)
1949 {
1950     uint32_t controlCmdType = packet->GetcontrolCmdType();
1951     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1952         return E_OK;
1953     }
1954     QuerySyncObject syncQuery = packet->GetQuery();
1955     int errCode;
1956     if (!packet->IsAutoSubscribe()) {
1957         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1958         if (errCode != E_OK) {
1959             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1960             SendControlAck(context, message, errCode, controlCmdType);
1961             return -E_WAIT_NEXT_MESSAGE;
1962         }
1963     }
1964     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1965         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1966     if (mode >= SyncModeType::INVALID_MODE) {
1967         LOGE("[SingleVerDataSync] invalid mode");
1968         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1969         return -E_WAIT_NEXT_MESSAGE;
1970     }
1971     errCode = CheckPermitSendData(mode, context);
1972     if (errCode != E_OK) {
1973         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1974         SendControlAck(context, message, errCode, controlCmdType);
1975     }
1976     return errCode;
1977 }
1978 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1979 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1980 {
1981     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1982     if (packet == nullptr) {
1983         return -E_INVALID_ARGS;
1984     }
1985     int errCode = SubscribeRequestRecvPre(context, packet, message);
1986     if (errCode != E_OK) {
1987         return errCode;
1988     }
1989     uint32_t controlCmdType = packet->GetcontrolCmdType();
1990     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1991     if (subscribeManager == nullptr) {
1992         LOGE("[SingleVerDataSync] subscribeManager check failed");
1993         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1994         return -E_INVALID_ARGS;
1995     }
1996     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
1997     if (errCode != E_OK) {
1998         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1999             STR_MASK(GetDeviceId()));
2000         SendControlAck(context, message, errCode, controlCmdType);
2001         return errCode;
2002     }
2003     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2004     if (errCode != E_OK) {
2005         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2006             STR_MASK(GetDeviceId()));
2007         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2008         SendControlAck(context, message, errCode, controlCmdType);
2009         return errCode;
2010     }
2011     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2012     if (errCode != E_OK) {
2013         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2014         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2015         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2016             STR_MASK(GetDeviceId()));
2017         return errCode;
2018     }
2019     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2020     return errCode;
2021 }
2022 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2023 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2024 {
2025     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2026     if (packet == nullptr) {
2027         return -E_INVALID_ARGS;
2028     }
2029     uint32_t controlCmdType = packet->GetcontrolCmdType();
2030     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2031     if (subscribeManager == nullptr) {
2032         LOGE("[SingleVerDataSync] subscribeManager check failed");
2033         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2034         return -E_INVALID_ARGS;
2035     }
2036     int errCode;
2037     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2038     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2039         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2040         if (errCode != E_OK) {
2041             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2042                 STR_MASK(GetDeviceId()));
2043             SendControlAck(context, message, errCode, controlCmdType);
2044             return errCode;
2045         }
2046     }
2047     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2048     if (errCode != E_OK) {
2049         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2050             STR_MASK(GetDeviceId()));
2051         return errCode;
2052     }
2053     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2054     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2055     return errCode;
2056 }
2057 
PutDataMsg(Message * message)2058 void SingleVerDataSync::PutDataMsg(Message *message)
2059 {
2060     return msgSchedule_.PutMsg(message);
2061 }
2062 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2063 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2064     bool &isNeedContinue)
2065 {
2066     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2067 }
2068 
IsNeedReloadQueue()2069 bool SingleVerDataSync::IsNeedReloadQueue()
2070 {
2071     return msgSchedule_.IsNeedReloadQueue();
2072 }
2073 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2074 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2075 {
2076     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2077 }
2078 
ClearDataMsg()2079 void SingleVerDataSync::ClearDataMsg()
2080 {
2081     msgSchedule_.ClearMsg();
2082 }
2083 
QuerySyncCheck(SingleVerSyncTaskContext * context)2084 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2085 {
2086     if (context == nullptr) {
2087         return -E_INVALID_ARGS;
2088     }
2089     bool isCheckStatus = false;
2090     int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2091     if (errCode != E_OK) {
2092         return errCode;
2093     }
2094     if (!isCheckStatus) {
2095         context->SetTaskErrCode(-E_NOT_SUPPORT);
2096         return -E_NOT_SUPPORT;
2097     }
2098     return E_OK;
2099 }
2100 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2101 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2102     const std::shared_ptr<SubscribeManager> &subscribeManager)
2103 {
2104     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2105         storage_->RemoveSubscribe(queryId);
2106     }
2107 }
2108 } // namespace DistributedDB
2109