1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 storage_ = nullptr;
44 communicateHandle_ = nullptr;
45 metadata_ = nullptr;
46 }
47
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49 std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52 return -E_INVALID_ARGS;
53 }
54 storage_ = static_cast<SyncGenericInterface *>(inStorage);
55 communicateHandle_ = inCommunicateHandle;
56 metadata_ = inMetadata;
57 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58 std::vector<uint8_t> label = inStorage->GetIdentifier();
59 label.resize(3); // only show 3 Bytes enough
60 label_ = DBCommon::VectorToHexString(label);
61 deviceId_ = deviceId;
62 msgSchedule_.Initialize(label_, deviceId_);
63 return E_OK;
64 }
65
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68 std::lock_guard<std::mutex> lock(lock_);
69 if (sessionId_ != 0) { // auto sync timeout resend
70 return ReSendData(context);
71 }
72 ResetSyncStatus(mode, context);
73 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
74 int tmpMode = SyncOperation::TransferSyncMode(mode);
75 int errCode = E_OK;
76 if (tmpMode == SyncModeType::PUSH) {
77 errCode = PushStart(context);
78 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
79 errCode = PushPullStart(context);
80 } else if (tmpMode == SyncModeType::PULL) {
81 errCode = PullRequestStart(context);
82 } else {
83 errCode = PullResponseStart(context);
84 }
85 if (context->IsSkipTimeoutError(errCode)) {
86 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
87 // just return to avoid higher pressure for send.
88 return E_OK;
89 }
90 if (errCode != E_OK) {
91 LOGE("[DataSync] SendStart errCode=%d", errCode);
92 return errCode;
93 }
94 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
95 LOGE("wait for recv finished for push and pull mode");
96 return -E_EKEYREVOKED;
97 }
98 return InnerSyncStart(context);
99 }
100
InnerSyncStart(SingleVerSyncTaskContext * context)101 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
102 {
103 int errCode = CheckPermitSendData(mode_, context);
104 if (errCode != E_OK) {
105 return errCode;
106 }
107 while (true) {
108 if (windowSize_ <= 0 || isAllDataHasSent_) {
109 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
110 label_.c_str(), STR_MASK(deviceId_));
111 return E_OK;
112 }
113 int mode = SyncOperation::TransferSyncMode(mode_);
114 if (mode == SyncModeType::PULL) {
115 LOGE("[DataSync] unexpected error");
116 return -E_INVALID_ARGS;
117 }
118 context->IncSequenceId();
119 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
120 errCode = PushStart(context);
121 } else {
122 errCode = PullResponseStart(context);
123 }
124 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
125 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
126 isAllDataHasSent_ = true;
127 return -E_EKEYREVOKED;
128 }
129 if (context->IsSkipTimeoutError(errCode)) {
130 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
131 // just return to avoid higher pressure for send.
132 return E_OK;
133 }
134 if (errCode != E_OK) {
135 LOGE("[DataSync] InnerSend errCode=%d", errCode);
136 return errCode;
137 }
138 }
139 return E_OK;
140 }
141
InnerClearSyncStatus()142 void SingleVerDataSync::InnerClearSyncStatus()
143 {
144 sessionId_ = 0;
145 reSendMap_.clear();
146 windowSize_ = 0;
147 maxSequenceIdHasSent_ = 0;
148 isAllDataHasSent_ = false;
149 }
150
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)151 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
152 {
153 if (message == nullptr) {
154 LOGE("[DataSync] AckRecv message nullptr");
155 return -E_INVALID_ARGS;
156 }
157 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
158 if (packet == nullptr) {
159 return -E_INVALID_ARGS;
160 }
161 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
162 uint32_t sessionId = message->GetSessionId();
163 uint32_t sequenceId = message->GetSequenceId();
164
165 std::lock_guard<std::mutex> lock(lock_);
166 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
167 windowSize_, label_.c_str(), STR_MASK(deviceId_));
168 if (sessionId != sessionId_) {
169 LOGI("[DataSync] ignore ack,sessionId is different");
170 return E_OK;
171 }
172 Timestamp lastQueryTime = 0;
173 if (reSendMap_.count(sequenceId) != 0) {
174 lastQueryTime = reSendMap_[sequenceId].end;
175 reSendMap_.erase(sequenceId);
176 windowSize_++;
177 } else {
178 LOGI("[DataSync] ack seqId not in map");
179 return E_OK;
180 }
181 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
182 Timestamp dbLastQueryTime = 0;
183 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
184 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
185 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
186 }
187 if (errCode != E_OK) {
188 return errCode;
189 }
190 }
191 if (!isAllDataHasSent_) {
192 return InnerSyncStart(context);
193 } else if (reSendMap_.size() == 0) {
194 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
195 InnerClearSyncStatus();
196 return -E_FINISHED;
197 }
198 return E_OK;
199 }
200
ClearSyncStatus()201 void SingleVerDataSync::ClearSyncStatus()
202 {
203 std::lock_guard<std::mutex> lock(lock_);
204 InnerClearSyncStatus();
205 }
206
ReSendData(SingleVerSyncTaskContext * context)207 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
208 {
209 if (reSendMap_.empty()) {
210 LOGI("[DataSync] ReSend map empty");
211 return -E_INTERNAL_ERROR;
212 }
213 uint32_t sequenceId = reSendMap_.begin()->first;
214 ReSendInfo reSendInfo = reSendMap_.begin()->second;
215 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
216 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
217 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
218 windowSize_, label_.c_str(), STR_MASK(deviceId_));
219 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
220 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
221 return ReSend(context, dataReSendInfo);
222 }
223
GetLocalDeviceName()224 std::string SingleVerDataSync::GetLocalDeviceName()
225 {
226 std::string deviceInfo;
227 if (communicateHandle_ != nullptr) {
228 communicateHandle_->GetLocalIdentity(deviceInfo);
229 }
230 return deviceInfo;
231 }
232
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)233 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
234 uint32_t packetLen)
235 {
236 bool startFeedDogRet = false;
237 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
238 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
239 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
240 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
241 }
242 SendConfig sendConfig;
243 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
244 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
245 if (errCode != E_OK) {
246 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
247 if (startFeedDogRet) {
248 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
249 }
250 }
251 return errCode;
252 }
253
GetData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)254 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData, size_t packetSize)
255 {
256 int errCode;
257 UpdateMtuSize();
258 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
259 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
260 LOGI("[DataSync][GetData] resend data");
261 errCode = GetUnsyncData(context, outData, packetSize);
262 } else {
263 ContinueToken token;
264 context->GetContinueToken(token);
265 if (token == nullptr) {
266 errCode = GetUnsyncData(context, outData, packetSize);
267 } else {
268 LOGD("[DataSync][GetData] get data from token");
269 // if there is data to send, read out data, and update local watermark, send data
270 errCode = GetNextUnsyncData(context, outData, packetSize);
271 }
272 }
273 if (errCode == -E_UNFINISHED) {
274 LOGD("[DataSync][GetData] not finished.");
275 }
276 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
277 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
278 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
279 }
280 return errCode;
281 }
282
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)283 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
284 {
285 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
286 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
287 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
288 bool needCompressOnSync = false;
289 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
290 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
291 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
292 if (performance != nullptr) {
293 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
294 }
295 context->StartFeedDogForGetData(context->GetResponseSessionId());
296 int errCode = GetData(context, syncOutData.entries, packetSize);
297 context->StopFeedDogForGetData();
298 if (performance != nullptr) {
299 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
300 }
301 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
302 context->SetTaskErrCode(errCode);
303 return errCode;
304 }
305
306 int innerCode = InterceptData(syncOutData);
307 if (innerCode != E_OK) {
308 context->SetTaskErrCode(innerCode);
309 return innerCode;
310 }
311
312 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
313 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
314 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
315 { remoteAlgo, version });
316 if (compressCode != E_OK) {
317 return compressCode;
318 }
319 }
320 return errCode;
321 }
322
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)323 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
324 size_t packetSize)
325 {
326 int errCode;
327 WaterMark startMark = 0;
328 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
329 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
330 WaterMark endMark = MAX_TIMESTAMP;
331 if ((startMark > endMark)) {
332 return E_OK;
333 }
334 ContinueToken token = nullptr;
335 context->GetContinueToken(token);
336 if (token != nullptr) {
337 storage_->ReleaseContinueToken(token);
338 }
339 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
340 if (curType != SyncType::QUERY_SYNC_TYPE) {
341 errCode = storage_->GetSyncData(startMark, endMark, outData, token, syncDataSizeInfo);
342 } else {
343 WaterMark deletedStartMark = 0;
344 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
345 Timestamp lastQueryTimestamp = 0;
346 errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(),
347 context->GetDeviceId(), lastQueryTimestamp);
348 if (errCode == E_OK) {
349 QuerySyncObject queryObj = context->GetQuery();
350 errCode = storage_->GetSyncData(queryObj,
351 SyncTimeRange{ startMark, deletedStartMark, endMark, endMark, lastQueryTimestamp},
352 syncDataSizeInfo, token, outData);
353 }
354 }
355 context->SetContinueToken(token);
356 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
357 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
358 }
359 return errCode;
360 }
361
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)362 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
363 size_t packetSize)
364 {
365 ContinueToken token;
366 context->GetContinueToken(token);
367 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
368 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
369 context->SetContinueToken(token);
370 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
371 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
372 }
373 return errCode;
374 }
375
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)376 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
377 SyncType curType, const QuerySyncObject &query)
378 {
379 if (inData.empty()) {
380 return E_OK;
381 }
382 StoreInfo info = {
383 storage_->GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
384 storage_->GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
385 storage_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
386 };
387 std::string clientId;
388 int errCode = E_OK;
389 if (RuntimeContext::GetInstance()->TranslateDeviceId(context->GetDeviceId(), info, clientId) == E_OK) {
390 errCode = metadata_->SaveClientId(context->GetDeviceId(), clientId);
391 if (errCode != E_OK) {
392 LOGW("[DataSync] record clientId failed %d", errCode);
393 }
394 }
395 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
396 if (performance != nullptr) {
397 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
398 }
399
400 const std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
401 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
402 // query only support prefix key and don't have query in packet in 104 version
403 errCode = storage_->PutSyncDataWithQuery(query, inData, context->GetDeviceId());
404 if (performance != nullptr) {
405 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
406 }
407 if (errCode != E_OK) {
408 LOGE("[DataSync][SaveData] save sync data failed,errCode=%d", errCode);
409 }
410 return errCode;
411 }
412
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)413 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
414 {
415 mode_ = inMode;
416 maxSequenceIdHasSent_ = 0;
417 isAllDataHasSent_ = false;
418 context->ReSetSequenceId();
419 reSendMap_.clear();
420 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
421 windowSize_ = LOW_VERSION_WINDOW_SIZE;
422 } else {
423 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
424 }
425 int mode = SyncOperation::TransferSyncMode(inMode);
426 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
427 sessionId_ = context->GetRequestSessionId();
428 } else {
429 sessionId_ = context->GetResponseSessionId();
430 }
431 }
432
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)433 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
434 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
435 {
436 WaterMark localMark = 0;
437 WaterMark deleteMark = 0;
438 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
439 GetLocalDeleteSyncWaterMark(context, deleteMark);
440 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
441 }
442
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const443 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
444 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
445 {
446 WaterMark localMark = 0;
447 int errCode = E_OK;
448 const std::string &deviceId = context->GetDeviceId();
449 std::string queryId = context->GetQuerySyncId();
450 if (syncType != SyncType::QUERY_SYNC_TYPE) {
451 if (isCheckBeforUpdate) {
452 GetLocalWaterMark(syncType, queryId, context, localMark);
453 if (localMark >= dataTimeRange.endTime) {
454 return E_OK;
455 }
456 }
457 errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
458 } else {
459 bool isNeedUpdateMark = true;
460 bool isNeedUpdateDeleteMark = true;
461 if (isCheckBeforUpdate) {
462 WaterMark deleteDataWaterMark = 0;
463 GetLocalWaterMark(syncType, queryId, context, localMark);
464 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
465 if (localMark >= dataTimeRange.endTime) {
466 isNeedUpdateMark = false;
467 }
468 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
469 isNeedUpdateDeleteMark = false;
470 }
471 }
472 if (isNeedUpdateMark) {
473 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
474 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
475 if (errCode != E_OK) {
476 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
477 return errCode;
478 }
479 }
480 if (isNeedUpdateDeleteMark) {
481 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
482 dataTimeRange.deleteEndTime);
483 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
484 }
485 }
486 if (errCode != E_OK) {
487 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
488 }
489 return errCode;
490 }
491
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const492 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
493 const DeviceID &deviceId, WaterMark &waterMark) const
494 {
495 if (syncType != SyncType::QUERY_SYNC_TYPE) {
496 metadata_->GetPeerWaterMark(deviceId, waterMark);
497 return;
498 }
499 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
500 }
501
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)502 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
503 {
504 metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
505 }
506
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const507 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
508 WaterMark &waterMark) const
509 {
510 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
511 }
512
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const513 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
514 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
515 {
516 if (syncType != SyncType::QUERY_SYNC_TYPE) {
517 metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
518 return;
519 }
520 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
521 waterMark, context->IsAutoLiftWaterMark());
522 }
523
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)524 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
525 WaterMark maxSendDataTime)
526 {
527 bool isNeedClearRemoteData = false;
528 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
529 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
530 uint64_t clearDeviceDataMark = 0;
531 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
532 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
533 } else {
534 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
535 if (packet == nullptr) {
536 LOGE("[RemoveDeviceDataHandle] get packet object failed");
537 return -E_INVALID_ARGS;
538 }
539 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
540 WaterMark packetLocalMark = packet->GetLocalWaterMark();
541 WaterMark peerMark = 0;
542 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
543 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
544 }
545 if (!isNeedClearRemoteData) {
546 return E_OK;
547 }
548 int errCode = E_OK;
549 if (context->IsNeedClearRemoteStaleData()) {
550 // need to clear remote device history data
551 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
552 if (errCode != E_OK) {
553 (void)SendDataAck(context, message, errCode, maxSendDataTime);
554 return errCode;
555 }
556 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
557 // avoid repeat clear in ack
558 metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
559 }
560 }
561 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
562 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
563 if (errCode != E_OK) {
564 (void)SendDataAck(context, message, errCode, maxSendDataTime);
565 return errCode;
566 }
567 }
568 return E_OK;
569 }
570
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)571 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
572 const std::vector<uint64_t> &reserved)
573 {
574 bool isNeedClearRemoteData = false;
575 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
576 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
577 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
578 uint64_t clearDeviceDataMark = 0;
579 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
580 isNeedClearRemoteData = (clearDeviceDataMark != 0);
581 } else if (reserved.empty()) {
582 WaterMark localMark = 0;
583 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
584 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
585 } else {
586 WaterMark peerMark = 0;
587 GetPeerWaterMark(curType, context->GetQuerySyncId(),
588 context->GetDeviceId(), peerMark);
589 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
590 }
591 if (!isNeedClearRemoteData) {
592 return E_OK;
593 }
594 // need to clear remote historydata
595 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
596 label_.c_str(), STR_MASK(GetDeviceId()));
597 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
598 if (errCode != E_OK) {
599 return errCode;
600 }
601 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
602 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
603 }
604 return errCode;
605 }
606
SetSessionEndTimestamp(Timestamp end)607 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
608 {
609 sessionEndTimestamp_ = end;
610 }
611
GetSessionEndTimestamp() const612 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
613 {
614 return sessionEndTimestamp_;
615 }
616
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)617 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
618 {
619 ReSendInfo reSendInfo;
620 reSendInfo.start = dataTimeRange.beginTime;
621 reSendInfo.end = dataTimeRange.endTime;
622 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
623 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
624 reSendInfo.packetId = context->GetPacketId();
625 maxSequenceIdHasSent_++;
626 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
627 windowSize_--;
628 ContinueToken token;
629 context->GetContinueToken(token);
630 if (token == nullptr) {
631 isAllDataHasSent_ = true;
632 }
633 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
634 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
635 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
636 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
637 }
638
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)639 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
640 SyncEntry &syncData, int sendCode, int mode)
641 {
642 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
643 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
644 WaterMark localMark = 0;
645 WaterMark peerMark = 0;
646 WaterMark deleteMark = 0;
647 bool needCompressOnSync = false;
648 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
649 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
650 std::string id = context->GetQuerySyncId();
651 GetLocalWaterMark(curType, id, context, localMark);
652 GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
653 GetLocalDeleteSyncWaterMark(context, deleteMark);
654 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
655 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
656 packet->SetLastSequence();
657 }
658 int tmpMode = mode;
659 if (mode == SyncModeType::RESPONSE_PULL) {
660 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
661 }
662 packet->SetData(syncData.entries);
663 packet->SetCompressData(syncData.compressedEntries);
664 packet->SetBasicInfo(sendCode, version, tmpMode);
665 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
666 packet->SetWaterMark(localMark, peerMark, deleteMark);
667 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
668 packet->SetEndWaterMark(context->GetEndMark());
669 packet->SetSessionId(context->GetRequestSessionId());
670 }
671 packet->SetQuery(context->GetQuery());
672 packet->SetQueryId(context->GetQuerySyncId());
673 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
674 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
675 packet->SetCompressDataMark();
676 packet->SetCompressAlgo(curAlgo);
677 }
678 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
679 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
680 context->GetQuery().HasOrderBy())) {
681 packet->SetUpdateWaterMark();
682 }
683 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
684 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
685 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
686 }
687
RequestStart(SingleVerSyncTaskContext * context,int mode)688 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
689 {
690 if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
691 context->SetTaskErrCode(-E_NOT_SUPPORT);
692 return -E_NOT_SUPPORT;
693 }
694 int errCode = RemoveDeviceDataIfNeed(context);
695 if (errCode != E_OK) {
696 context->SetTaskErrCode(errCode);
697 return errCode;
698 }
699 SyncEntry syncData;
700 // get data
701 errCode = GetDataWithPerformanceRecord(context, syncData);
702 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
703
704 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
705 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
706 return errCode;
707 }
708
709 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
710 if (packet == nullptr) {
711 LOGE("[DataSync][PushStart] new DataRequestPacket error");
712 return -E_OUT_OF_MEMORY;
713 }
714 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
715 UpdateWaterMark isUpdateWaterMark;
716 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
717 if (errCode == E_OK) {
718 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
719 }
720 FillDataRequestPacket(packet, context, syncData, errCode, mode);
721 errCode = SendDataPacket(curType, packet, context);
722 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
723 if (performance != nullptr) {
724 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
725 }
726 if (errCode == E_OK || errCode == -E_TIMEOUT) {
727 UpdateSendInfo(dataTime, context);
728 }
729 if (errCode == E_OK) {
730 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
731 context->GetQuery().HasOrderBy())) {
732 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
733 return E_OK;
734 }
735 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
736 SaveLocalWaterMark(curType, context, tmpDataTime);
737 }
738 return errCode;
739 }
740
PushStart(SingleVerSyncTaskContext * context)741 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
742 {
743 if (context == nullptr) {
744 return -E_INVALID_ARGS;
745 }
746 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
747 return RequestStart(context,
748 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
749 }
750
PushPullStart(SingleVerSyncTaskContext * context)751 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
752 {
753 if (context == nullptr) {
754 return -E_INVALID_ARGS;
755 }
756 return RequestStart(context, context->GetMode());
757 }
758
PullRequestStart(SingleVerSyncTaskContext * context)759 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
760 {
761 if (context == nullptr) {
762 return -E_INVALID_ARGS;
763 }
764 if (!SingleVerDataSyncUtils::QuerySyncCheck(context)) {
765 context->SetTaskErrCode(-E_NOT_SUPPORT);
766 return -E_NOT_SUPPORT;
767 }
768 int errCode = RemoveDeviceDataIfNeed(context);
769 if (errCode != E_OK) {
770 context->SetTaskErrCode(errCode);
771 return errCode;
772 }
773 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
774 if (packet == nullptr) {
775 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
776 return -E_OUT_OF_MEMORY;
777 }
778 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
779 WaterMark peerMark = 0;
780 WaterMark localMark = 0;
781 WaterMark deleteMark = 0;
782 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
783 context->GetDeviceId(), peerMark);
784 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
785 GetLocalDeleteSyncWaterMark(context, deleteMark);
786 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
787 WaterMark endMark = context->GetEndMark();
788 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
789
790 packet->SetBasicInfo(E_OK, version, context->GetMode());
791 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
792 packet->SetWaterMark(localMark, peerMark, deleteMark);
793 packet->SetEndWaterMark(endMark);
794 packet->SetSessionId(context->GetRequestSessionId());
795 packet->SetQuery(context->GetQuery());
796 packet->SetQueryId(context->GetQuerySyncId());
797 packet->SetLastSequence();
798 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
799 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
800
801 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
802 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark, label_.c_str(),
803 STR_MASK(GetDeviceId()));
804 UpdateSendInfo(dataTime, context);
805 return SendDataPacket(syncType, packet, context);
806 }
807
PullResponseStart(SingleVerSyncTaskContext * context)808 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
809 {
810 if (context == nullptr) {
811 return -E_INVALID_ARGS;
812 }
813 SyncEntry syncData;
814 // get data
815 int errCode = GetDataWithPerformanceRecord(context, syncData);
816 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
817 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
818 SendPullResponseDataPkt(errCode, syncData, context);
819 }
820 return errCode;
821 }
822 // if send finished
823 int ackCode = E_OK;
824 ContinueToken token = nullptr;
825 context->GetContinueToken(token);
826 if (errCode == E_OK && token == nullptr) {
827 LOGD("[DataSync][PullResponse] send last frame end");
828 ackCode = SEND_FINISHED;
829 }
830 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
831 UpdateWaterMark isUpdateWaterMark;
832 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
833 if (errCode == E_OK) {
834 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
835 }
836 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
837 if (errCode == E_OK || errCode == -E_TIMEOUT) {
838 UpdateSendInfo(dataTime, context);
839 }
840 if (errCode == E_OK) {
841 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
842 context->GetQuery().HasOrderBy())) {
843 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
844 return E_OK;
845 }
846 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
847 SaveLocalWaterMark(curType, context, tmpDataTime);
848 }
849 return errCode;
850 }
851
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)852 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
853 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
854 {
855 WaterMark tmpPeerWatermark = dataTime.endTime;
856 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
857 if (isUpdateWaterMark.normalUpdateMark) {
858 tmpPeerWatermark++;
859 }
860 if (isUpdateWaterMark.deleteUpdateMark) {
861 tmpPeerDeletedWatermark++;
862 }
863 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
864 }
865
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)866 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
867 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
868 {
869 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
870 return;
871 }
872 int errCode = E_OK;
873 if (syncType != SyncType::QUERY_SYNC_TYPE) {
874 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
875 } else {
876 if (peerWatermark != 0) {
877 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
878 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
879 if (errCode != E_OK) {
880 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
881 }
882 }
883 if (peerDeletedWatermark != 0) {
884 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
885 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
886 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
887 }
888 }
889 if (errCode != E_OK) {
890 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
891 }
892 }
893
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)894 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
895 {
896 uint16_t remoteCommunicatorVersion = 0;
897 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
898 -E_NOT_FOUND) {
899 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
900 return -E_VERSION_NOT_SUPPORT;
901 }
902 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
903 if (remoteCommunicatorVersion == 0) {
904 LOGI("[DataSync] set remote version 0");
905 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
906 return E_OK;
907 } else {
908 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
909 if (isControlMsg) {
910 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
911 } else {
912 SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
913 }
914 return -E_WAIT_NEXT_MESSAGE;
915 }
916 }
917
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)918 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
919 {
920 if (context == nullptr || message == nullptr) {
921 return -E_INVALID_ARGS;
922 }
923 auto *packet = message->GetObject<DataRequestPacket>();
924 if (packet == nullptr) {
925 return -E_INVALID_ARGS;
926 }
927 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
928 return DoAbilitySyncIfNeed(context, message);
929 }
930 int32_t sendCode = packet->GetSendCode();
931 if (sendCode == -E_VERSION_NOT_SUPPORT) {
932 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
933 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
934 return -E_WAIT_NEXT_MESSAGE;
935 }
936 // only deal with pull response packet errCode
937 if (sendCode != E_OK && sendCode != SEND_FINISHED &&
938 message->GetSessionId() == context->GetRequestSessionId()) {
939 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
940 return sendCode;
941 }
942 int errCode = RunPermissionCheck(context, message, packet);
943 if (errCode != E_OK) {
944 return errCode;
945 }
946 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
947 errCode = CheckSchemaStrategy(context, message);
948 }
949 if (errCode == E_OK) {
950 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
951 }
952 if (errCode != E_OK) {
953 (void)SendDataAck(context, message, errCode, 0);
954 }
955 return errCode;
956 }
957
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)958 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
959 WaterMark &pullEndWatermark)
960 {
961 int errCode = DataRequestRecvPre(context, message);
962 if (errCode != E_OK) {
963 return errCode;
964 }
965 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
966 const std::vector<SendDataItem> &data = packet->GetData();
967 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
968 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
969 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
970 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
971 context->SetReceiveWaterMarkErr(false);
972 UpdateWaterMark isUpdateWaterMark;
973 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
974 errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
975 if (errCode != E_OK) {
976 return errCode;
977 }
978 if (WaterMarkErrHandle(curType, context, message)) {
979 return E_OK;
980 }
981 GetPullEndWatermark(context, packet, pullEndWatermark);
982 // save data first
983 errCode = SaveData(context, data, curType, packet->GetQuery());
984 if (errCode != E_OK) {
985 (void)SendDataAck(context, message, errCode, dataTime.endTime);
986 return errCode;
987 }
988 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
989 pullEndWatermark = 0;
990 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
991 } else {
992 // if data is empty, we don't know the max timestap of this packet.
993 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
994 }
995 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
996 context->GetRequestSessionId());
997 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
998 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
999 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1000 UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1001 }
1002 if (errCode != E_OK) {
1003 return errCode;
1004 }
1005 if (packet->GetSendCode() == SEND_FINISHED) {
1006 return -E_RECV_FINISHED;
1007 }
1008 return errCode;
1009 }
1010
SendDataPacket(SyncType syncType,const DataRequestPacket * packet,SingleVerSyncTaskContext * context)1011 int SingleVerDataSync::SendDataPacket(SyncType syncType, const DataRequestPacket *packet,
1012 SingleVerSyncTaskContext *context)
1013 {
1014 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1015 if (message == nullptr) {
1016 LOGE("[DataSync][SendDataPacket] new message error");
1017 delete packet;
1018 packet = nullptr;
1019 return -E_OUT_OF_MEMORY;
1020 }
1021 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1022 int errCode = message->SetExternalObject(packet);
1023 if (errCode != E_OK) {
1024 delete packet;
1025 packet = nullptr;
1026 delete message;
1027 message = nullptr;
1028 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1029 return errCode;
1030 }
1031 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1032 context->GetSequenceId(), context->GetRequestSessionId());
1033 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1034 if (performance != nullptr) {
1035 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1036 }
1037 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1038 context, message->GetSessionId());
1039 errCode = Send(context, message, handler, packetLen);
1040 if (errCode != E_OK) {
1041 delete message;
1042 message = nullptr;
1043 }
1044
1045 return errCode;
1046 }
1047
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1048 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1049 SingleVerSyncTaskContext *context)
1050 {
1051 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1052 if (packet == nullptr) {
1053 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1054 return -E_OUT_OF_MEMORY;
1055 }
1056 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1057 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1058 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1059 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1060 if (message == nullptr) {
1061 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1062 delete packet;
1063 packet = nullptr;
1064 return -E_OUT_OF_MEMORY;
1065 }
1066 int errCode = message->SetExternalObject(packet);
1067 if (errCode != E_OK) {
1068 delete packet;
1069 packet = nullptr;
1070 delete message;
1071 message = nullptr;
1072 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1073 return errCode;
1074 }
1075 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1076 context->GetSequenceId(), context->GetResponseSessionId());
1077 SendResetWatchDogPacket(context, packetLen);
1078 errCode = Send(context, message, nullptr, packetLen);
1079 if (errCode != E_OK) {
1080 delete message;
1081 message = nullptr;
1082 }
1083 return errCode;
1084 }
1085
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1086 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1087 {
1088 SendDataAck(context, message, E_OK, 0);
1089 }
1090
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1091 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1092 WaterMark maxSendDataTime)
1093 {
1094 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1095 if (packet == nullptr) {
1096 return -E_INVALID_ARGS;
1097 }
1098 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1099 if (ackMessage == nullptr) {
1100 LOGE("[DataSync][SendDataAck] new message error");
1101 return -E_OUT_OF_MEMORY;
1102 }
1103 DataAckPacket ack;
1104 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1105 int errCode = ackMessage->SetCopiedObject(ack);
1106 if (errCode != E_OK) {
1107 delete ackMessage;
1108 ackMessage = nullptr;
1109 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1110 return errCode;
1111 }
1112 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1113 message->GetSequenceId(), message->GetSessionId());
1114
1115 errCode = Send(context, ackMessage, nullptr, 0);
1116 if (errCode != E_OK) {
1117 delete ackMessage;
1118 ackMessage = nullptr;
1119 }
1120 return errCode;
1121 }
1122
AckPacketIdCheck(const Message * message)1123 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1124 {
1125 if (message == nullptr) {
1126 LOGE("[DataSync] AckRecv message nullptr");
1127 return false;
1128 }
1129 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1130 return true;
1131 }
1132 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1133 if (packet == nullptr) {
1134 return false;
1135 }
1136 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1137 std::lock_guard<std::mutex> lock(lock_);
1138 uint32_t sequenceId = message->GetSequenceId();
1139 if (reSendMap_.count(sequenceId) != 0) {
1140 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1141 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1142 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1143 originalPacketId);
1144 return false;
1145 }
1146 }
1147 return true;
1148 }
1149
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1150 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1151 {
1152 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1153 if (errCode != E_OK) {
1154 return errCode;
1155 }
1156 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1157 if (packet == nullptr) {
1158 return -E_INVALID_ARGS;
1159 }
1160 int32_t recvCode = packet->GetRecvCode();
1161 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1162 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1163 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1164 LOGE("[DataSync][AckRecv] Version mismatch");
1165 return -E_VERSION_NOT_SUPPORT;
1166 }
1167
1168 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT) {
1169 // we should ReleaseContinueToken, avoid crash
1170 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1171 STR_MASK(GetDeviceId()));
1172 context->ReleaseContinueToken();
1173 return recvCode;
1174 }
1175 uint64_t data = packet->GetData();
1176 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1177 return DealWaterMarkException(context, data, packet->GetReserved());
1178 }
1179
1180 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1181 // data only use low 32bit
1182 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1183 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1184 STR_MASK(GetDeviceId()));
1185 }
1186
1187 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1188 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1189 label_.c_str(), STR_MASK(GetDeviceId()));
1190 return recvCode;
1191 }
1192
1193 // Judge if send finished
1194 ContinueToken token;
1195 context->GetContinueToken(token);
1196 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1197 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1198 return -E_NO_DATA_SEND;
1199 }
1200
1201 // send next data
1202 return -E_SEND_DATA;
1203 }
1204
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1205 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1206 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1207 {
1208 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1209 LOGE("[SingleVerDataSync] messageId not available.");
1210 return;
1211 }
1212 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1213 if (ackMessage == nullptr) {
1214 LOGE("[DataSync][SaveDataNotify] new message failed");
1215 return;
1216 }
1217
1218 DataAckPacket ack;
1219 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1220 ack.SetVersion(pktVersion);
1221 int errCode = ackMessage->SetCopiedObject(ack);
1222 if (errCode != E_OK) {
1223 delete ackMessage;
1224 ackMessage = nullptr;
1225 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1226 return;
1227 }
1228 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1229
1230 errCode = Send(context, ackMessage, nullptr, 0);
1231 if (errCode != E_OK) {
1232 delete ackMessage;
1233 ackMessage = nullptr;
1234 }
1235 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1236 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1237 }
1238
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1239 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1240 WaterMark &pullEndWatermark) const
1241 {
1242 if (packet == nullptr) {
1243 return;
1244 }
1245 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1246 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1247 WaterMark endMark = packet->GetEndWaterMark();
1248 TimeOffset offset;
1249 metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1250 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1251 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1252 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1253 }
1254 }
1255
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1256 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1257 const std::vector<uint64_t> &reserved)
1258 {
1259 WaterMark deletedWaterMark = 0;
1260 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1261 if (curType == SyncType::QUERY_SYNC_TYPE) {
1262 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1263 LOGE("[DataSync] get packet reserve size failed");
1264 return -E_INVALID_ARGS;
1265 }
1266 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1267 }
1268 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1269 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1270 int errCode = SaveLocalWaterMark(curType, context,
1271 {0, 0, ackWaterMark, deletedWaterMark});
1272 if (errCode != E_OK) {
1273 return errCode;
1274 }
1275 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1276 context->IncNegotiationCount();
1277 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1278 if (!context->IsNeedClearRemoteStaleData()) {
1279 return -E_RE_SEND_DATA;
1280 }
1281 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1282 if (errCode != E_OK) {
1283 return errCode;
1284 }
1285 return -E_RE_SEND_DATA;
1286 }
1287
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1288 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1289 const DataRequestPacket *packet)
1290 {
1291 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1292 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1293 if (errCode != E_OK) {
1294 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1295 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1296 }
1297 return -E_NOT_PERMIT;
1298 }
1299 const std::vector<SendDataItem> &data = packet->GetData();
1300 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1301 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1302 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1303 !context->GetReceivcPermitCheck()) {
1304 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1305 if (permitReceive) {
1306 context->SetReceivcPermitCheck(true);
1307 } else {
1308 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1309 return -E_SECURITY_OPTION_CHECK_ERROR;
1310 }
1311 }
1312 return errCode;
1313 }
1314
1315 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1316 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1317 {
1318 // When mtu less than 30k, we send data with bluetooth
1319 // In order not to block the bluetooth channel, we don't send notify packet here
1320 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1321 return;
1322 }
1323 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1324
1325 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1326 if (ackMessage == nullptr) {
1327 LOGE("[DataSync][ResetWatchDog] new message failed");
1328 return;
1329 }
1330
1331 DataAckPacket ack;
1332 ack.SetData(data);
1333 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1334 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1335 int errCode = ackMessage->SetCopiedObject(ack);
1336 if (errCode != E_OK) {
1337 delete ackMessage;
1338 ackMessage = nullptr;
1339 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1340 return;
1341 }
1342 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1343 context->GetSequenceId(), context->GetResponseSessionId());
1344
1345 errCode = Send(context, ackMessage, nullptr, 0);
1346 if (errCode != E_OK) {
1347 delete ackMessage;
1348 ackMessage = nullptr;
1349 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1350 STR_MASK(GetDeviceId()));
1351 } else {
1352 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1353 STR_MASK(GetDeviceId()));
1354 }
1355 }
1356
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1357 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1358 {
1359 if (context == nullptr) {
1360 return -E_INVALID_ARGS;
1361 }
1362 SyncEntry syncData;
1363 int errCode = GetReSendData(syncData, context, reSendInfo);
1364 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1365 return errCode;
1366 }
1367 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1368 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1369 if (packet == nullptr) {
1370 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1371 return -E_OUT_OF_MEMORY;
1372 }
1373 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1374 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1375 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1376 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1377 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1378 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1379 dataTime.deleteEndTime += 1;
1380 }
1381 if (reSendInfo.end > reSendInfo.start) {
1382 dataTime.endTime += 1;
1383 }
1384 SaveLocalWaterMark(curType, context, dataTime, true);
1385 }
1386 return errCode;
1387 }
1388
SendReSendPacket(const DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1389 int SingleVerDataSync::SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1390 uint32_t sessionId, uint32_t sequenceId)
1391 {
1392 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1393 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1394 if (message == nullptr) {
1395 LOGE("[DataSync][SendDataPacket] new message error");
1396 delete packet;
1397 packet = nullptr;
1398 return -E_OUT_OF_MEMORY;
1399 }
1400 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1401 int errCode = message->SetExternalObject(packet);
1402 if (errCode != E_OK) {
1403 delete packet;
1404 packet = nullptr;
1405 delete message;
1406 message = nullptr;
1407 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1408 return errCode;
1409 }
1410 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1411 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1412 context, message->GetSessionId());
1413 errCode = Send(context, message, handler, packetLen);
1414 if (errCode != E_OK) {
1415 delete message;
1416 message = nullptr;
1417 }
1418 return errCode;
1419 }
1420
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1421 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1422 {
1423 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1424 int mode = SyncOperation::TransferSyncMode(inMode);
1425 // for pull mode it just need to get data, no need to send data.
1426 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1427 return E_OK;
1428 }
1429 if (context->GetSendPermitCheck()) {
1430 return E_OK;
1431 }
1432 bool isPermitSync = true;
1433 std::string deviceId = context->GetDeviceId();
1434 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1435 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1436 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1437 }
1438 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1439 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1440 if (isPermitSync) {
1441 context->SetSendPermitCheck(true);
1442 return E_OK;
1443 }
1444 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1445 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1446 return -E_SECURITY_OPTION_CHECK_ERROR;
1447 }
1448 if (mode == SyncModeType::RESPONSE_PULL) {
1449 SyncEntry syncData;
1450 SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1451 return -E_SECURITY_OPTION_CHECK_ERROR;
1452 }
1453 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1454 return -E_SECURITY_OPTION_CHECK_ERROR;
1455 }
1456 return E_OK;
1457 }
1458
GetLabel() const1459 std::string SingleVerDataSync::GetLabel() const
1460 {
1461 return label_;
1462 }
1463
GetDeviceId() const1464 std::string SingleVerDataSync::GetDeviceId() const
1465 {
1466 return deviceId_;
1467 }
1468
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1469 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1470 {
1471 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1472 if (packet == nullptr) {
1473 LOGE("[WaterMarkErrHandle] get packet object failed");
1474 return -E_INVALID_ARGS;
1475 }
1476 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1477 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1478 WaterMark peerMark = 0;
1479 WaterMark deletedMark = 0;
1480 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1481 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1482 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1483 }
1484 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1485 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1486 context->SetReceiveWaterMarkErr(true);
1487 SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1488 return true;
1489 }
1490 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1491 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1492 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1493 peerMark);
1494 context->SetReceiveWaterMarkErr(true);
1495 SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1496 return true;
1497 }
1498 return false;
1499 }
1500
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1501 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1502 {
1503 auto *packet = message->GetObject<DataRequestPacket>();
1504 if (packet == nullptr) {
1505 return -E_INVALID_ARGS;
1506 }
1507 auto query = packet->GetQuery();
1508 SyncStrategy localStrategy = context->GetSyncStrategy(query);
1509 if (!context->GetIsSchemaSync()) {
1510 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", context->GetIsSchemaSync());
1511 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1512 return -E_NEED_ABILITY_SYNC;
1513 }
1514 if (!localStrategy.permitSync) {
1515 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", localStrategy.permitSync);
1516 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1517 return -E_SCHEMA_MISMATCH;
1518 }
1519 return E_OK;
1520 }
1521
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1522 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1523 {
1524 int mode = SyncOperation::TransferSyncMode(inMode);
1525 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1526 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1527 return;
1528 }
1529
1530 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1531 storage_->NotifyRemotePushFinished(deviceId_);
1532 }
1533 }
1534
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1535 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1536 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1537 {
1538 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1539 WaterMark localMark = 0;
1540 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1541 ackPacket.SetRecvCode(recvCode);
1542 // send ack packet
1543 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1544 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1545 } else if (recvCode != WATER_MARK_INVALID) {
1546 WaterMark mark = 0;
1547 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1548 ackPacket.SetData(mark);
1549 }
1550 std::vector<uint64_t> reserved {localMark};
1551 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1552 uint64_t packetId = 0;
1553 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1554 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1555 }
1556 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1557 reserved.push_back(packetId);
1558 }
1559 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1560 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1561 WaterMark deletedPeerMark;
1562 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1563 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1564 }
1565 ackPacket.SetReserved(reserved);
1566 ackPacket.SetVersion(version);
1567 }
1568
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1569 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1570 DataSyncReSendInfo reSendInfo)
1571 {
1572 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1573 if (mode == SyncModeType::PULL) {
1574 return E_OK;
1575 }
1576 ContinueToken token = nullptr;
1577 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1578 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1579 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1580 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1581 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1582 int errCode;
1583 if (curType != SyncType::QUERY_SYNC_TYPE) {
1584 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1585 reSendDataSizeInfo);
1586 } else {
1587 QuerySyncObject queryObj = context->GetQuery();
1588 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1589 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1590 }
1591 if (token != nullptr) {
1592 storage_->ReleaseContinueToken(token);
1593 }
1594 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1595 context->SetTaskErrCode(errCode);
1596 return errCode;
1597 }
1598 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1599 return errCode;
1600 }
1601
1602 int innerCode = InterceptData(syncData);
1603 if (innerCode != E_OK) {
1604 context->SetTaskErrCode(innerCode);
1605 return innerCode;
1606 }
1607
1608 bool needCompressOnSync = false;
1609 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1610 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1611 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1612 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1613 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1614 { remoteAlgo, version });
1615 if (compressCode != E_OK) {
1616 return compressCode;
1617 }
1618 }
1619 return errCode;
1620 }
1621
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1622 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1623 {
1624 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1625 return E_OK;
1626 }
1627 uint64_t clearRemoteDataMark = 0;
1628 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1629 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1630 if (clearRemoteDataMark == 0) {
1631 return E_OK;
1632 }
1633 int errCode = E_OK;
1634 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1635 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1636 if (errCode != E_OK) {
1637 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1638 return errCode;
1639 }
1640 }
1641 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1642 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1643 if (errCode != E_OK) {
1644 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1645 return errCode;
1646 }
1647 }
1648 return E_OK;
1649 }
1650
UpdateMtuSize()1651 void SingleVerDataSync::UpdateMtuSize()
1652 {
1653 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1654 }
1655
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1656 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1657 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1658 {
1659 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1660 WaterMark peerMark = 0;
1661 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1662 peerMark);
1663 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1664 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1665 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1666 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1667 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1668 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1669 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1670 packet->SetLastSequence();
1671 }
1672 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1673 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1674 sendCode = SEND_FINISHED;
1675 }
1676 packet->SetData(syncData.entries);
1677 packet->SetCompressData(syncData.compressedEntries);
1678 packet->SetBasicInfo(sendCode, version, reSendMode);
1679 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1680 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1681 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH) {
1682 packet->SetEndWaterMark(context->GetEndMark());
1683 packet->SetQuery(context->GetQuery());
1684 }
1685 packet->SetQueryId(context->GetQuerySyncId());
1686 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1687 std::vector<uint64_t> reserved {reSendInfo.packetId};
1688 packet->SetReserved(reserved);
1689 }
1690 bool needCompressOnSync = false;
1691 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1692 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1693 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1694 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1695 packet->SetCompressDataMark();
1696 packet->SetCompressAlgo(curAlgo);
1697 }
1698 }
1699
GetDataSizeSpecInfo(size_t packetSize)1700 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1701 {
1702 bool needCompressOnSync = false;
1703 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1704 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1705 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1706 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1707 return {blockSize, packetSize};
1708 }
1709
InterceptData(SyncEntry & syncEntry)1710 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1711 {
1712 if (storage_ == nullptr) {
1713 LOGE("Invalid DB. Can not intercept data.");
1714 return -E_INVALID_DB;
1715 }
1716
1717 // GetLocalDeviceName get local device ID.
1718 // GetDeviceId get remote device ID.
1719 // If intercept data fail, entries will be released.
1720 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId());
1721 }
1722
ControlCmdStart(SingleVerSyncTaskContext * context)1723 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1724 {
1725 if (context == nullptr) {
1726 return -E_INVALID_ARGS;
1727 }
1728 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1729 if (subManager == nullptr) {
1730 return -E_INVALID_ARGS;
1731 }
1732 int errCode = ControlCmdStartCheck(context);
1733 if (errCode != E_OK) {
1734 return errCode;
1735 }
1736 ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1737 if (packet == nullptr) {
1738 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1739 return -E_OUT_OF_MEMORY;
1740 }
1741 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1742 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1743 if (errCode != E_OK) {
1744 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1745 delete packet;
1746 packet = nullptr;
1747 return errCode;
1748 }
1749 }
1750 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1751 errCode = SendControlPacket(packet, context);
1752 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1753 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1754 }
1755 return errCode;
1756 }
1757
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1758 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1759 {
1760 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1761 if (packet == nullptr) {
1762 return -E_INVALID_ARGS;
1763 }
1764 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1765 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1766 int errCode = ControlCmdRequestRecvPre(context, message);
1767 if (errCode != E_OK) {
1768 return errCode;
1769 }
1770 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1771 errCode = SubscribeRequestRecv(context, message);
1772 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1773 errCode = UnsubscribeRequestRecv(context, message);
1774 }
1775 return errCode;
1776 }
1777
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1778 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1779 {
1780 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1781 if (subManager == nullptr) {
1782 return -E_INVALID_ARGS;
1783 }
1784 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1785 if (errCode != E_OK) {
1786 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1787 return errCode;
1788 }
1789 const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1790 if (packet == nullptr) {
1791 return -E_INVALID_ARGS;
1792 }
1793 int32_t recvCode = packet->GetRecvCode();
1794 uint32_t cmdType = packet->GetcontrolCmdType();
1795 if (recvCode != E_OK) {
1796 LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1797 STR_MASK(GetDeviceId()), cmdType);
1798 // for unsubscribe no need to do something
1799 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1800 return recvCode;
1801 }
1802 if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1803 errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1804 } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1805 subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1806 }
1807 if (errCode != E_OK) {
1808 LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1809 return errCode;
1810 }
1811 return -E_NO_DATA_SEND; // means control msg send finished
1812 }
1813
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1814 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1815 {
1816 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1817 (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1818 LOGE("[ControlCmdStartCheck] not support controlCmd");
1819 return -E_INVALID_ARGS;
1820 }
1821 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1822 context->GetQuery().HasInKeys() &&
1823 context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1824 return -E_NOT_SUPPORT;
1825 }
1826 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1827 return E_OK;
1828 }
1829 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_);
1830 if (permitReceive) {
1831 context->SetReceivcPermitCheck(true);
1832 } else {
1833 return -E_SECURITY_OPTION_CHECK_ERROR;
1834 }
1835 return E_OK;
1836 }
1837
SendControlPacket(const ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1838 int SingleVerDataSync::SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1839 {
1840 Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1841 if (message == nullptr) {
1842 LOGE("[DataSync][SendControlPacket] new message error");
1843 delete packet;
1844 packet = nullptr;
1845 return -E_OUT_OF_MEMORY;
1846 }
1847 uint32_t packetLen = packet->CalculateLen();
1848 int errCode = message->SetExternalObject(packet);
1849 if (errCode != E_OK) {
1850 delete packet;
1851 packet = nullptr;
1852 delete message;
1853 message = nullptr;
1854 LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1855 return errCode;
1856 }
1857 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1858 context->GetSequenceId(), context->GetRequestSessionId());
1859 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1860 context, message->GetSessionId());
1861 errCode = Send(context, message, handler, packetLen);
1862 if (errCode != E_OK) {
1863 delete message;
1864 message = nullptr;
1865 }
1866 return errCode;
1867 }
1868
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1869 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1870 uint32_t controlCmdType, const CommErrHandler &handler)
1871 {
1872 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1873 if (ackMessage == nullptr) {
1874 LOGE("[DataSync][SendControlAck] new message error");
1875 return -E_OUT_OF_MEMORY;
1876 }
1877 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1878 ControlAckPacket ack;
1879 ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1880 int errCode = ackMessage->SetCopiedObject(ack);
1881 if (errCode != E_OK) {
1882 delete ackMessage;
1883 ackMessage = nullptr;
1884 LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1885 return errCode;
1886 }
1887 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1888 message->GetSequenceId(), message->GetSessionId());
1889 errCode = Send(context, ackMessage, handler, 0);
1890 if (errCode != E_OK) {
1891 delete ackMessage;
1892 ackMessage = nullptr;
1893 }
1894 return errCode;
1895 }
1896
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1897 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1898 {
1899 if (context == nullptr || message == nullptr) {
1900 return -E_INVALID_ARGS;
1901 }
1902 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1903 if (packet == nullptr) {
1904 return -E_INVALID_ARGS;
1905 }
1906 uint32_t controlCmdType = packet->GetcontrolCmdType();
1907 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1908 return DoAbilitySyncIfNeed(context, message, true);
1909 }
1910 if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1911 SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1912 return -E_WAIT_NEXT_MESSAGE;
1913 }
1914 return E_OK;
1915 }
1916
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1917 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1918 const Message *message)
1919 {
1920 uint32_t controlCmdType = packet->GetcontrolCmdType();
1921 if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1922 return E_OK;
1923 }
1924 QuerySyncObject syncQuery = packet->GetQuery();
1925 int errCode;
1926 if (!packet->IsAutoSubscribe()) {
1927 errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1928 if (errCode != E_OK) {
1929 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1930 SendControlAck(context, message, errCode, controlCmdType);
1931 return -E_WAIT_NEXT_MESSAGE;
1932 }
1933 }
1934 int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1935 static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1936 if (mode >= SyncModeType::INVALID_MODE) {
1937 LOGE("[SingleVerDataSync] invalid mode");
1938 SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1939 return -E_WAIT_NEXT_MESSAGE;
1940 }
1941 errCode = CheckPermitSendData(mode, context);
1942 if (errCode != E_OK) {
1943 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1944 SendControlAck(context, message, errCode, controlCmdType);
1945 }
1946 return errCode;
1947 }
1948
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1949 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1950 {
1951 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1952 if (packet == nullptr) {
1953 return -E_INVALID_ARGS;
1954 }
1955 int errCode = SubscribeRequestRecvPre(context, packet, message);
1956 if (errCode != E_OK) {
1957 return errCode;
1958 }
1959 uint32_t controlCmdType = packet->GetcontrolCmdType();
1960 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1961 if (subscribeManager == nullptr) {
1962 LOGE("[SingleVerDataSync] subscribeManager check failed");
1963 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1964 return -E_INVALID_ARGS;
1965 }
1966 errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
1967 if (errCode != E_OK) {
1968 LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1969 STR_MASK(GetDeviceId()));
1970 SendControlAck(context, message, errCode, controlCmdType);
1971 return errCode;
1972 }
1973 errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1974 if (errCode != E_OK) {
1975 LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1976 STR_MASK(GetDeviceId()));
1977 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1978 SendControlAck(context, message, errCode, controlCmdType);
1979 return errCode;
1980 }
1981 errCode = SendControlAck(context, message, E_OK, controlCmdType);
1982 if (errCode != E_OK) {
1983 subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1984 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
1985 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1986 STR_MASK(GetDeviceId()));
1987 return errCode;
1988 }
1989 subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
1990 return errCode;
1991 }
1992
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1993 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1994 {
1995 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1996 if (packet == nullptr) {
1997 return -E_INVALID_ARGS;
1998 }
1999 uint32_t controlCmdType = packet->GetcontrolCmdType();
2000 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2001 if (subscribeManager == nullptr) {
2002 LOGE("[SingleVerDataSync] subscribeManager check failed");
2003 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2004 return -E_INVALID_ARGS;
2005 }
2006 int errCode;
2007 std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2008 if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2009 errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2010 if (errCode != E_OK) {
2011 LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2012 STR_MASK(GetDeviceId()));
2013 SendControlAck(context, message, errCode, controlCmdType);
2014 return errCode;
2015 }
2016 }
2017 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2018 if (errCode != E_OK) {
2019 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2020 STR_MASK(GetDeviceId()));
2021 return errCode;
2022 }
2023 subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2024 metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2025 return errCode;
2026 }
2027
PutDataMsg(Message * message)2028 void SingleVerDataSync::PutDataMsg(Message *message)
2029 {
2030 return msgSchedule_.PutMsg(message);
2031 }
2032
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2033 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2034 bool &isNeedContinue)
2035 {
2036 return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2037 }
2038
IsNeedReloadQueue()2039 bool SingleVerDataSync::IsNeedReloadQueue()
2040 {
2041 return msgSchedule_.IsNeedReloadQueue();
2042 }
2043
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2044 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2045 {
2046 msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2047 }
2048
ClearDataMsg()2049 void SingleVerDataSync::ClearDataMsg()
2050 {
2051 msgSchedule_.ClearMsg();
2052 }
2053
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2054 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2055 const std::shared_ptr<SubscribeManager> &subscribeManager)
2056 {
2057 if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2058 storage_->RemoveSubscribe(queryId);
2059 }
2060 }
2061 } // namespace DistributedDB
2062