• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34     : mtuSize_(0),
35       storage_(nullptr),
36       communicateHandle_(nullptr),
37       metadata_(nullptr)
38 {
39 }
40 
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43     storage_ = nullptr;
44     communicateHandle_ = nullptr;
45     metadata_ = nullptr;
46 }
47 
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49     std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51     if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52         return -E_INVALID_ARGS;
53     }
54     storage_ = static_cast<SyncGenericInterface *>(inStorage);
55     communicateHandle_ = inCommunicateHandle;
56     metadata_ = inMetadata;
57     mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58     std::vector<uint8_t> label = inStorage->GetIdentifier();
59     label.resize(3); // only show 3 Bytes enough
60     label_ = DBCommon::VectorToHexString(label);
61     deviceId_ = deviceId;
62     msgSchedule_.Initialize(label_, deviceId_);
63     return E_OK;
64 }
65 
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68     std::lock_guard<std::mutex> lock(lock_);
69     if (sessionId_ != 0) { // auto sync timeout resend
70         return ReSendData(context);
71     }
72     ResetSyncStatus(mode, context);
73     LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
74     int tmpMode = SyncOperation::TransferSyncMode(mode);
75     int errCode = E_OK;
76     if (tmpMode == SyncModeType::PUSH) {
77         errCode = PushStart(context);
78     } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79         errCode = PushPullStart(context);
80     } else if (tmpMode == SyncModeType::PULL) {
81         errCode = PullRequestStart(context);
82     } else {
83         errCode = PullResponseStart(context);
84     }
85     if (context->IsSkipTimeoutError(errCode)) {
86         // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
87         // just return to avoid higher pressure for send.
88         return E_OK;
89     }
90     if (errCode != E_OK) {
91         LOGE("[DataSync] SendStart errCode=%d", errCode);
92         return errCode;
93     }
94     if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
95         LOGE("wait for recv finished for push and pull mode");
96         return -E_EKEYREVOKED;
97     }
98     return InnerSyncStart(context);
99 }
100 
InnerSyncStart(SingleVerSyncTaskContext * context)101 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
102 {
103     int errCode = CheckPermitSendData(mode_, context);
104     if (errCode != E_OK) {
105         return errCode;
106     }
107     while (true) {
108         if (windowSize_ <= 0 || isAllDataHasSent_) {
109             LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
110                 label_.c_str(), STR_MASK(deviceId_));
111             return E_OK;
112         }
113         int mode = SyncOperation::TransferSyncMode(mode_);
114         if (mode == SyncModeType::PULL) {
115             LOGE("[DataSync] unexpected error");
116             return -E_INVALID_ARGS;
117         }
118         context->IncSequenceId();
119         if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
120             errCode = PushStart(context);
121         } else {
122             errCode = PullResponseStart(context);
123         }
124         if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
125             LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
126             isAllDataHasSent_ = true;
127             return -E_EKEYREVOKED;
128         }
129         if (context->IsSkipTimeoutError(errCode)) {
130             // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
131             // just return to avoid higher pressure for send.
132             return E_OK;
133         }
134         if (errCode != E_OK) {
135             LOGE("[DataSync] InnerSend errCode=%d", errCode);
136             return errCode;
137         }
138     }
139     return E_OK;
140 }
141 
InnerClearSyncStatus()142 void SingleVerDataSync::InnerClearSyncStatus()
143 {
144     sessionId_ = 0;
145     reSendMap_.clear();
146     windowSize_ = 0;
147     maxSequenceIdHasSent_ = 0;
148     isAllDataHasSent_ = false;
149 }
150 
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)151 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
152 {
153     if (message == nullptr) {
154         LOGE("[DataSync] AckRecv message nullptr");
155         return -E_INVALID_ARGS;
156     }
157     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
158     if (packet == nullptr) {
159         return -E_INVALID_ARGS;
160     }
161     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
162     uint32_t sessionId = message->GetSessionId();
163     uint32_t sequenceId = message->GetSequenceId();
164 
165     std::lock_guard<std::mutex> lock(lock_);
166     LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
167         windowSize_, label_.c_str(), STR_MASK(deviceId_));
168     if (sessionId != sessionId_) {
169         LOGI("[DataSync] ignore ack,sessionId is different");
170         return E_OK;
171     }
172     Timestamp lastQueryTime = 0;
173     if (reSendMap_.count(sequenceId) != 0) {
174         lastQueryTime = reSendMap_[sequenceId].end;
175         reSendMap_.erase(sequenceId);
176         windowSize_++;
177     } else {
178         LOGI("[DataSync] ack seqId not in map");
179         return E_OK;
180     }
181     if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
182         Timestamp dbLastQueryTime = 0;
183         int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
184         if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
185             errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
186         }
187         if (errCode != E_OK) {
188             return errCode;
189         }
190     }
191     if (!isAllDataHasSent_) {
192         return InnerSyncStart(context);
193     } else if (reSendMap_.size() == 0) {
194         context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
195         InnerClearSyncStatus();
196         return -E_FINISHED;
197     }
198     return E_OK;
199 }
200 
ClearSyncStatus()201 void SingleVerDataSync::ClearSyncStatus()
202 {
203     std::lock_guard<std::mutex> lock(lock_);
204     InnerClearSyncStatus();
205 }
206 
ReSendData(SingleVerSyncTaskContext * context)207 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
208 {
209     if (reSendMap_.empty()) {
210         LOGI("[DataSync] ReSend map empty");
211         return -E_INTERNAL_ERROR;
212     }
213     uint32_t sequenceId = reSendMap_.begin()->first;
214     ReSendInfo reSendInfo = reSendMap_.begin()->second;
215     LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
216         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
217         reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
218         windowSize_, label_.c_str(), STR_MASK(deviceId_));
219     DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
220         reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
221     return ReSend(context, dataReSendInfo);
222 }
223 
GetLocalDeviceName()224 std::string SingleVerDataSync::GetLocalDeviceName()
225 {
226     std::string deviceInfo;
227     if (communicateHandle_ != nullptr) {
228         communicateHandle_->GetLocalIdentity(deviceInfo);
229     }
230     return deviceInfo;
231 }
232 
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)233 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
234     uint32_t packetLen)
235 {
236     bool startFeedDogRet = false;
237     if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
238         uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
239             static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
240         startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
241     }
242     SendConfig sendConfig;
243     SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
244     int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
245     if (errCode != E_OK) {
246         LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
247         if (startFeedDogRet) {
248             context->StopFeedDogForSync(SyncDirectionFlag::SEND);
249         }
250     }
251     return errCode;
252 }
253 
GetData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)254 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize)
255 {
256     int errCode;
257     UpdateMtuSize();
258     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
259         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
260         LOGI("[DataSync][GetData] resend data");
261         errCode = GetUnsyncData(context, outData, packetSize);
262     } else {
263         ContinueToken token;
264         context->GetContinueToken(token);
265         if (token == nullptr) {
266             errCode = GetUnsyncData(context, outData, packetSize);
267         } else {
268             LOGD("[DataSync][GetData] get data from token");
269             // if there is data to send, read out data, and update local watermark, send data
270             errCode = GetNextUnsyncData(context, outData, packetSize);
271         }
272     }
273     if (errCode == -E_UNFINISHED) {
274         LOGD("[DataSync][GetData] not finished.");
275     }
276     if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
277         std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
278         SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
279     }
280     return errCode;
281 }
282 
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)283 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
284 {
285     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
286     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
287         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
288     bool needCompressOnSync = false;
289     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
290     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
291     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
292     if (performance != nullptr) {
293         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
294     }
295     context->StartFeedDogForGetData(context->GetResponseSessionId());
296     int errCode = GetData(context, syncOutData.entries, packetSize);
297     context->StopFeedDogForGetData();
298     if (performance != nullptr) {
299         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
300     }
301     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302         context->SetTaskErrCode(errCode);
303         return errCode;
304     }
305 
306     int innerCode = InterceptData(syncOutData);
307     if (innerCode != E_OK) {
308         context->SetTaskErrCode(innerCode);
309         return innerCode;
310     }
311 
312     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
313     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
314         int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
315             { remoteAlgo, version });
316         if (compressCode != E_OK) {
317             return compressCode;
318         }
319     }
320     return errCode;
321 }
322 
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)323 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
324     size_t packetSize)
325 {
326     int errCode;
327     WaterMark startMark = 0;
328     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
329     GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
330     WaterMark endMark = MAX_TIMESTAMP;
331     if ((startMark > endMark)) {
332         return E_OK;
333     }
334     ContinueToken token = nullptr;
335     context->GetContinueToken(token);
336     if (token != nullptr) {
337         storage_->ReleaseContinueToken(token);
338     }
339     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
340     if (curType != SyncType::QUERY_SYNC_TYPE) {
341         errCode = storage_->GetSyncData(startMark, endMark, outData, token, syncDataSizeInfo);
342     } else {
343         WaterMark deletedStartMark = 0;
344         GetLocalDeleteSyncWaterMark(context, deletedStartMark);
345         Timestamp lastQueryTimestamp = 0;
346         errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(),
347             context->GetDeviceId(), lastQueryTimestamp);
348         if (errCode == E_OK) {
349             QuerySyncObject queryObj = context->GetQuery();
350             errCode = storage_->GetSyncData(queryObj,
351                 SyncTimeRange{ startMark, deletedStartMark, endMark, endMark, lastQueryTimestamp},
352                 syncDataSizeInfo, token, outData);
353         }
354     }
355     context->SetContinueToken(token);
356     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
357         LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
358     }
359     return errCode;
360 }
361 
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)362 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
363     size_t packetSize)
364 {
365     ContinueToken token;
366     context->GetContinueToken(token);
367     DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
368     int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
369     context->SetContinueToken(token);
370     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
371         LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
372     }
373     return errCode;
374 }
375 
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)376 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
377     SyncType curType, const QuerySyncObject &query)
378 {
379     if (inData.empty()) {
380         return E_OK;
381     }
382     StoreInfo info = {
383         storage_->GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
384         storage_->GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
385         storage_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
386     };
387     std::string clientId;
388     int errCode = E_OK;
389     if (RuntimeContext::GetInstance()->TranslateDeviceId(context->GetDeviceId(), info, clientId) == E_OK) {
390         errCode = metadata_->SaveClientId(context->GetDeviceId(), clientId);
391         if (errCode != E_OK) {
392             LOGW("[DataSync] record clientId failed %d", errCode);
393         }
394     }
395     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
396     if (performance != nullptr) {
397         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
398     }
399 
400     const std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
401     SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
402     // query only support prefix key and don't have query in packet in 104 version
403     errCode = storage_->PutSyncDataWithQuery(query, inData, context->GetDeviceId());
404     if (performance != nullptr) {
405         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
406     }
407     if (errCode != E_OK) {
408         LOGE("[DataSync][SaveData] save sync data failed,errCode=%d", errCode);
409     }
410     return errCode;
411 }
412 
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)413 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
414 {
415     mode_ = inMode;
416     maxSequenceIdHasSent_ = 0;
417     isAllDataHasSent_ = false;
418     context->ReSetSequenceId();
419     reSendMap_.clear();
420     if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
421         windowSize_ = LOW_VERSION_WINDOW_SIZE;
422     } else {
423         windowSize_ = HIGH_VERSION_WINDOW_SIZE;
424     }
425     int mode = SyncOperation::TransferSyncMode(inMode);
426     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
427         sessionId_ = context->GetRequestSessionId();
428     } else {
429         sessionId_ = context->GetResponseSessionId();
430     }
431 }
432 
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)433 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
434     const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
435 {
436     WaterMark localMark = 0;
437     WaterMark deleteMark = 0;
438     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
439     GetLocalDeleteSyncWaterMark(context, deleteMark);
440     return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
441 }
442 
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const443 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
444     SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
445 {
446     WaterMark localMark = 0;
447     int errCode = E_OK;
448     const std::string &deviceId = context->GetDeviceId();
449     std::string queryId = context->GetQuerySyncId();
450     if (syncType != SyncType::QUERY_SYNC_TYPE) {
451         if (isCheckBeforUpdate) {
452             GetLocalWaterMark(syncType, queryId, context, localMark);
453             if (localMark >= dataTimeRange.endTime) {
454                 return E_OK;
455             }
456         }
457         errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
458     } else {
459         bool isNeedUpdateMark = true;
460         bool isNeedUpdateDeleteMark = true;
461         if (isCheckBeforUpdate) {
462             WaterMark deleteDataWaterMark = 0;
463             GetLocalWaterMark(syncType, queryId, context, localMark);
464             GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
465             if (localMark >= dataTimeRange.endTime) {
466                 isNeedUpdateMark = false;
467             }
468             if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
469                 isNeedUpdateDeleteMark = false;
470             }
471         }
472         if (isNeedUpdateMark) {
473             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
474             errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
475             if (errCode != E_OK) {
476                 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
477                 return errCode;
478             }
479         }
480         if (isNeedUpdateDeleteMark) {
481             LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
482                 dataTimeRange.deleteEndTime);
483             errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
484         }
485     }
486     if (errCode != E_OK) {
487         LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
488     }
489     return errCode;
490 }
491 
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const492 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
493     const DeviceID &deviceId, WaterMark &waterMark) const
494 {
495     if (syncType != SyncType::QUERY_SYNC_TYPE) {
496         metadata_->GetPeerWaterMark(deviceId, waterMark);
497         return;
498     }
499     metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
500 }
501 
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)502 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
503 {
504     metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
505 }
506 
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const507 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
508     WaterMark &waterMark) const
509 {
510     metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
511 }
512 
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const513 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
514     const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
515 {
516     if (syncType != SyncType::QUERY_SYNC_TYPE) {
517         metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
518         return;
519     }
520     metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
521         waterMark, context->IsAutoLiftWaterMark());
522 }
523 
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)524 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
525     WaterMark maxSendDataTime)
526 {
527     bool isNeedClearRemoteData = false;
528     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
529     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
530         uint64_t clearDeviceDataMark = 0;
531         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
532         isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
533     } else {
534         const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
535         if (packet == nullptr) {
536             LOGE("[RemoveDeviceDataHandle] get packet object failed");
537             return -E_INVALID_ARGS;
538         }
539         SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
540         WaterMark packetLocalMark = packet->GetLocalWaterMark();
541         WaterMark peerMark = 0;
542         GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
543         isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
544     }
545     if (!isNeedClearRemoteData) {
546         return E_OK;
547     }
548     int errCode = E_OK;
549     if (context->IsNeedClearRemoteStaleData()) {
550         // need to clear remote device history data
551         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
552         if (errCode != E_OK) {
553             (void)SendDataAck(context, message, errCode, maxSendDataTime);
554             return errCode;
555         }
556         if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
557             // avoid repeat clear in ack
558             metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
559         }
560     }
561     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
562         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
563         if (errCode != E_OK) {
564             (void)SendDataAck(context, message, errCode, maxSendDataTime);
565             return errCode;
566         }
567     }
568     return E_OK;
569 }
570 
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)571 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
572     const std::vector<uint64_t> &reserved)
573 {
574     bool isNeedClearRemoteData = false;
575     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
576     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
577     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
578         uint64_t clearDeviceDataMark = 0;
579         metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
580         isNeedClearRemoteData = (clearDeviceDataMark != 0);
581     } else if (reserved.empty()) {
582         WaterMark localMark = 0;
583         GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
584         isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
585     } else {
586         WaterMark peerMark = 0;
587         GetPeerWaterMark(curType, context->GetQuerySyncId(),
588             context->GetDeviceId(), peerMark);
589         isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
590     }
591     if (!isNeedClearRemoteData) {
592         return E_OK;
593     }
594     // need to clear remote historydata
595     LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
596         label_.c_str(), STR_MASK(GetDeviceId()));
597     int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
598     if (errCode != E_OK) {
599         return errCode;
600     }
601     if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
602         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
603     }
604     return errCode;
605 }
606 
SetSessionEndTimestamp(Timestamp end)607 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
608 {
609     sessionEndTimestamp_ = end;
610 }
611 
GetSessionEndTimestamp() const612 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
613 {
614     return sessionEndTimestamp_;
615 }
616 
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)617 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
618 {
619     ReSendInfo reSendInfo;
620     reSendInfo.start = dataTimeRange.beginTime;
621     reSendInfo.end = dataTimeRange.endTime;
622     reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
623     reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
624     reSendInfo.packetId = context->GetPacketId();
625     maxSequenceIdHasSent_++;
626     reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
627     windowSize_--;
628     ContinueToken token;
629     context->GetContinueToken(token);
630     if (token == nullptr) {
631         isAllDataHasSent_ = true;
632     }
633     LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
634         "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
635         reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
636         reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
637 }
638 
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)639 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
640     SyncEntry &syncData, int sendCode, int mode)
641 {
642     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
643     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
644     WaterMark localMark = 0;
645     WaterMark peerMark = 0;
646     WaterMark deleteMark = 0;
647     bool needCompressOnSync = false;
648     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
649     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
650     std::string id = context->GetQuerySyncId();
651     GetLocalWaterMark(curType, id, context, localMark);
652     GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
653     GetLocalDeleteSyncWaterMark(context, deleteMark);
654     if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
655         (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
656         packet->SetLastSequence();
657     }
658     int tmpMode = mode;
659     if (mode == SyncModeType::RESPONSE_PULL) {
660         tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
661     }
662     packet->SetData(syncData.entries);
663     packet->SetCompressData(syncData.compressedEntries);
664     packet->SetBasicInfo(sendCode, version, tmpMode);
665     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
666     packet->SetWaterMark(localMark, peerMark, deleteMark);
667     if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
668         packet->SetEndWaterMark(context->GetEndMark());
669         packet->SetSessionId(context->GetRequestSessionId());
670     }
671     packet->SetQuery(context->GetQuery());
672     packet->SetQueryId(context->GetQuerySyncId());
673     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
674     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
675         packet->SetCompressDataMark();
676         packet->SetCompressAlgo(curAlgo);
677     }
678     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
679     if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
680         context->GetQuery().HasOrderBy())) {
681         packet->SetUpdateWaterMark();
682     }
683     LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
684         "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
685         STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
686 }
687 
RequestStart(SingleVerSyncTaskContext * context,int mode)688 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
689 {
690     if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
691         context->SetTaskErrCode(-E_NOT_SUPPORT);
692         return -E_NOT_SUPPORT;
693     }
694     int errCode = RemoveDeviceDataIfNeed(context);
695     if (errCode != E_OK) {
696         context->SetTaskErrCode(errCode);
697         return errCode;
698     }
699     SyncEntry syncData;
700     // get data
701     errCode = GetDataWithPerformanceRecord(context, syncData);
702     SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
703 
704     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
705         LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
706         return errCode;
707     }
708 
709     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
710     if (packet == nullptr) {
711         LOGE("[DataSync][PushStart] new DataRequestPacket error");
712         return -E_OUT_OF_MEMORY;
713     }
714     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
715     UpdateWaterMark isUpdateWaterMark;
716     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
717     if (errCode == E_OK) {
718         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
719     }
720     FillDataRequestPacket(packet, context, syncData, errCode, mode);
721     errCode = SendDataPacket(curType, packet, context);
722     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
723     if (performance != nullptr) {
724         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
725     }
726     if (errCode == E_OK || errCode == -E_TIMEOUT) {
727         UpdateSendInfo(dataTime, context);
728     }
729     if (errCode == E_OK) {
730         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
731             context->GetQuery().HasOrderBy())) {
732             LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
733             return E_OK;
734         }
735         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
736         SaveLocalWaterMark(curType, context, tmpDataTime);
737     }
738     return errCode;
739 }
740 
PushStart(SingleVerSyncTaskContext * context)741 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
742 {
743     if (context == nullptr) {
744         return -E_INVALID_ARGS;
745     }
746     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
747     return RequestStart(context,
748         (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
749 }
750 
PushPullStart(SingleVerSyncTaskContext * context)751 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
752 {
753     if (context == nullptr) {
754         return -E_INVALID_ARGS;
755     }
756     return RequestStart(context, context->GetMode());
757 }
758 
PullRequestStart(SingleVerSyncTaskContext * context)759 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
760 {
761     if (context == nullptr) {
762         return -E_INVALID_ARGS;
763     }
764     if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
765         context->SetTaskErrCode(-E_NOT_SUPPORT);
766         return -E_NOT_SUPPORT;
767     }
768     int errCode = RemoveDeviceDataIfNeed(context);
769     if (errCode != E_OK) {
770         context->SetTaskErrCode(errCode);
771         return errCode;
772     }
773     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
774     if (packet == nullptr) {
775         LOGE("[DataSync][PullRequest]new DataRequestPacket error");
776         return -E_OUT_OF_MEMORY;
777     }
778     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
779     WaterMark peerMark = 0;
780     WaterMark localMark = 0;
781     WaterMark deleteMark = 0;
782     GetPeerWaterMark(syncType, context->GetQuerySyncId(),
783         context->GetDeviceId(), peerMark);
784     GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
785     GetLocalDeleteSyncWaterMark(context, deleteMark);
786     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
787     WaterMark endMark = context->GetEndMark();
788     SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
789 
790     packet->SetBasicInfo(E_OK, version, context->GetMode());
791     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
792     packet->SetWaterMark(localMark, peerMark, deleteMark);
793     packet->SetEndWaterMark(endMark);
794     packet->SetSessionId(context->GetRequestSessionId());
795     packet->SetQuery(context->GetQuery());
796     packet->SetQueryId(context->GetQuerySyncId());
797     packet->SetLastSequence();
798     SingleVerDataSyncUtils::SetPacketId(packet, context, version);
799     context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
800 
801     LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
802         "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark, label_.c_str(),
803         STR_MASK(GetDeviceId()));
804     UpdateSendInfo(dataTime, context);
805     return SendDataPacket(syncType, packet, context);
806 }
807 
PullResponseStart(SingleVerSyncTaskContext * context)808 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
809 {
810     if (context == nullptr) {
811         return -E_INVALID_ARGS;
812     }
813     SyncEntry syncData;
814     // get data
815     int errCode = GetDataWithPerformanceRecord(context, syncData);
816     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
817         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
818             SendPullResponseDataPkt(errCode, syncData, context);
819         }
820         return errCode;
821     }
822     // if send finished
823     int ackCode = E_OK;
824     ContinueToken token = nullptr;
825     context->GetContinueToken(token);
826     if (errCode == E_OK && token == nullptr) {
827         LOGD("[DataSync][PullResponse] send last frame end");
828         ackCode = SEND_FINISHED;
829     }
830     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
831     UpdateWaterMark isUpdateWaterMark;
832     SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
833     if (errCode == E_OK) {
834         SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
835     }
836     errCode = SendPullResponseDataPkt(ackCode, syncData, context);
837     if (errCode == E_OK || errCode == -E_TIMEOUT) {
838         UpdateSendInfo(dataTime, context);
839     }
840     if (errCode == E_OK) {
841         if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
842             context->GetQuery().HasOrderBy())) {
843             LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
844             return E_OK;
845         }
846         SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
847         SaveLocalWaterMark(curType, context, tmpDataTime);
848     }
849     return errCode;
850 }
851 
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)852 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
853     const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
854 {
855     WaterMark tmpPeerWatermark = dataTime.endTime;
856     WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
857     if (isUpdateWaterMark.normalUpdateMark) {
858         tmpPeerWatermark++;
859     }
860     if (isUpdateWaterMark.deleteUpdateMark) {
861         tmpPeerDeletedWatermark++;
862     }
863     UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
864 }
865 
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)866 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
867     const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
868 {
869     if (peerWatermark == 0 && peerDeletedWatermark == 0) {
870         return;
871     }
872     int errCode = E_OK;
873     if (syncType != SyncType::QUERY_SYNC_TYPE) {
874         errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
875     } else {
876         if (peerWatermark != 0) {
877             LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
878             errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
879             if (errCode != E_OK) {
880                 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
881             }
882         }
883         if (peerDeletedWatermark != 0) {
884             LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
885                 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
886             errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
887         }
888     }
889     if (errCode != E_OK) {
890         LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
891     }
892 }
893 
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)894 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
895 {
896     uint16_t remoteCommunicatorVersion = 0;
897     if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
898         -E_NOT_FOUND) {
899         LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
900         return -E_VERSION_NOT_SUPPORT;
901     }
902     // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
903     if (remoteCommunicatorVersion == 0) {
904         LOGI("[DataSync] set remote version 0");
905         context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
906         return E_OK;
907     } else {
908         LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
909         if (isControlMsg) {
910             SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
911         } else {
912             SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
913         }
914         return -E_WAIT_NEXT_MESSAGE;
915     }
916 }
917 
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)918 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
919 {
920     if (context == nullptr || message == nullptr) {
921         return -E_INVALID_ARGS;
922     }
923     auto *packet = message->GetObject<DataRequestPacket>();
924     if (packet == nullptr) {
925         return -E_INVALID_ARGS;
926     }
927     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
928         return DoAbilitySyncIfNeed(context, message);
929     }
930     int32_t sendCode = packet->GetSendCode();
931     if (sendCode == -E_VERSION_NOT_SUPPORT) {
932         LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
933         (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
934         return -E_WAIT_NEXT_MESSAGE;
935     }
936     // only deal with pull response packet errCode
937     if (sendCode != E_OK && sendCode != SEND_FINISHED &&
938         message->GetSessionId() == context->GetRequestSessionId()) {
939         LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
940         return sendCode;
941     }
942     int errCode = RunPermissionCheck(context, message, packet);
943     if (errCode != E_OK) {
944         return errCode;
945     }
946     if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
947         errCode = CheckSchemaStrategy(context, message);
948     }
949     if (errCode == E_OK) {
950         errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
951     }
952     if (errCode != E_OK) {
953         (void)SendDataAck(context, message, errCode, 0);
954     }
955     return errCode;
956 }
957 
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)958 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
959     WaterMark &pullEndWatermark)
960 {
961     int errCode = DataRequestRecvPre(context, message);
962     if (errCode != E_OK) {
963         return errCode;
964     }
965     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
966     const std::vector<SendDataItem> &data = packet->GetData();
967     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
968     LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
969         " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
970         STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
971     context->SetReceiveWaterMarkErr(false);
972     UpdateWaterMark isUpdateWaterMark;
973     SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
974     errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
975     if (errCode != E_OK) {
976         return errCode;
977     }
978     if (WaterMarkErrHandle(curType, context, message)) {
979         return E_OK;
980     }
981     GetPullEndWatermark(context, packet, pullEndWatermark);
982     // save data first
983     errCode = SaveData(context, data, curType, packet->GetQuery());
984     if (errCode != E_OK) {
985         (void)SendDataAck(context, message, errCode, dataTime.endTime);
986         return errCode;
987     }
988     if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
989         pullEndWatermark = 0;
990         errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
991     } else {
992         // if data is empty, we don't know the max timestap of this packet.
993         errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
994     }
995     RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
996         context->GetRequestSessionId());
997     if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
998         UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
999     } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1000         UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1001     }
1002     if (errCode != E_OK) {
1003         return errCode;
1004     }
1005     if (packet->GetSendCode() == SEND_FINISHED) {
1006         return -E_RECV_FINISHED;
1007     }
1008     return errCode;
1009 }
1010 
SendDataPacket(SyncType syncType,const DataRequestPacket * packet,SingleVerSyncTaskContext * context)1011 int SingleVerDataSync::SendDataPacket(SyncType syncType, const DataRequestPacket *packet,
1012     SingleVerSyncTaskContext *context)
1013 {
1014     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1015     if (message == nullptr) {
1016         LOGE("[DataSync][SendDataPacket] new message error");
1017         delete packet;
1018         packet = nullptr;
1019         return -E_OUT_OF_MEMORY;
1020     }
1021     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1022     int errCode = message->SetExternalObject(packet);
1023     if (errCode != E_OK) {
1024         delete packet;
1025         packet = nullptr;
1026         delete message;
1027         message = nullptr;
1028         LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1029         return errCode;
1030     }
1031     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1032         context->GetSequenceId(), context->GetRequestSessionId());
1033     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1034     if (performance != nullptr) {
1035         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1036     }
1037     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1038         context, message->GetSessionId());
1039     errCode = Send(context, message, handler, packetLen);
1040     if (errCode != E_OK) {
1041         delete message;
1042         message = nullptr;
1043     }
1044 
1045     return errCode;
1046 }
1047 
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1048 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1049     SingleVerSyncTaskContext *context)
1050 {
1051     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1052     if (packet == nullptr) {
1053         LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1054         return -E_OUT_OF_MEMORY;
1055     }
1056     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1057     FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1058     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1059     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1060     if (message == nullptr) {
1061         LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1062         delete packet;
1063         packet = nullptr;
1064         return -E_OUT_OF_MEMORY;
1065     }
1066     int errCode = message->SetExternalObject(packet);
1067     if (errCode != E_OK) {
1068         delete packet;
1069         packet = nullptr;
1070         delete message;
1071         message = nullptr;
1072         LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1073         return errCode;
1074     }
1075     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1076         context->GetSequenceId(), context->GetResponseSessionId());
1077     SendResetWatchDogPacket(context, packetLen);
1078     errCode = Send(context, message, nullptr, packetLen);
1079     if (errCode != E_OK) {
1080         delete message;
1081         message = nullptr;
1082     }
1083     return errCode;
1084 }
1085 
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1086 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1087 {
1088     SendDataAck(context, message, E_OK, 0);
1089 }
1090 
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1091 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1092     WaterMark maxSendDataTime)
1093 {
1094     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1095     if (packet == nullptr) {
1096         return -E_INVALID_ARGS;
1097     }
1098     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1099     if (ackMessage == nullptr) {
1100         LOGE("[DataSync][SendDataAck] new message error");
1101         return -E_OUT_OF_MEMORY;
1102     }
1103     DataAckPacket ack;
1104     SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1105     int errCode = ackMessage->SetCopiedObject(ack);
1106     if (errCode != E_OK) {
1107         delete ackMessage;
1108         ackMessage = nullptr;
1109         LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1110         return errCode;
1111     }
1112     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1113         message->GetSequenceId(), message->GetSessionId());
1114 
1115     errCode = Send(context, ackMessage, nullptr, 0);
1116     if (errCode != E_OK) {
1117         delete ackMessage;
1118         ackMessage = nullptr;
1119     }
1120     return errCode;
1121 }
1122 
AckPacketIdCheck(const Message * message)1123 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1124 {
1125     if (message == nullptr) {
1126         LOGE("[DataSync] AckRecv message nullptr");
1127         return false;
1128     }
1129     if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1130         return true;
1131     }
1132     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1133     if (packet == nullptr) {
1134         return false;
1135     }
1136     uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1137     std::lock_guard<std::mutex> lock(lock_);
1138     uint32_t sequenceId = message->GetSequenceId();
1139     if (reSendMap_.count(sequenceId) != 0) {
1140         uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1141         if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1142             LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1143                 originalPacketId);
1144             return false;
1145         }
1146     }
1147     return true;
1148 }
1149 
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1150 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1151 {
1152     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1153     if (errCode != E_OK) {
1154         return errCode;
1155     }
1156     const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1157     if (packet == nullptr) {
1158         return -E_INVALID_ARGS;
1159     }
1160     int32_t recvCode = packet->GetRecvCode();
1161     LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1162         SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1163     if (recvCode == -E_VERSION_NOT_SUPPORT) {
1164         LOGE("[DataSync][AckRecv] Version mismatch");
1165         return -E_VERSION_NOT_SUPPORT;
1166     }
1167 
1168     if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT) {
1169         // we should ReleaseContinueToken, avoid crash
1170         LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1171             STR_MASK(GetDeviceId()));
1172         context->ReleaseContinueToken();
1173         return recvCode;
1174     }
1175     uint64_t data = packet->GetData();
1176     if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1177         return DealWaterMarkException(context, data, packet->GetReserved());
1178     }
1179 
1180     if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1181         // data only use low 32bit
1182         context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1183         LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1184             STR_MASK(GetDeviceId()));
1185     }
1186 
1187     if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1188         LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1189             label_.c_str(), STR_MASK(GetDeviceId()));
1190         return recvCode;
1191     }
1192 
1193     // Judge if send finished
1194     ContinueToken token;
1195     context->GetContinueToken(token);
1196     if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1197         (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1198         return -E_NO_DATA_SEND;
1199     }
1200 
1201     // send next data
1202     return -E_SEND_DATA;
1203 }
1204 
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1205 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1206     uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1207 {
1208     if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1209         LOGE("[SingleVerDataSync] messageId not available.");
1210         return;
1211     }
1212     Message *ackMessage = new (std::nothrow) Message(inMsgId);
1213     if (ackMessage == nullptr) {
1214         LOGE("[DataSync][SaveDataNotify] new message failed");
1215         return;
1216     }
1217 
1218     DataAckPacket ack;
1219     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1220     ack.SetVersion(pktVersion);
1221     int errCode = ackMessage->SetCopiedObject(ack);
1222     if (errCode != E_OK) {
1223         delete ackMessage;
1224         ackMessage = nullptr;
1225         LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1226         return;
1227     }
1228     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1229 
1230     errCode = Send(context, ackMessage, nullptr, 0);
1231     if (errCode != E_OK) {
1232         delete ackMessage;
1233         ackMessage = nullptr;
1234     }
1235     LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1236         errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1237 }
1238 
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1239 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1240     WaterMark &pullEndWatermark) const
1241 {
1242     if (packet == nullptr) {
1243         return;
1244     }
1245     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1246     if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1247         WaterMark endMark = packet->GetEndWaterMark();
1248         TimeOffset offset;
1249         metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1250         pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1251         LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1252             "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1253     }
1254 }
1255 
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1256 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1257     const std::vector<uint64_t> &reserved)
1258 {
1259     WaterMark deletedWaterMark = 0;
1260     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1261     if (curType == SyncType::QUERY_SYNC_TYPE) {
1262         if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1263             LOGE("[DataSync] get packet reserve size failed");
1264             return -E_INVALID_ARGS;
1265         }
1266         deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1267     }
1268     LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1269         "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1270     int errCode = SaveLocalWaterMark(curType, context,
1271         {0, 0, ackWaterMark, deletedWaterMark});
1272     if (errCode != E_OK) {
1273         return errCode;
1274     }
1275     context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1276     context->IncNegotiationCount();
1277     SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1278     if (!context->IsNeedClearRemoteStaleData()) {
1279         return -E_RE_SEND_DATA;
1280     }
1281     errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1282     if (errCode != E_OK) {
1283         return errCode;
1284     }
1285     return -E_RE_SEND_DATA;
1286 }
1287 
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1288 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1289     const DataRequestPacket *packet)
1290 {
1291     int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1292     int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1293     if (errCode != E_OK) {
1294         if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1295             (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1296         }
1297         return -E_NOT_PERMIT;
1298     }
1299     const std::vector<SendDataItem> &data = packet->GetData();
1300     WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1301     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1302     if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1303         !context->GetReceivcPermitCheck()) {
1304         bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1305         if (permitReceive) {
1306             context->SetReceivcPermitCheck(true);
1307         } else {
1308             (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1309             return -E_SECURITY_OPTION_CHECK_ERROR;
1310         }
1311     }
1312     return errCode;
1313 }
1314 
1315 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1316 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1317 {
1318     // When mtu less than 30k, we send data with bluetooth
1319     // In order not to block the bluetooth channel, we don't send notify packet here
1320     if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1321         return;
1322     }
1323     uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1324 
1325     Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1326     if (ackMessage == nullptr) {
1327         LOGE("[DataSync][ResetWatchDog] new message failed");
1328         return;
1329     }
1330 
1331     DataAckPacket ack;
1332     ack.SetData(data);
1333     ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1334     ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1335     int errCode = ackMessage->SetCopiedObject(ack);
1336     if (errCode != E_OK) {
1337         delete ackMessage;
1338         ackMessage = nullptr;
1339         LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1340         return;
1341     }
1342     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1343         context->GetSequenceId(), context->GetResponseSessionId());
1344 
1345     errCode = Send(context, ackMessage, nullptr, 0);
1346     if (errCode != E_OK) {
1347         delete ackMessage;
1348         ackMessage = nullptr;
1349         LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1350             STR_MASK(GetDeviceId()));
1351     } else {
1352         LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1353             STR_MASK(GetDeviceId()));
1354     }
1355 }
1356 
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1357 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1358 {
1359     if (context == nullptr) {
1360         return -E_INVALID_ARGS;
1361     }
1362     SyncEntry syncData;
1363     int errCode = GetReSendData(syncData, context, reSendInfo);
1364     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1365         return errCode;
1366     }
1367     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1368     DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1369     if (packet == nullptr) {
1370         LOGE("[DataSync][ReSend] new DataRequestPacket error");
1371         return -E_OUT_OF_MEMORY;
1372     }
1373     FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1374     errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1375     if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1376         // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1377         SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1378         if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1379             dataTime.deleteEndTime += 1;
1380         }
1381         if (reSendInfo.end > reSendInfo.start) {
1382             dataTime.endTime += 1;
1383         }
1384         SaveLocalWaterMark(curType, context, dataTime, true);
1385     }
1386     return errCode;
1387 }
1388 
SendReSendPacket(const DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1389 int SingleVerDataSync::SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1390     uint32_t sessionId, uint32_t sequenceId)
1391 {
1392     SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1393     Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1394     if (message == nullptr) {
1395         LOGE("[DataSync][SendDataPacket] new message error");
1396         delete packet;
1397         packet = nullptr;
1398         return -E_OUT_OF_MEMORY;
1399     }
1400     uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1401     int errCode = message->SetExternalObject(packet);
1402     if (errCode != E_OK) {
1403         delete packet;
1404         packet = nullptr;
1405         delete message;
1406         message = nullptr;
1407         LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1408         return errCode;
1409     }
1410     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1411     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1412         context, message->GetSessionId());
1413     errCode = Send(context, message, handler, packetLen);
1414     if (errCode != E_OK) {
1415         delete message;
1416         message = nullptr;
1417     }
1418     return errCode;
1419 }
1420 
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1421 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1422 {
1423     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1424     int mode = SyncOperation::TransferSyncMode(inMode);
1425     // for pull mode it just need to get data, no need to send data.
1426     if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1427         return E_OK;
1428     }
1429     if (context->GetSendPermitCheck()) {
1430         return E_OK;
1431     }
1432     bool isPermitSync = true;
1433     std::string deviceId = context->GetDeviceId();
1434     SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1435     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1436         isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1437     }
1438     LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1439         remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1440     if (isPermitSync) {
1441         context->SetSendPermitCheck(true);
1442         return E_OK;
1443     }
1444     if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1445         context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1446         return -E_SECURITY_OPTION_CHECK_ERROR;
1447     }
1448     if (mode == SyncModeType::RESPONSE_PULL) {
1449         SyncEntry syncData;
1450         SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1451         return -E_SECURITY_OPTION_CHECK_ERROR;
1452     }
1453     if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1454         return -E_SECURITY_OPTION_CHECK_ERROR;
1455     }
1456     return E_OK;
1457 }
1458 
GetLabel() const1459 std::string SingleVerDataSync::GetLabel() const
1460 {
1461     return label_;
1462 }
1463 
GetDeviceId() const1464 std::string SingleVerDataSync::GetDeviceId() const
1465 {
1466     return deviceId_;
1467 }
1468 
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1469 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1470 {
1471     const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1472     if (packet == nullptr) {
1473         LOGE("[WaterMarkErrHandle] get packet object failed");
1474         return -E_INVALID_ARGS;
1475     }
1476     WaterMark packetLocalMark = packet->GetLocalWaterMark();
1477     WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1478     WaterMark peerMark = 0;
1479     WaterMark deletedMark = 0;
1480     GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1481     if (syncType == SyncType::QUERY_SYNC_TYPE) {
1482         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1483     }
1484     if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1485         LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1486         context->SetReceiveWaterMarkErr(true);
1487         SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1488         return true;
1489     }
1490     if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1491         LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1492             "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1493             peerMark);
1494         context->SetReceiveWaterMarkErr(true);
1495         SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1496         return true;
1497     }
1498     return false;
1499 }
1500 
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1501 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1502 {
1503     auto *packet = message->GetObject<DataRequestPacket>();
1504     if (packet == nullptr) {
1505         return -E_INVALID_ARGS;
1506     }
1507     auto query = packet->GetQuery();
1508     SyncStrategy localStrategy = context->GetSyncStrategy(query);
1509     if (!context->GetIsSchemaSync()) {
1510         LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", context->GetIsSchemaSync());
1511         (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1512         return -E_NEED_ABILITY_SYNC;
1513     }
1514     if (!localStrategy.permitSync) {
1515         LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", localStrategy.permitSync);
1516         (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1517         return -E_SCHEMA_MISMATCH;
1518     }
1519     return E_OK;
1520 }
1521 
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1522 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1523 {
1524     int mode = SyncOperation::TransferSyncMode(inMode);
1525     if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1526         (mode != SyncModeType::QUERY_PUSH_PULL)) {
1527         return;
1528     }
1529 
1530     if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId))  {
1531         storage_->NotifyRemotePushFinished(deviceId_);
1532     }
1533 }
1534 
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1535 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1536     const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1537 {
1538     SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1539     WaterMark localMark = 0;
1540     GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1541     ackPacket.SetRecvCode(recvCode);
1542     // send ack packet
1543     if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1544         ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1545     } else if (recvCode != WATER_MARK_INVALID) {
1546         WaterMark mark = 0;
1547         GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1548         ackPacket.SetData(mark);
1549     }
1550     std::vector<uint64_t> reserved {localMark};
1551     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1552     uint64_t packetId = 0;
1553     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1554         packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1555     }
1556     if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1557         reserved.push_back(packetId);
1558     }
1559     // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1560     if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1561         WaterMark deletedPeerMark;
1562         GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1563         reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1564     }
1565     ackPacket.SetReserved(reserved);
1566     ackPacket.SetVersion(version);
1567 }
1568 
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1569 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1570     DataSyncReSendInfo reSendInfo)
1571 {
1572     int mode = SyncOperation::TransferSyncMode(context->GetMode());
1573     if (mode == SyncModeType::PULL) {
1574         return E_OK;
1575     }
1576     ContinueToken token = nullptr;
1577     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1578     size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1579         DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1580     DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1581     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1582     int errCode;
1583     if (curType != SyncType::QUERY_SYNC_TYPE) {
1584         errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1585             reSendDataSizeInfo);
1586     } else {
1587         QuerySyncObject queryObj = context->GetQuery();
1588         errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1589             reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1590     }
1591     if (token != nullptr) {
1592         storage_->ReleaseContinueToken(token);
1593     }
1594     if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1595         context->SetTaskErrCode(errCode);
1596         return errCode;
1597     }
1598     if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1599         return errCode;
1600     }
1601 
1602     int innerCode = InterceptData(syncData);
1603     if (innerCode != E_OK) {
1604         context->SetTaskErrCode(innerCode);
1605         return innerCode;
1606     }
1607 
1608     bool needCompressOnSync = false;
1609     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1610     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1611     CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1612     if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1613         int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1614             { remoteAlgo, version });
1615         if (compressCode != E_OK) {
1616             return compressCode;
1617         }
1618     }
1619     return errCode;
1620 }
1621 
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1622 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1623 {
1624     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1625         return E_OK;
1626     }
1627     uint64_t clearRemoteDataMark = 0;
1628     std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1629     metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1630     if (clearRemoteDataMark == 0) {
1631         return E_OK;
1632     }
1633     int errCode = E_OK;
1634     if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1635         errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1636         if (errCode != E_OK) {
1637             LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1638             return errCode;
1639         }
1640     }
1641     if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1642         errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1643         if (errCode != E_OK) {
1644             LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1645             return errCode;
1646         }
1647     }
1648     return E_OK;
1649 }
1650 
UpdateMtuSize()1651 void SingleVerDataSync::UpdateMtuSize()
1652 {
1653     mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1654 }
1655 
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1656 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1657     DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1658 {
1659     SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1660     WaterMark peerMark = 0;
1661     GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1662         peerMark);
1663     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1664     // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1665     // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1666     int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1667     if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1668         SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1669         LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1670         packet->SetLastSequence();
1671     }
1672     if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1673         context->GetMode() == SyncModeType::RESPONSE_PULL) {
1674         sendCode = SEND_FINISHED;
1675     }
1676     packet->SetData(syncData.entries);
1677     packet->SetCompressData(syncData.compressedEntries);
1678     packet->SetBasicInfo(sendCode, version, reSendMode);
1679     packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1680     packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1681     if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH) {
1682         packet->SetEndWaterMark(context->GetEndMark());
1683         packet->SetQuery(context->GetQuery());
1684     }
1685     packet->SetQueryId(context->GetQuerySyncId());
1686     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1687         std::vector<uint64_t> reserved {reSendInfo.packetId};
1688         packet->SetReserved(reserved);
1689     }
1690     bool needCompressOnSync = false;
1691     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1692     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1693     CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1694     if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1695         packet->SetCompressDataMark();
1696         packet->SetCompressAlgo(curAlgo);
1697     }
1698 }
1699 
GetDataSizeSpecInfo(size_t packetSize)1700 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1701 {
1702     bool needCompressOnSync = false;
1703     uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1704     (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1705     uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1706         mtuSize_ * 100 / compressionRate);  // compressionRate max is 100
1707     return {blockSize, packetSize};
1708 }
1709 
InterceptData(SyncEntry & syncEntry)1710 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1711 {
1712     if (storage_ == nullptr) {
1713         LOGE("Invalid DB. Can not intercept data.");
1714         return -E_INVALID_DB;
1715     }
1716 
1717     // GetLocalDeviceName get local device ID.
1718     // GetDeviceId get remote device ID.
1719     // If intercept data fail, entries will be released.
1720     return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId());
1721 }
1722 
ControlCmdStart(SingleVerSyncTaskContext * context)1723 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1724 {
1725     if (context == nullptr) {
1726         return -E_INVALID_ARGS;
1727     }
1728     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1729     if (subManager == nullptr) {
1730         return -E_INVALID_ARGS;
1731     }
1732     int errCode = ControlCmdStartCheck(context);
1733     if (errCode != E_OK) {
1734         return errCode;
1735     }
1736     ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1737     if (packet == nullptr) {
1738         LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1739         return -E_OUT_OF_MEMORY;
1740     }
1741     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1742         errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1743         if (errCode != E_OK) {
1744             LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1745             delete packet;
1746             packet = nullptr;
1747             return errCode;
1748         }
1749     }
1750     SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1751     errCode = SendControlPacket(packet, context);
1752     if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1753         subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1754     }
1755     return errCode;
1756 }
1757 
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1758 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1759 {
1760     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1761     if (packet == nullptr) {
1762         return -E_INVALID_ARGS;
1763     }
1764     LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1765         STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1766     int errCode = ControlCmdRequestRecvPre(context, message);
1767     if (errCode != E_OK) {
1768         return errCode;
1769     }
1770     if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1771         errCode = SubscribeRequestRecv(context, message);
1772     } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1773         errCode = UnsubscribeRequestRecv(context, message);
1774     }
1775     return errCode;
1776 }
1777 
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1778 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1779 {
1780     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1781     if (subManager == nullptr) {
1782         return -E_INVALID_ARGS;
1783     }
1784     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1785     if (errCode != E_OK) {
1786         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1787         return errCode;
1788     }
1789     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1790     if (packet == nullptr) {
1791         return -E_INVALID_ARGS;
1792     }
1793     int32_t recvCode = packet->GetRecvCode();
1794     uint32_t cmdType = packet->GetcontrolCmdType();
1795     if (recvCode != E_OK) {
1796         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1797             STR_MASK(GetDeviceId()), cmdType);
1798         // for unsubscribe no need to do something
1799         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1800         return recvCode;
1801     }
1802     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1803         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1804     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1805         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1806     }
1807     if (errCode != E_OK) {
1808         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1809         return errCode;
1810     }
1811     return -E_NO_DATA_SEND; // means control msg send finished
1812 }
1813 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1814 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1815 {
1816     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1817         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1818         LOGE("[ControlCmdStartCheck] not support controlCmd");
1819         return -E_INVALID_ARGS;
1820     }
1821     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1822         context->GetQuery().HasInKeys() &&
1823         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1824         return -E_NOT_SUPPORT;
1825     }
1826     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1827         return E_OK;
1828     }
1829     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1830     if (permitReceive) {
1831         context->SetReceivcPermitCheck(true);
1832     } else {
1833         return -E_SECURITY_OPTION_CHECK_ERROR;
1834     }
1835     return E_OK;
1836 }
1837 
SendControlPacket(const ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1838 int SingleVerDataSync::SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1839 {
1840     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1841     if (message == nullptr) {
1842         LOGE("[DataSync][SendControlPacket] new message error");
1843         delete packet;
1844         packet = nullptr;
1845         return -E_OUT_OF_MEMORY;
1846     }
1847     uint32_t packetLen = packet->CalculateLen();
1848     int errCode = message->SetExternalObject(packet);
1849     if (errCode != E_OK) {
1850         delete packet;
1851         packet = nullptr;
1852         delete message;
1853         message = nullptr;
1854         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1855         return errCode;
1856     }
1857     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1858         context->GetSequenceId(), context->GetRequestSessionId());
1859     CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1860         context, message->GetSessionId());
1861     errCode = Send(context, message, handler, packetLen);
1862     if (errCode != E_OK) {
1863         delete message;
1864         message = nullptr;
1865     }
1866     return errCode;
1867 }
1868 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1869 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1870     uint32_t controlCmdType, const CommErrHandler &handler)
1871 {
1872     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1873     if (ackMessage == nullptr) {
1874         LOGE("[DataSync][SendControlAck] new message error");
1875         return -E_OUT_OF_MEMORY;
1876     }
1877     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1878     ControlAckPacket ack;
1879     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1880     int errCode = ackMessage->SetCopiedObject(ack);
1881     if (errCode != E_OK) {
1882         delete ackMessage;
1883         ackMessage = nullptr;
1884         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1885         return errCode;
1886     }
1887     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1888         message->GetSequenceId(), message->GetSessionId());
1889     errCode = Send(context, ackMessage, handler, 0);
1890     if (errCode != E_OK) {
1891         delete ackMessage;
1892         ackMessage = nullptr;
1893     }
1894     return errCode;
1895 }
1896 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1897 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1898 {
1899     if (context == nullptr || message == nullptr) {
1900         return -E_INVALID_ARGS;
1901     }
1902     const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1903     if (packet == nullptr) {
1904         return -E_INVALID_ARGS;
1905     }
1906     uint32_t controlCmdType = packet->GetcontrolCmdType();
1907     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1908         return DoAbilitySyncIfNeed(context, message, true);
1909     }
1910     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1911         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1912         return -E_WAIT_NEXT_MESSAGE;
1913     }
1914     return E_OK;
1915 }
1916 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1917 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1918     const Message *message)
1919 {
1920     uint32_t controlCmdType = packet->GetcontrolCmdType();
1921     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1922         return E_OK;
1923     }
1924     QuerySyncObject syncQuery = packet->GetQuery();
1925     int errCode;
1926     if (!packet->IsAutoSubscribe()) {
1927         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1928         if (errCode != E_OK) {
1929             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1930             SendControlAck(context, message, errCode, controlCmdType);
1931             return -E_WAIT_NEXT_MESSAGE;
1932         }
1933     }
1934     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1935         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1936     if (mode >= SyncModeType::INVALID_MODE) {
1937         LOGE("[SingleVerDataSync] invalid mode");
1938         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1939         return -E_WAIT_NEXT_MESSAGE;
1940     }
1941     errCode = CheckPermitSendData(mode, context);
1942     if (errCode != E_OK) {
1943         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1944         SendControlAck(context, message, errCode, controlCmdType);
1945     }
1946     return errCode;
1947 }
1948 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1949 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1950 {
1951     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1952     if (packet == nullptr) {
1953         return -E_INVALID_ARGS;
1954     }
1955     int errCode = SubscribeRequestRecvPre(context, packet, message);
1956     if (errCode != E_OK) {
1957         return errCode;
1958     }
1959     uint32_t controlCmdType = packet->GetcontrolCmdType();
1960     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1961     if (subscribeManager == nullptr) {
1962         LOGE("[SingleVerDataSync] subscribeManager check failed");
1963         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1964         return -E_INVALID_ARGS;
1965     }
1966     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
1967     if (errCode != E_OK) {
1968         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1969             STR_MASK(GetDeviceId()));
1970         SendControlAck(context, message, errCode, controlCmdType);
1971         return errCode;
1972     }
1973     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1974     if (errCode != E_OK) {
1975         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1976             STR_MASK(GetDeviceId()));
1977         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1978         SendControlAck(context, message, errCode, controlCmdType);
1979         return errCode;
1980     }
1981     errCode = SendControlAck(context, message, E_OK, controlCmdType);
1982     if (errCode != E_OK) {
1983         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1984         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1985         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1986             STR_MASK(GetDeviceId()));
1987         return errCode;
1988     }
1989     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1990     return errCode;
1991 }
1992 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1993 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1994 {
1995     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1996     if (packet == nullptr) {
1997         return -E_INVALID_ARGS;
1998     }
1999     uint32_t controlCmdType = packet->GetcontrolCmdType();
2000     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2001     if (subscribeManager == nullptr) {
2002         LOGE("[SingleVerDataSync] subscribeManager check failed");
2003         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2004         return -E_INVALID_ARGS;
2005     }
2006     int errCode;
2007     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2008     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2009         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2010         if (errCode != E_OK) {
2011             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2012                 STR_MASK(GetDeviceId()));
2013             SendControlAck(context, message, errCode, controlCmdType);
2014             return errCode;
2015         }
2016     }
2017     errCode = SendControlAck(context, message, E_OK, controlCmdType);
2018     if (errCode != E_OK) {
2019         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2020             STR_MASK(GetDeviceId()));
2021         return errCode;
2022     }
2023     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2024     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2025     return errCode;
2026 }
2027 
PutDataMsg(Message * message)2028 void SingleVerDataSync::PutDataMsg(Message *message)
2029 {
2030     return msgSchedule_.PutMsg(message);
2031 }
2032 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2033 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2034     bool &isNeedContinue)
2035 {
2036     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2037 }
2038 
IsNeedReloadQueue()2039 bool SingleVerDataSync::IsNeedReloadQueue()
2040 {
2041     return msgSchedule_.IsNeedReloadQueue();
2042 }
2043 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2044 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2045 {
2046     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2047 }
2048 
ClearDataMsg()2049 void SingleVerDataSync::ClearDataMsg()
2050 {
2051     msgSchedule_.ClearMsg();
2052 }
2053 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2054 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2055     const std::shared_ptr<SubscribeManager> &subscribeManager)
2056 {
2057     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2058         storage_->RemoveSubscribe(queryId);
2059     }
2060 }
2061 } // namespace DistributedDB
2062