1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
SingleVerDataSync()33 SingleVerDataSync::SingleVerDataSync()
34 : mtuSize_(0),
35 storage_(nullptr),
36 communicateHandle_(nullptr),
37 metadata_(nullptr)
38 {
39 }
40
~SingleVerDataSync()41 SingleVerDataSync::~SingleVerDataSync()
42 {
43 storage_ = nullptr;
44 communicateHandle_ = nullptr;
45 metadata_ = nullptr;
46 }
47
Initialize(ISyncInterface * inStorage,ICommunicator * inCommunicateHandle,const std::shared_ptr<Metadata> & inMetadata,const std::string & deviceId)48 int SingleVerDataSync::Initialize(ISyncInterface *inStorage, ICommunicator *inCommunicateHandle,
49 const std::shared_ptr<Metadata> &inMetadata, const std::string &deviceId)
50 {
51 if ((inStorage == nullptr) || (inCommunicateHandle == nullptr) || (inMetadata == nullptr)) {
52 return -E_INVALID_ARGS;
53 }
54 storage_ = static_cast<SyncGenericInterface *>(inStorage);
55 communicateHandle_ = inCommunicateHandle;
56 metadata_ = inMetadata;
57 mtuSize_ = DBConstant::MIN_MTU_SIZE; // default size is 1K, it will update when need sync data.
58 std::vector<uint8_t> label = inStorage->GetIdentifier();
59 label.resize(3); // only show 3 Bytes enough
60 label_ = DBCommon::VectorToHexString(label);
61 deviceId_ = deviceId;
62 msgSchedule_.Initialize(label_, deviceId_);
63 return E_OK;
64 }
65
SyncStart(int mode,SingleVerSyncTaskContext * context)66 int SingleVerDataSync::SyncStart(int mode, SingleVerSyncTaskContext *context)
67 {
68 std::lock_guard<std::mutex> lock(lock_);
69 int errCode = CheckPermitSendData(mode, context);
70 if (errCode != E_OK) {
71 return errCode;
72 }
73 if (sessionId_ != 0) { // auto sync timeout resend
74 return ReSendData(context);
75 }
76 ResetSyncStatus(mode, context);
77 LOGI("[DataSync] SendStart,mode=%d,label=%s,device=%s", mode_, label_.c_str(), STR_MASK(deviceId_));
78 int tmpMode = SyncOperation::TransferSyncMode(mode);
79 if (tmpMode == SyncModeType::PUSH) {
80 errCode = PushStart(context);
81 } else if (tmpMode == SyncModeType::PUSH_AND_PULL) {
82 errCode = PushPullStart(context);
83 } else if (tmpMode == SyncModeType::PULL) {
84 errCode = PullRequestStart(context);
85 } else {
86 errCode = PullResponseStart(context);
87 }
88 if (context->IsSkipTimeoutError(errCode)) {
89 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
90 // just return to avoid higher pressure for send.
91 return E_OK;
92 }
93 if (errCode != E_OK) {
94 LOGE("[DataSync] SendStart errCode=%d", errCode);
95 return errCode;
96 }
97 if (tmpMode == SyncModeType::PUSH_AND_PULL && context->GetTaskErrCode() == -E_EKEYREVOKED) {
98 LOGE("wait for recv finished for push and pull mode");
99 return -E_EKEYREVOKED;
100 }
101 return InnerSyncStart(context);
102 }
103
InnerSyncStart(SingleVerSyncTaskContext * context)104 int SingleVerDataSync::InnerSyncStart(SingleVerSyncTaskContext *context)
105 {
106 int errCode = CheckPermitSendData(mode_, context);
107 if (errCode != E_OK) {
108 return errCode;
109 }
110 while (true) {
111 if (windowSize_ <= 0 || isAllDataHasSent_) {
112 LOGD("[DataSync] InnerDataSync winSize=%d,isAllSent=%d,label=%s,device=%s", windowSize_, isAllDataHasSent_,
113 label_.c_str(), STR_MASK(deviceId_));
114 return E_OK;
115 }
116 int mode = SyncOperation::TransferSyncMode(mode_);
117 if (mode == SyncModeType::PULL) {
118 LOGE("[DataSync] unexpected error");
119 return -E_INVALID_ARGS;
120 }
121 context->IncSequenceId();
122 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
123 errCode = PushStart(context);
124 } else {
125 errCode = PullResponseStart(context);
126 }
127 if ((mode == SyncModeType::PUSH_AND_PULL) && errCode == -E_EKEYREVOKED) {
128 LOGE("[DataSync] wait for recv finished,label=%s,device=%s", label_.c_str(), STR_MASK(deviceId_));
129 isAllDataHasSent_ = true;
130 return -E_EKEYREVOKED;
131 }
132 if (context->IsSkipTimeoutError(errCode)) {
133 // if E_TIMEOUT occurred, means send message pressure is high, put into resend map and wait for resend.
134 // just return to avoid higher pressure for send.
135 return E_OK;
136 }
137 if (errCode != E_OK) {
138 LOGE("[DataSync] InnerSend errCode=%d", errCode);
139 return errCode;
140 }
141 }
142 return E_OK;
143 }
144
InnerClearSyncStatus()145 void SingleVerDataSync::InnerClearSyncStatus()
146 {
147 sessionId_ = 0;
148 reSendMap_.clear();
149 windowSize_ = 0;
150 maxSequenceIdHasSent_ = 0;
151 isAllDataHasSent_ = false;
152 }
153
TryContinueSync(SingleVerSyncTaskContext * context,const Message * message)154 int SingleVerDataSync::TryContinueSync(SingleVerSyncTaskContext *context, const Message *message)
155 {
156 if (message == nullptr) {
157 LOGE("[DataSync] AckRecv message nullptr");
158 return -E_INVALID_ARGS;
159 }
160 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
161 if (packet == nullptr) {
162 return -E_INVALID_ARGS;
163 }
164 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
165 uint32_t sessionId = message->GetSessionId();
166 uint32_t sequenceId = message->GetSequenceId();
167
168 std::lock_guard<std::mutex> lock(lock_);
169 LOGI("[DataSync] recv ack seqId=%" PRIu32 ",packetId=%" PRIu64 ",winSize=%d,label=%s,dev=%s", sequenceId, packetId,
170 windowSize_, label_.c_str(), STR_MASK(deviceId_));
171 if (sessionId != sessionId_) {
172 LOGI("[DataSync] ignore ack,sessionId is different");
173 return E_OK;
174 }
175 Timestamp lastQueryTime = 0;
176 if (reSendMap_.count(sequenceId) != 0) {
177 lastQueryTime = reSendMap_[sequenceId].end;
178 reSendMap_.erase(sequenceId);
179 windowSize_++;
180 } else {
181 LOGI("[DataSync] ack seqId not in map");
182 return E_OK;
183 }
184 if (context->IsQuerySync() && storage_->GetInterfaceType() == ISyncInterface::SYNC_RELATION) {
185 Timestamp dbLastQueryTime = 0;
186 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), dbLastQueryTime);
187 if (errCode == E_OK && dbLastQueryTime < lastQueryTime) {
188 errCode = metadata_->SetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(), lastQueryTime);
189 }
190 if (errCode != E_OK) {
191 return errCode;
192 }
193 }
194 if (!isAllDataHasSent_) {
195 return InnerSyncStart(context);
196 } else if (reSendMap_.empty()) {
197 context->SetOperationStatus(SyncOperation::OP_SEND_FINISHED);
198 InnerClearSyncStatus();
199 return -E_FINISHED;
200 }
201 return E_OK;
202 }
203
ClearSyncStatus()204 void SingleVerDataSync::ClearSyncStatus()
205 {
206 std::lock_guard<std::mutex> lock(lock_);
207 InnerClearSyncStatus();
208 }
209
ReSendData(SingleVerSyncTaskContext * context)210 int SingleVerDataSync::ReSendData(SingleVerSyncTaskContext *context)
211 {
212 if (reSendMap_.empty()) {
213 LOGI("[DataSync] ReSend map empty");
214 return -E_INTERNAL_ERROR;
215 }
216 uint32_t sequenceId = reSendMap_.begin()->first;
217 ReSendInfo reSendInfo = reSendMap_.begin()->second;
218 LOGI("[DataSync] ReSend mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",delStart=%" PRIu64 ",delEnd=%" PRIu64 ","
219 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",windowsize=%d,label=%s,deviceId=%s", mode_, reSendInfo.start,
220 reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, sequenceId, reSendInfo.packetId,
221 windowSize_, label_.c_str(), STR_MASK(deviceId_));
222 DataSyncReSendInfo dataReSendInfo = {sessionId_, sequenceId, reSendInfo.start, reSendInfo.end,
223 reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, reSendInfo.packetId};
224 return ReSend(context, dataReSendInfo);
225 }
226
GetLocalDeviceName()227 std::string SingleVerDataSync::GetLocalDeviceName()
228 {
229 std::string deviceInfo;
230 if (communicateHandle_ != nullptr) {
231 communicateHandle_->GetLocalIdentity(deviceInfo);
232 }
233 return deviceInfo;
234 }
235
Send(SingleVerSyncTaskContext * context,const Message * message,const CommErrHandler & handler,uint32_t packetLen)236 int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *message, const CommErrHandler &handler,
237 uint32_t packetLen)
238 {
239 bool startFeedDogRet = false;
240 if (packetLen > mtuSize_ && mtuSize_ > NOTIFY_MIN_MTU_SIZE) {
241 uint32_t time = static_cast<uint32_t>(static_cast<uint64_t>(packetLen) *
242 static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_); // no overflow
243 startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND);
244 }
245 SendConfig sendConfig;
246 SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig);
247 int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler);
248 if (errCode != E_OK) {
249 LOGE("[DataSync][Send] send message failed, errCode=%d", errCode);
250 if (startFeedDogRet) {
251 context->StopFeedDogForSync(SyncDirectionFlag::SEND);
252 }
253 }
254 return errCode;
255 }
256
GetDataWithPerformanceRecord(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)257 int SingleVerDataSync::GetDataWithPerformanceRecord(SingleVerSyncTaskContext *context,
258 std::vector<SendDataItem> &outData, size_t packetSize)
259 {
260 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
261 if (performance != nullptr) {
262 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_READ_DATA);
263 }
264 context->StartFeedDogForGetData(context->GetResponseSessionId());
265 int errCode = GetData(context, packetSize, outData);
266 context->StopFeedDogForGetData();
267 if (performance != nullptr) {
268 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_READ_DATA);
269 }
270 return errCode;
271 }
272
GetData(SingleVerSyncTaskContext * context,size_t packetSize,std::vector<SendDataItem> & outData)273 int SingleVerDataSync::GetData(SingleVerSyncTaskContext *context, size_t packetSize, std::vector<SendDataItem> &outData)
274 {
275 int errCode;
276 UpdateMtuSize();
277 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
278 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
279 LOGI("[DataSync][GetData] resend data");
280 errCode = GetUnsyncData(context, outData, packetSize);
281 } else {
282 ContinueToken token;
283 context->GetContinueToken(token);
284 if (token == nullptr) {
285 errCode = GetUnsyncData(context, outData, packetSize);
286 } else {
287 LOGD("[DataSync][GetData] get data from token");
288 // if there is data to send, read out data, and update local watermark, send data
289 errCode = GetNextUnsyncData(context, outData, packetSize);
290 }
291 }
292 if (errCode == -E_UNFINISHED) {
293 LOGD("[DataSync][GetData] not finished.");
294 }
295 if (SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
296 std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
297 SingleVerDataSyncUtils::TransDbDataItemToSendDataItem(localHashName, outData);
298 }
299 return errCode;
300 }
301
GetMatchData(SingleVerSyncTaskContext * context,SyncEntry & syncOutData)302 int SingleVerDataSync::GetMatchData(SingleVerSyncTaskContext *context, SyncEntry &syncOutData)
303 {
304 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
305 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
306 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
307 bool needCompressOnSync = false;
308 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
309 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
310 int errCode = GetDataWithPerformanceRecord(context, syncOutData.entries, packetSize);
311 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
312 context->SetTaskErrCode(errCode);
313 return errCode;
314 }
315
316 int innerCode = InterceptData(syncOutData);
317 if (innerCode != E_OK) {
318 context->SetTaskErrCode(innerCode);
319 return innerCode;
320 }
321
322 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
323 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
324 int compressCode = GenericSingleVerKvEntry::Compress(syncOutData.entries, syncOutData.compressedEntries,
325 { remoteAlgo, version });
326 if (compressCode != E_OK) {
327 return compressCode;
328 }
329 }
330 return errCode;
331 }
332
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)333 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
334 size_t packetSize)
335 {
336 SyncTimeRange waterRange;
337 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
338 WaterMark startMark = 0;
339 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
340 GetLocalWaterMark(curType, context->GetQuerySyncId(), context, startMark);
341 if ((waterRange.endTime == 0) || (startMark > waterRange.endTime)) {
342 return E_OK;
343 }
344 waterRange.beginTime = startMark;
345 if (curType == SyncType::QUERY_SYNC_TYPE) {
346 WaterMark deletedStartMark = 0;
347 GetLocalDeleteSyncWaterMark(context, deletedStartMark);
348 Timestamp lastQueryTimestamp = 0;
349 int errCode = metadata_->GetLastQueryTime(context->GetQuerySyncId(), context->GetDeviceId(),
350 lastQueryTimestamp);
351 if (errCode != E_OK) {
352 return errCode;
353 }
354 waterRange.deleteBeginTime = deletedStartMark;
355 waterRange.lastQueryTime = lastQueryTimestamp;
356 }
357 return GetUnsyncData(context, outData, syncDataSizeInfo, waterRange);
358 }
359
GetUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,DataSizeSpecInfo syncDataSizeInfo,SyncTimeRange & waterMarkInfo)360 int SingleVerDataSync::GetUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
361 DataSizeSpecInfo syncDataSizeInfo, SyncTimeRange &waterMarkInfo)
362 {
363 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
364 ContinueToken token = nullptr;
365 context->GetContinueToken(token);
366 if (token != nullptr) {
367 storage_->ReleaseContinueToken(token);
368 }
369 int errCode;
370 if (curType != SyncType::QUERY_SYNC_TYPE) {
371 errCode = storage_->GetSyncData(waterMarkInfo.beginTime, waterMarkInfo.endTime, outData, token,
372 syncDataSizeInfo);
373 } else {
374 QuerySyncObject queryObj = context->GetQuery();
375 errCode = storage_->GetSyncData(queryObj, waterMarkInfo, syncDataSizeInfo, token, outData);
376 }
377 context->SetContinueToken(token);
378 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
379 LOGE("[DataSync][GetUnsyncData] get unsync data failed,errCode=%d", errCode);
380 }
381 return errCode;
382 }
383
GetNextUnsyncData(SingleVerSyncTaskContext * context,std::vector<SendDataItem> & outData,size_t packetSize)384 int SingleVerDataSync::GetNextUnsyncData(SingleVerSyncTaskContext *context, std::vector<SendDataItem> &outData,
385 size_t packetSize)
386 {
387 ContinueToken token;
388 context->GetContinueToken(token);
389 DataSizeSpecInfo syncDataSizeInfo = GetDataSizeSpecInfo(packetSize);
390 int errCode = storage_->GetSyncDataNext(outData, token, syncDataSizeInfo);
391 context->SetContinueToken(token);
392 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
393 LOGE("[DataSync][GetNextUnsyncData] get next unsync data failed, errCode=%d", errCode);
394 }
395 return errCode;
396 }
397
SaveData(const SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,SyncType curType,const QuerySyncObject & query)398 int SingleVerDataSync::SaveData(const SingleVerSyncTaskContext *context, const std::vector<SendDataItem> &inData,
399 SyncType curType, const QuerySyncObject &query)
400 {
401 if (inData.empty()) {
402 return E_OK;
403 }
404 StoreInfo info = {
405 storage_->GetDbProperties().GetStringProp(DBProperties::USER_ID, ""),
406 storage_->GetDbProperties().GetStringProp(DBProperties::APP_ID, ""),
407 storage_->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "")
408 };
409 std::string clientId;
410 int errCode = E_OK;
411 if (RuntimeContext::GetInstance()->TranslateDeviceId(context->GetDeviceId(), info, clientId) == E_OK) {
412 errCode = metadata_->SaveClientId(context->GetDeviceId(), clientId);
413 if (errCode != E_OK) {
414 LOGW("[DataSync] record clientId failed %d", errCode);
415 }
416 }
417 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
418 if (performance != nullptr) {
419 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SAVE_DATA);
420 }
421
422 const std::string localHashName = DBCommon::TransferHashString(GetLocalDeviceName());
423 SingleVerDataSyncUtils::TransSendDataItemToLocal(context, localHashName, inData);
424 // query only support prefix key and don't have query in packet in 104 version
425 errCode = storage_->PutSyncDataWithQuery(query, inData, context->GetDeviceId());
426 if (performance != nullptr) {
427 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SAVE_DATA);
428 }
429 if (errCode != E_OK) {
430 LOGE("[DataSync][SaveData] save sync data failed,errCode=%d", errCode);
431 }
432 return errCode;
433 }
434
ResetSyncStatus(int inMode,SingleVerSyncTaskContext * context)435 void SingleVerDataSync::ResetSyncStatus(int inMode, SingleVerSyncTaskContext *context)
436 {
437 mode_ = inMode;
438 maxSequenceIdHasSent_ = 0;
439 isAllDataHasSent_ = false;
440 context->ReSetSequenceId();
441 reSendMap_.clear();
442 if (context->GetRemoteSoftwareVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
443 windowSize_ = LOW_VERSION_WINDOW_SIZE;
444 } else {
445 windowSize_ = HIGH_VERSION_WINDOW_SIZE;
446 }
447 int mode = SyncOperation::TransferSyncMode(inMode);
448 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::PULL) {
449 sessionId_ = context->GetRequestSessionId();
450 } else {
451 sessionId_ = context->GetResponseSessionId();
452 }
453 }
454
GetSyncDataTimeRange(SyncType syncType,SingleVerSyncTaskContext * context,const std::vector<SendDataItem> & inData,UpdateWaterMark & isUpdate)455 SyncTimeRange SingleVerDataSync::GetSyncDataTimeRange(SyncType syncType, SingleVerSyncTaskContext *context,
456 const std::vector<SendDataItem> &inData, UpdateWaterMark &isUpdate)
457 {
458 WaterMark localMark = 0;
459 WaterMark deleteMark = 0;
460 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
461 GetLocalDeleteSyncWaterMark(context, deleteMark);
462 return SingleVerDataSyncUtils::GetSyncDataTimeRange(syncType, localMark, deleteMark, inData, isUpdate);
463 }
464
SaveLocalWaterMark(SyncType syncType,const SingleVerSyncTaskContext * context,SyncTimeRange dataTimeRange,bool isCheckBeforUpdate) const465 int SingleVerDataSync::SaveLocalWaterMark(SyncType syncType, const SingleVerSyncTaskContext *context,
466 SyncTimeRange dataTimeRange, bool isCheckBeforUpdate) const
467 {
468 WaterMark localMark = 0;
469 int errCode = E_OK;
470 const std::string &deviceId = context->GetDeviceId();
471 std::string queryId = context->GetQuerySyncId();
472 if (syncType != SyncType::QUERY_SYNC_TYPE) {
473 if (isCheckBeforUpdate) {
474 GetLocalWaterMark(syncType, queryId, context, localMark);
475 if (localMark >= dataTimeRange.endTime) {
476 return E_OK;
477 }
478 }
479 errCode = metadata_->SaveLocalWaterMark(deviceId, dataTimeRange.endTime);
480 } else {
481 bool isNeedUpdateMark = true;
482 bool isNeedUpdateDeleteMark = true;
483 if (isCheckBeforUpdate) {
484 WaterMark deleteDataWaterMark = 0;
485 GetLocalWaterMark(syncType, queryId, context, localMark);
486 GetLocalDeleteSyncWaterMark(context, deleteDataWaterMark);
487 if (localMark >= dataTimeRange.endTime) {
488 isNeedUpdateMark = false;
489 }
490 if (deleteDataWaterMark >= dataTimeRange.deleteEndTime) {
491 isNeedUpdateDeleteMark = false;
492 }
493 }
494 if (isNeedUpdateMark) {
495 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), dataTimeRange.endTime);
496 errCode = metadata_->SetSendQueryWaterMark(queryId, deviceId, dataTimeRange.endTime);
497 if (errCode != E_OK) {
498 LOGE("[DataSync][SaveLocalWaterMark] save query metadata watermark failed,errCode=%d", errCode);
499 return errCode;
500 }
501 }
502 if (isNeedUpdateDeleteMark) {
503 LOGD("label=%s,dev=%s,deleteEndTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()),
504 dataTimeRange.deleteEndTime);
505 errCode = metadata_->SetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), dataTimeRange.deleteEndTime);
506 }
507 }
508 if (errCode != E_OK) {
509 LOGE("[DataSync][SaveLocalWaterMark] save metadata local watermark failed,errCode=%d", errCode);
510 }
511 return errCode;
512 }
513
GetPeerWaterMark(SyncType syncType,const std::string & queryIdentify,const DeviceID & deviceId,WaterMark & waterMark) const514 void SingleVerDataSync::GetPeerWaterMark(SyncType syncType, const std::string &queryIdentify,
515 const DeviceID &deviceId, WaterMark &waterMark) const
516 {
517 if (syncType != SyncType::QUERY_SYNC_TYPE) {
518 metadata_->GetPeerWaterMark(deviceId, waterMark);
519 return;
520 }
521 metadata_->GetRecvQueryWaterMark(queryIdentify, deviceId, waterMark);
522 }
523
GetPeerDeleteSyncWaterMark(const DeviceID & deviceId,WaterMark & waterMark)524 void SingleVerDataSync::GetPeerDeleteSyncWaterMark(const DeviceID &deviceId, WaterMark &waterMark)
525 {
526 metadata_->GetRecvDeleteSyncWaterMark(deviceId, waterMark);
527 }
528
GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext * context,WaterMark & waterMark) const529 void SingleVerDataSync::GetLocalDeleteSyncWaterMark(const SingleVerSyncTaskContext *context,
530 WaterMark &waterMark) const
531 {
532 metadata_->GetSendDeleteSyncWaterMark(context->GetDeleteSyncId(), waterMark, context->IsAutoLiftWaterMark());
533 }
534
GetLocalWaterMark(SyncType syncType,const std::string & queryIdentify,const SingleVerSyncTaskContext * context,WaterMark & waterMark) const535 void SingleVerDataSync::GetLocalWaterMark(SyncType syncType, const std::string &queryIdentify,
536 const SingleVerSyncTaskContext *context, WaterMark &waterMark) const
537 {
538 if (syncType != SyncType::QUERY_SYNC_TYPE) {
539 metadata_->GetLocalWaterMark(context->GetDeviceId(), waterMark);
540 return;
541 }
542 metadata_->GetSendQueryWaterMark(queryIdentify, context->GetDeviceId(),
543 waterMark, context->IsAutoLiftWaterMark());
544 }
545
RemoveDeviceDataHandle(SingleVerSyncTaskContext * context,const Message * message,WaterMark maxSendDataTime)546 int SingleVerDataSync::RemoveDeviceDataHandle(SingleVerSyncTaskContext *context, const Message *message,
547 WaterMark maxSendDataTime)
548 {
549 bool isNeedClearRemoteData = false;
550 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
551 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
552 uint64_t clearDeviceDataMark = 0;
553 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
554 isNeedClearRemoteData = (clearDeviceDataMark == REMOVE_DEVICE_DATA_MARK);
555 } else {
556 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
557 if (packet == nullptr) {
558 LOGE("[RemoveDeviceDataHandle] get packet object failed");
559 return -E_INVALID_ARGS;
560 }
561 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
562 WaterMark packetLocalMark = packet->GetLocalWaterMark();
563 WaterMark peerMark = 0;
564 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(), peerMark);
565 isNeedClearRemoteData = ((packetLocalMark == 0) && (peerMark != 0));
566 }
567 if (!isNeedClearRemoteData) {
568 return E_OK;
569 }
570 int errCode = E_OK;
571 if (context->IsNeedClearRemoteStaleData()) {
572 // need to clear remote device history data
573 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
574 if (errCode != E_OK) {
575 (void)SendDataAck(context, message, errCode, maxSendDataTime);
576 return errCode;
577 }
578 if (context->GetRemoteSoftwareVersion() == SOFTWARE_VERSION_EARLIEST) {
579 // avoid repeat clear in ack
580 metadata_->SaveLocalWaterMark(context->GetDeviceId(), 0);
581 }
582 }
583 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
584 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
585 if (errCode != E_OK) {
586 (void)SendDataAck(context, message, errCode, maxSendDataTime);
587 return errCode;
588 }
589 }
590 return E_OK;
591 }
592
DealRemoveDeviceDataByAck(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)593 int SingleVerDataSync::DealRemoveDeviceDataByAck(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
594 const std::vector<uint64_t> &reserved)
595 {
596 bool isNeedClearRemoteData = false;
597 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
598 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
599 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
600 uint64_t clearDeviceDataMark = 0;
601 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearDeviceDataMark);
602 isNeedClearRemoteData = (clearDeviceDataMark != 0);
603 } else if (reserved.empty()) {
604 WaterMark localMark = 0;
605 GetLocalWaterMark(curType, context->GetQuery().GetIdentify(), context, localMark);
606 isNeedClearRemoteData = ((localMark != 0) && (ackWaterMark == 0));
607 } else {
608 WaterMark peerMark = 0;
609 GetPeerWaterMark(curType, context->GetQuerySyncId(),
610 context->GetDeviceId(), peerMark);
611 isNeedClearRemoteData = ((reserved[ACK_PACKET_RESERVED_INDEX_LOCAL_WATER_MARK] == 0) && (peerMark != 0));
612 }
613 if (!isNeedClearRemoteData) {
614 return E_OK;
615 }
616 // need to clear remote historydata
617 LOGI("[DataSync][WaterMarkException] AckRecv reserved not empty,rebuilted,clear historydata,label=%s,dev=%s",
618 label_.c_str(), STR_MASK(GetDeviceId()));
619 int errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
620 if (errCode != E_OK) {
621 return errCode;
622 }
623 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_3_0) {
624 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
625 }
626 return errCode;
627 }
628
SetSessionEndTimestamp(Timestamp end)629 void SingleVerDataSync::SetSessionEndTimestamp(Timestamp end)
630 {
631 sessionEndTimestamp_ = end;
632 }
633
GetSessionEndTimestamp() const634 Timestamp SingleVerDataSync::GetSessionEndTimestamp() const
635 {
636 return sessionEndTimestamp_;
637 }
638
UpdateSendInfo(SyncTimeRange dataTimeRange,SingleVerSyncTaskContext * context)639 void SingleVerDataSync::UpdateSendInfo(SyncTimeRange dataTimeRange, SingleVerSyncTaskContext *context)
640 {
641 ReSendInfo reSendInfo;
642 reSendInfo.start = dataTimeRange.beginTime;
643 reSendInfo.end = dataTimeRange.endTime;
644 reSendInfo.deleteBeginTime = dataTimeRange.deleteBeginTime;
645 reSendInfo.deleteEndTime = dataTimeRange.deleteEndTime;
646 reSendInfo.packetId = context->GetPacketId();
647 maxSequenceIdHasSent_++;
648 reSendMap_[maxSequenceIdHasSent_] = reSendInfo;
649 windowSize_--;
650 ContinueToken token;
651 context->GetContinueToken(token);
652 if (token == nullptr) {
653 isAllDataHasSent_ = true;
654 }
655 LOGI("[DataSync] mode=%d,start=%" PRIu64 ",end=%" PRIu64 ",deleteStart=%" PRIu64 ",deleteEnd=%" PRIu64 ","
656 "seqId=%" PRIu32 ",packetId=%" PRIu64 ",window_size=%d,isAllSend=%d,label=%s,device=%s", mode_,
657 reSendInfo.start, reSendInfo.end, reSendInfo.deleteBeginTime, reSendInfo.deleteEndTime, maxSequenceIdHasSent_,
658 reSendInfo.packetId, windowSize_, isAllDataHasSent_, label_.c_str(), STR_MASK(deviceId_));
659 }
660
FillDataRequestPacket(DataRequestPacket * packet,SingleVerSyncTaskContext * context,SyncEntry & syncData,int sendCode,int mode)661 void SingleVerDataSync::FillDataRequestPacket(DataRequestPacket *packet, SingleVerSyncTaskContext *context,
662 SyncEntry &syncData, int sendCode, int mode)
663 {
664 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
665 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
666 WaterMark localMark = 0;
667 WaterMark peerMark = 0;
668 WaterMark deleteMark = 0;
669 bool needCompressOnSync = false;
670 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
671 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
672 std::string id = context->GetQuerySyncId();
673 GetLocalWaterMark(curType, id, context, localMark);
674 GetPeerWaterMark(curType, id, context->GetDeviceId(), peerMark);
675 GetLocalDeleteSyncWaterMark(context, deleteMark);
676 if (((mode != SyncModeType::RESPONSE_PULL && sendCode == E_OK)) ||
677 (mode == SyncModeType::RESPONSE_PULL && sendCode == SEND_FINISHED)) {
678 packet->SetLastSequence();
679 }
680 int tmpMode = mode;
681 if (mode == SyncModeType::RESPONSE_PULL) {
682 tmpMode = (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH;
683 }
684 packet->SetData(syncData.entries);
685 packet->SetCompressData(syncData.compressedEntries);
686 packet->SetBasicInfo(sendCode, version, tmpMode);
687 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
688 packet->SetWaterMark(localMark, peerMark, deleteMark);
689 if (SyncOperation::TransferSyncMode(mode) == SyncModeType::PUSH_AND_PULL) {
690 packet->SetEndWaterMark(context->GetEndMark());
691 packet->SetSessionId(context->GetRequestSessionId());
692 }
693 packet->SetQuery(context->GetQuery());
694 packet->SetQueryId(context->GetQuerySyncId());
695 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
696 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
697 packet->SetCompressDataMark();
698 packet->SetCompressAlgo(curAlgo);
699 }
700 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
701 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
702 context->GetQuery().HasOrderBy())) {
703 packet->SetUpdateWaterMark();
704 }
705 LOGD("[DataSync] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",label=%s,dev=%s,queryId=%s,"
706 "isCompress=%d", static_cast<int>(curType), localMark, deleteMark, context->GetEndMark(), label_.c_str(),
707 STR_MASK(GetDeviceId()), STR_MASK(context->GetQuery().GetIdentify()), packet->IsCompressData());
708 }
709
RequestStart(SingleVerSyncTaskContext * context,int mode)710 int SingleVerDataSync::RequestStart(SingleVerSyncTaskContext *context, int mode)
711 {
712 int errCode = QuerySyncCheck(context);
713 if (errCode != E_OK) {
714 return errCode;
715 }
716 errCode = RemoveDeviceDataIfNeed(context);
717 if (errCode != E_OK) {
718 context->SetTaskErrCode(errCode);
719 return errCode;
720 }
721 SyncEntry syncData;
722 // get data
723 errCode = GetMatchData(context, syncData);
724 SingleVerDataSyncUtils::TranslateErrCodeIfNeed(mode, context->GetRemoteSoftwareVersion(), errCode);
725
726 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
727 LOGE("[DataSync][PushStart] get data failed, errCode=%d", errCode);
728 return errCode;
729 }
730
731 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
732 if (packet == nullptr) {
733 LOGE("[DataSync][PushStart] new DataRequestPacket error");
734 return -E_OUT_OF_MEMORY;
735 }
736 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
737 UpdateWaterMark isUpdateWaterMark;
738 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
739 if (errCode == E_OK) {
740 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
741 }
742 FillDataRequestPacket(packet, context, syncData, errCode, mode);
743 errCode = SendDataPacket(curType, packet, context);
744 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
745 if (performance != nullptr) {
746 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_MACHINE_START_TO_PUSH_SEND);
747 }
748 if (errCode == E_OK || errCode == -E_TIMEOUT) {
749 UpdateSendInfo(dataTime, context);
750 }
751 if (errCode == E_OK) {
752 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
753 context->GetQuery().HasOrderBy())) {
754 LOGI("[DataSync][RequestStart] query contain limit/offset/orderby, no need to update watermark.");
755 return E_OK;
756 }
757 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
758 SaveLocalWaterMark(curType, context, tmpDataTime);
759 }
760 return errCode;
761 }
762
PushStart(SingleVerSyncTaskContext * context)763 int SingleVerDataSync::PushStart(SingleVerSyncTaskContext *context)
764 {
765 if (context == nullptr) {
766 return -E_INVALID_ARGS;
767 }
768 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
769 return RequestStart(context,
770 (curType == SyncType::QUERY_SYNC_TYPE) ? SyncModeType::QUERY_PUSH : SyncModeType::PUSH);
771 }
772
PushPullStart(SingleVerSyncTaskContext * context)773 int SingleVerDataSync::PushPullStart(SingleVerSyncTaskContext *context)
774 {
775 if (context == nullptr) {
776 return -E_INVALID_ARGS;
777 }
778 return RequestStart(context, context->GetMode());
779 }
780
PullRequestStart(SingleVerSyncTaskContext * context)781 int SingleVerDataSync::PullRequestStart(SingleVerSyncTaskContext *context)
782 {
783 if (context == nullptr) {
784 return -E_INVALID_ARGS;
785 }
786 int errCode = QuerySyncCheck(context);
787 if (errCode != E_OK) {
788 return errCode;
789 }
790 errCode = RemoveDeviceDataIfNeed(context);
791 if (errCode != E_OK) {
792 context->SetTaskErrCode(errCode);
793 return errCode;
794 }
795 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
796 if (packet == nullptr) {
797 LOGE("[DataSync][PullRequest]new DataRequestPacket error");
798 return -E_OUT_OF_MEMORY;
799 }
800 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
801 WaterMark peerMark = 0;
802 WaterMark localMark = 0;
803 WaterMark deleteMark = 0;
804 GetPeerWaterMark(syncType, context->GetQuerySyncId(),
805 context->GetDeviceId(), peerMark);
806 GetLocalWaterMark(syncType, context->GetQuerySyncId(), context, localMark);
807 GetLocalDeleteSyncWaterMark(context, deleteMark);
808 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
809 WaterMark endMark = context->GetEndMark();
810 SyncTimeRange dataTime = {localMark, deleteMark, localMark, deleteMark};
811
812 packet->SetBasicInfo(E_OK, version, context->GetMode());
813 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
814 packet->SetWaterMark(localMark, peerMark, deleteMark);
815 packet->SetEndWaterMark(endMark);
816 packet->SetSessionId(context->GetRequestSessionId());
817 packet->SetQuery(context->GetQuery());
818 packet->SetQueryId(context->GetQuerySyncId());
819 packet->SetLastSequence();
820 SingleVerDataSyncUtils::SetPacketId(packet, context, version);
821 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
822
823 LOGD("[DataSync][Pull] curType=%d,local=%" PRIu64 ",del=%" PRIu64 ",end=%" PRIu64 ",peer=%" PRIu64 ",label=%s,"
824 "dev=%s", static_cast<int>(syncType), localMark, deleteMark, endMark, peerMark, label_.c_str(),
825 STR_MASK(GetDeviceId()));
826 UpdateSendInfo(dataTime, context);
827 return SendDataPacket(syncType, packet, context);
828 }
829
PullResponseStart(SingleVerSyncTaskContext * context)830 int SingleVerDataSync::PullResponseStart(SingleVerSyncTaskContext *context)
831 {
832 if (context == nullptr) {
833 return -E_INVALID_ARGS;
834 }
835 SyncEntry syncData;
836 // get data
837 int errCode = GetMatchData(context, syncData);
838 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
839 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
840 (void)SendPullResponseDataPkt(errCode, syncData, context);
841 }
842 return errCode;
843 }
844 // if send finished
845 int ackCode = E_OK;
846 ContinueToken token = nullptr;
847 context->GetContinueToken(token);
848 if (errCode == E_OK && token == nullptr) {
849 LOGD("[DataSync][PullResponse] send last frame end");
850 ackCode = SEND_FINISHED;
851 }
852 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
853 UpdateWaterMark isUpdateWaterMark;
854 SyncTimeRange dataTime = GetSyncDataTimeRange(curType, context, syncData.entries, isUpdateWaterMark);
855 if (errCode == E_OK) {
856 SetSessionEndTimestamp(std::max(dataTime.endTime, dataTime.deleteEndTime));
857 }
858 errCode = SendPullResponseDataPkt(ackCode, syncData, context);
859 if (errCode == E_OK || errCode == -E_TIMEOUT) {
860 UpdateSendInfo(dataTime, context);
861 }
862 if (errCode == E_OK) {
863 if (curType == SyncType::QUERY_SYNC_TYPE && (context->GetQuery().HasLimit() ||
864 context->GetQuery().HasOrderBy())) {
865 LOGI("[DataSync][PullResponseStart] query contain limit/offset/orderby, no need to update watermark.");
866 return E_OK;
867 }
868 SyncTimeRange tmpDataTime = SingleVerDataSyncUtils::ReviseLocalMark(curType, dataTime, isUpdateWaterMark);
869 SaveLocalWaterMark(curType, context, tmpDataTime);
870 }
871 return errCode;
872 }
873
UpdateQueryPeerWaterMark(SyncType syncType,const std::string & queryId,const SyncTimeRange & dataTime,const SingleVerSyncTaskContext * context,UpdateWaterMark isUpdateWaterMark)874 void SingleVerDataSync::UpdateQueryPeerWaterMark(SyncType syncType, const std::string &queryId,
875 const SyncTimeRange &dataTime, const SingleVerSyncTaskContext *context, UpdateWaterMark isUpdateWaterMark)
876 {
877 WaterMark tmpPeerWatermark = dataTime.endTime;
878 WaterMark tmpPeerDeletedWatermark = dataTime.deleteEndTime;
879 if (isUpdateWaterMark.normalUpdateMark) {
880 tmpPeerWatermark++;
881 }
882 if (isUpdateWaterMark.deleteUpdateMark) {
883 tmpPeerDeletedWatermark++;
884 }
885 UpdatePeerWaterMark(syncType, queryId, context, tmpPeerWatermark, tmpPeerDeletedWatermark);
886 }
887
UpdatePeerWaterMark(SyncType syncType,const std::string & queryId,const SingleVerSyncTaskContext * context,WaterMark peerWatermark,WaterMark peerDeletedWatermark)888 void SingleVerDataSync::UpdatePeerWaterMark(SyncType syncType, const std::string &queryId,
889 const SingleVerSyncTaskContext *context, WaterMark peerWatermark, WaterMark peerDeletedWatermark)
890 {
891 if (peerWatermark == 0 && peerDeletedWatermark == 0) {
892 return;
893 }
894 int errCode = E_OK;
895 if (syncType != SyncType::QUERY_SYNC_TYPE) {
896 errCode = metadata_->SavePeerWaterMark(context->GetDeviceId(), peerWatermark, true);
897 } else {
898 if (peerWatermark != 0) {
899 LOGD("label=%s,dev=%s,endTime=%" PRIu64, label_.c_str(), STR_MASK(GetDeviceId()), peerWatermark);
900 errCode = metadata_->SetRecvQueryWaterMark(queryId, context->GetDeviceId(), peerWatermark);
901 if (errCode != E_OK) {
902 LOGE("[DataSync][UpdatePeerWaterMark] save query peer water mark failed,errCode=%d", errCode);
903 }
904 }
905 if (peerDeletedWatermark != 0) {
906 LOGD("label=%s,dev=%s,peerDeletedTime=%" PRIu64,
907 label_.c_str(), STR_MASK(GetDeviceId()), peerDeletedWatermark);
908 errCode = metadata_->SetRecvDeleteSyncWaterMark(context->GetDeleteSyncId(), peerDeletedWatermark);
909 }
910 }
911 if (errCode != E_OK) {
912 LOGE("[DataSync][UpdatePeerWaterMark] save peer water mark failed,errCode=%d", errCode);
913 }
914 }
915
DoAbilitySyncIfNeed(SingleVerSyncTaskContext * context,const Message * message,bool isControlMsg)916 int SingleVerDataSync::DoAbilitySyncIfNeed(SingleVerSyncTaskContext *context, const Message *message, bool isControlMsg)
917 {
918 uint16_t remoteCommunicatorVersion = 0;
919 if (communicateHandle_->GetRemoteCommunicatorVersion(context->GetDeviceId(), remoteCommunicatorVersion) ==
920 -E_NOT_FOUND) {
921 LOGE("[DataSync][DoAbilitySyncIfNeed] get remote communicator version failed");
922 return -E_VERSION_NOT_SUPPORT;
923 }
924 // If remote is not the first version, we need check SOFTWARE_VERSION_BASE
925 if (remoteCommunicatorVersion == 0) {
926 LOGI("[DataSync] set remote version 0");
927 context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_EARLIEST);
928 return E_OK;
929 } else {
930 LOGI("[DataSync][DoAbilitySyncIfNeed] need do ability sync");
931 if (isControlMsg) {
932 SendControlAck(context, message, -E_NEED_ABILITY_SYNC, 0);
933 } else {
934 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
935 }
936 return -E_NEED_ABILITY_SYNC;
937 }
938 }
939
DataRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)940 int SingleVerDataSync::DataRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
941 {
942 if (context == nullptr || message == nullptr) {
943 return -E_INVALID_ARGS;
944 }
945 auto *packet = message->GetObject<DataRequestPacket>();
946 if (packet == nullptr) {
947 return -E_INVALID_ARGS;
948 }
949 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
950 return DoAbilitySyncIfNeed(context, message);
951 }
952 int32_t sendCode = packet->GetSendCode();
953 if (sendCode == -E_VERSION_NOT_SUPPORT) {
954 LOGE("[DataSync] Version mismatch: ver=%u, current=%u", packet->GetVersion(), SOFTWARE_VERSION_CURRENT);
955 (void)SendDataAck(context, message, -E_VERSION_NOT_SUPPORT, 0);
956 return -E_WAIT_NEXT_MESSAGE;
957 }
958 // only deal with pull response packet errCode
959 if (sendCode != E_OK && sendCode != SEND_FINISHED &&
960 message->GetSessionId() == context->GetRequestSessionId()) {
961 LOGE("[DataSync][DataRequestRecvPre] remote pullResponse getData sendCode=%d", sendCode);
962 return sendCode;
963 }
964 int errCode = RunPermissionCheck(context, message, packet);
965 if (errCode != E_OK) {
966 return errCode;
967 }
968 if (std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT) > SOFTWARE_VERSION_RELEASE_2_0) {
969 errCode = CheckSchemaStrategy(context, message);
970 }
971 if (errCode == E_OK) {
972 errCode = SingleVerDataSyncUtils::RequestQueryCheck(packet, storage_);
973 }
974 if (errCode != E_OK) {
975 (void)SendDataAck(context, message, errCode, 0);
976 }
977 return errCode;
978 }
979
DataRequestRecv(SingleVerSyncTaskContext * context,const Message * message,WaterMark & pullEndWatermark)980 int SingleVerDataSync::DataRequestRecv(SingleVerSyncTaskContext *context, const Message *message,
981 WaterMark &pullEndWatermark)
982 {
983 int errCode = DataRequestRecvPre(context, message);
984 if (errCode != E_OK) {
985 return errCode;
986 }
987 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
988 const std::vector<SendDataItem> &data = packet->GetData();
989 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
990 LOGI("[DataSync][DataRequestRecv] curType=%d, remote ver=%" PRIu32 ", size=%zu, errCode=%d, queryId=%s,"
991 " Label=%s, dev=%s", static_cast<int>(curType), packet->GetVersion(), data.size(), packet->GetSendCode(),
992 STR_MASK(packet->GetQueryId()), label_.c_str(), STR_MASK(GetDeviceId()));
993 context->SetReceiveWaterMarkErr(false);
994 UpdateWaterMark isUpdateWaterMark;
995 SyncTimeRange dataTime = SingleVerDataSyncUtils::GetRecvDataTimeRange(curType, data, isUpdateWaterMark);
996 errCode = RemoveDeviceDataHandle(context, message, dataTime.endTime);
997 if (errCode != E_OK) {
998 return errCode;
999 }
1000 Metadata::MetaWaterMarkAutoLock autoLock(metadata_);
1001 if (WaterMarkErrHandle(curType, context, message)) {
1002 return E_OK;
1003 }
1004 GetPullEndWatermark(context, packet, pullEndWatermark);
1005 // save data first
1006 errCode = SaveData(context, data, curType, packet->GetQuery());
1007 if (errCode != E_OK) {
1008 (void)SendDataAck(context, message, errCode, dataTime.endTime);
1009 return errCode;
1010 }
1011 if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode
1012 pullEndWatermark = 0;
1013 errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime);
1014 } else {
1015 // if data is empty, we don't know the max timestap of this packet.
1016 errCode = SendDataAck(context, message, !data.empty() ? E_OK : WATER_MARK_INVALID, dataTime.endTime);
1017 }
1018 RemotePushFinished(packet->GetSendCode(), packet->GetMode(), message->GetSessionId(),
1019 context->GetRequestSessionId());
1020 if (curType != SyncType::QUERY_SYNC_TYPE && isUpdateWaterMark.normalUpdateMark) {
1021 UpdatePeerWaterMark(curType, "", context, dataTime.endTime + 1, 0);
1022 } else if (curType == SyncType::QUERY_SYNC_TYPE && packet->IsNeedUpdateWaterMark()) {
1023 UpdateQueryPeerWaterMark(curType, packet->GetQueryId(), dataTime, context, isUpdateWaterMark);
1024 }
1025 if (errCode != E_OK) {
1026 return errCode;
1027 }
1028 if (packet->GetSendCode() == SEND_FINISHED) {
1029 return -E_RECV_FINISHED;
1030 }
1031 return errCode;
1032 }
1033
SendDataPacket(SyncType syncType,const DataRequestPacket * packet,SingleVerSyncTaskContext * context)1034 int SingleVerDataSync::SendDataPacket(SyncType syncType, const DataRequestPacket *packet,
1035 SingleVerSyncTaskContext *context)
1036 {
1037 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1038 if (message == nullptr) {
1039 LOGE("[DataSync][SendDataPacket] new message error");
1040 delete packet;
1041 packet = nullptr;
1042 return -E_OUT_OF_MEMORY;
1043 }
1044 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1045 int errCode = message->SetExternalObject(packet);
1046 if (errCode != E_OK) {
1047 delete packet;
1048 packet = nullptr;
1049 delete message;
1050 message = nullptr;
1051 LOGE("[DataSync][SendDataPacket] set external object failed errCode=%d", errCode);
1052 return errCode;
1053 }
1054 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1055 context->GetSequenceId(), context->GetRequestSessionId());
1056 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1057 if (performance != nullptr) {
1058 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_DATA_SEND_REQUEST_TO_ACK_RECV);
1059 }
1060 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1061 context, message->GetSessionId());
1062 errCode = Send(context, message, handler, packetLen);
1063 if (errCode != E_OK) {
1064 delete message;
1065 message = nullptr;
1066 }
1067
1068 return errCode;
1069 }
1070
SendPullResponseDataPkt(int ackCode,SyncEntry & syncOutData,SingleVerSyncTaskContext * context)1071 int SingleVerDataSync::SendPullResponseDataPkt(int ackCode, SyncEntry &syncOutData,
1072 SingleVerSyncTaskContext *context)
1073 {
1074 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1075 if (packet == nullptr) {
1076 LOGE("[DataSync][SendPullResponseDataPkt] new data request packet error");
1077 return -E_OUT_OF_MEMORY;
1078 }
1079 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1080 FillDataRequestPacket(packet, context, syncOutData, ackCode, SyncModeType::RESPONSE_PULL);
1081 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1082 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1083 if (message == nullptr) {
1084 LOGE("[DataSync][SendPullResponseDataPkt] new message error");
1085 delete packet;
1086 packet = nullptr;
1087 return -E_OUT_OF_MEMORY;
1088 }
1089 int errCode = message->SetExternalObject(packet);
1090 if (errCode != E_OK) {
1091 delete packet;
1092 packet = nullptr;
1093 delete message;
1094 message = nullptr;
1095 LOGE("[SendPullResponseDataPkt] set external object failed, errCode=%d", errCode);
1096 return errCode;
1097 }
1098 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1099 context->GetSequenceId(), context->GetResponseSessionId());
1100 SendResetWatchDogPacket(context, packetLen);
1101 errCode = Send(context, message, nullptr, packetLen);
1102 if (errCode != E_OK) {
1103 delete message;
1104 message = nullptr;
1105 }
1106 return errCode;
1107 }
1108
SendFinishedDataAck(SingleVerSyncTaskContext * context,const Message * message)1109 void SingleVerDataSync::SendFinishedDataAck(SingleVerSyncTaskContext *context, const Message *message)
1110 {
1111 (void)SendDataAck(context, message, E_OK, 0);
1112 }
1113
SendDataAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,WaterMark maxSendDataTime)1114 int SingleVerDataSync::SendDataAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1115 WaterMark maxSendDataTime)
1116 {
1117 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1118 if (packet == nullptr) {
1119 return -E_INVALID_ARGS;
1120 }
1121 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1122 if (ackMessage == nullptr) {
1123 LOGE("[DataSync][SendDataAck] new message error");
1124 return -E_OUT_OF_MEMORY;
1125 }
1126 DataAckPacket ack;
1127 SetAckPacket(ack, context, packet, recvCode, maxSendDataTime);
1128 int errCode = ackMessage->SetCopiedObject(ack);
1129 if (errCode != E_OK) {
1130 delete ackMessage;
1131 ackMessage = nullptr;
1132 LOGE("[DataSync][SendDataAck] set copied object failed, errcode=%d", errCode);
1133 return errCode;
1134 }
1135 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1136 message->GetSequenceId(), message->GetSessionId());
1137
1138 errCode = Send(context, ackMessage, nullptr, 0);
1139 if (errCode != E_OK) {
1140 delete ackMessage;
1141 ackMessage = nullptr;
1142 }
1143 return errCode;
1144 }
1145
AckPacketIdCheck(const Message * message)1146 bool SingleVerDataSync::AckPacketIdCheck(const Message *message)
1147 {
1148 if (message == nullptr) {
1149 LOGE("[DataSync] AckRecv message nullptr");
1150 return false;
1151 }
1152 if (message->GetMessageType() == TYPE_NOTIFY || message->IsFeedbackError()) {
1153 return true;
1154 }
1155 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1156 if (packet == nullptr) {
1157 return false;
1158 }
1159 uint64_t packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1160 std::lock_guard<std::mutex> lock(lock_);
1161 uint32_t sequenceId = message->GetSequenceId();
1162 if (reSendMap_.count(sequenceId) != 0) {
1163 uint64_t originalPacketId = reSendMap_[sequenceId].packetId;
1164 if (DataAckPacket::IsPacketIdValid(packetId) && packetId != originalPacketId) {
1165 LOGE("[DataSync] packetId[%" PRIu64 "] is not match with original[%" PRIu64 "]", packetId,
1166 originalPacketId);
1167 return false;
1168 }
1169 }
1170 return true;
1171 }
1172
AckRecv(SingleVerSyncTaskContext * context,const Message * message)1173 int SingleVerDataSync::AckRecv(SingleVerSyncTaskContext *context, const Message *message)
1174 {
1175 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1176 if (errCode != E_OK) {
1177 return errCode;
1178 }
1179 const DataAckPacket *packet = message->GetObject<DataAckPacket>();
1180 if (packet == nullptr) {
1181 return -E_INVALID_ARGS;
1182 }
1183 int32_t recvCode = packet->GetRecvCode();
1184 LOGD("[DataSync][AckRecv] ver=%u,recvCode=%d,myversion=%u,label=%s,dev=%s", packet->GetVersion(), recvCode,
1185 SOFTWARE_VERSION_CURRENT, label_.c_str(), STR_MASK(GetDeviceId()));
1186 if (recvCode == -E_VERSION_NOT_SUPPORT) {
1187 LOGE("[DataSync][AckRecv] Version mismatch");
1188 return -E_VERSION_NOT_SUPPORT;
1189 }
1190
1191 if (recvCode == -E_NEED_ABILITY_SYNC || recvCode == -E_NOT_PERMIT) {
1192 // we should ReleaseContinueToken, avoid crash
1193 LOGI("[DataSync][AckRecv] Data sync abort,recvCode =%d,label =%s,dev=%s", recvCode, label_.c_str(),
1194 STR_MASK(GetDeviceId()));
1195 context->ReleaseContinueToken();
1196 return recvCode;
1197 }
1198 uint64_t data = packet->GetData();
1199 if (recvCode == LOCAL_WATER_MARK_NOT_INIT) {
1200 return DealWaterMarkException(context, data, packet->GetReserved());
1201 }
1202
1203 if (recvCode == -E_SAVE_DATA_NOTIFY && data != 0) {
1204 // data only use low 32bit
1205 context->StartFeedDogForSync(static_cast<uint32_t>(data), SyncDirectionFlag::RECEIVE);
1206 LOGI("[DataSync][AckRecv] notify ResetWatchDog=%" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1207 STR_MASK(GetDeviceId()));
1208 }
1209
1210 if (recvCode != E_OK && recvCode != WATER_MARK_INVALID) {
1211 LOGW("[DataSync][AckRecv] Received a uncatched recvCode=%d,label=%s,dev=%s", recvCode,
1212 label_.c_str(), STR_MASK(GetDeviceId()));
1213 return recvCode;
1214 }
1215
1216 // Judge if send finished
1217 ContinueToken token;
1218 context->GetContinueToken(token);
1219 if (((message->GetSessionId() == context->GetResponseSessionId()) ||
1220 (message->GetSessionId() == context->GetRequestSessionId())) && (token == nullptr)) {
1221 return -E_NO_DATA_SEND;
1222 }
1223
1224 // send next data
1225 return -E_SEND_DATA;
1226 }
1227
SendSaveDataNotifyPacket(SingleVerSyncTaskContext * context,uint32_t pktVersion,uint32_t sessionId,uint32_t sequenceId,uint32_t inMsgId)1228 void SingleVerDataSync::SendSaveDataNotifyPacket(SingleVerSyncTaskContext *context, uint32_t pktVersion,
1229 uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId)
1230 {
1231 if (inMsgId != DATA_SYNC_MESSAGE && inMsgId != QUERY_SYNC_MESSAGE) {
1232 LOGE("[SingleVerDataSync] messageId not available.");
1233 return;
1234 }
1235 Message *ackMessage = new (std::nothrow) Message(inMsgId);
1236 if (ackMessage == nullptr) {
1237 LOGE("[DataSync][SaveDataNotify] new message failed");
1238 return;
1239 }
1240
1241 DataAckPacket ack;
1242 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1243 ack.SetVersion(pktVersion);
1244 int errCode = ackMessage->SetCopiedObject(ack);
1245 if (errCode != E_OK) {
1246 delete ackMessage;
1247 ackMessage = nullptr;
1248 LOGE("[DataSync][SendSaveDataNotifyPacket] set copied object failed,errcode=%d", errCode);
1249 return;
1250 }
1251 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(), sequenceId, sessionId);
1252
1253 errCode = Send(context, ackMessage, nullptr, 0);
1254 if (errCode != E_OK) {
1255 delete ackMessage;
1256 ackMessage = nullptr;
1257 }
1258 LOGD("[DataSync][SaveDataNotify] Send SaveDataNotify packet Finished,errcode=%d,label=%s,dev=%s",
1259 errCode, label_.c_str(), STR_MASK(GetDeviceId()));
1260 }
1261
GetPullEndWatermark(const SingleVerSyncTaskContext * context,const DataRequestPacket * packet,WaterMark & pullEndWatermark) const1262 void SingleVerDataSync::GetPullEndWatermark(const SingleVerSyncTaskContext *context, const DataRequestPacket *packet,
1263 WaterMark &pullEndWatermark) const
1264 {
1265 if (packet == nullptr) {
1266 return;
1267 }
1268 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1269 if ((mode == SyncModeType::PULL) || (mode == SyncModeType::PUSH_AND_PULL)) {
1270 WaterMark endMark = packet->GetEndWaterMark();
1271 TimeOffset offset;
1272 metadata_->GetTimeOffset(context->GetDeviceId(), offset);
1273 pullEndWatermark = endMark - static_cast<WaterMark>(offset);
1274 LOGD("[DataSync][PullEndWatermark] packetEndMark=%" PRIu64 ",offset=%" PRId64 ",endWaterMark=%" PRIu64 ","
1275 "label=%s,dev=%s", endMark, offset, pullEndWatermark, label_.c_str(), STR_MASK(GetDeviceId()));
1276 }
1277 }
1278
DealWaterMarkException(SingleVerSyncTaskContext * context,WaterMark ackWaterMark,const std::vector<uint64_t> & reserved)1279 int SingleVerDataSync::DealWaterMarkException(SingleVerSyncTaskContext *context, WaterMark ackWaterMark,
1280 const std::vector<uint64_t> &reserved)
1281 {
1282 WaterMark deletedWaterMark = 0;
1283 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1284 if (curType == SyncType::QUERY_SYNC_TYPE) {
1285 if (reserved.size() <= ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK) {
1286 LOGE("[DataSync] get packet reserve size failed");
1287 return -E_INVALID_ARGS;
1288 }
1289 deletedWaterMark = reserved[ACK_PACKET_RESERVED_INDEX_DELETE_WATER_MARK];
1290 }
1291 LOGI("[DataSync][WaterMarkException] AckRecv water error, mark=%" PRIu64 ",deleteMark=%" PRIu64 ","
1292 "label=%s,dev=%s", ackWaterMark, deletedWaterMark, label_.c_str(), STR_MASK(GetDeviceId()));
1293 int errCode = SaveLocalWaterMark(curType, context,
1294 {0, 0, ackWaterMark, deletedWaterMark});
1295 if (errCode != E_OK) {
1296 return errCode;
1297 }
1298 context->SetRetryStatus(SyncTaskContext::NEED_RETRY);
1299 context->IncNegotiationCount();
1300 SingleVerDataSyncUtils::PushAndPullKeyRevokHandle(context);
1301 if (!context->IsNeedClearRemoteStaleData()) {
1302 return -E_RE_SEND_DATA;
1303 }
1304 errCode = DealRemoveDeviceDataByAck(context, ackWaterMark, reserved);
1305 if (errCode != E_OK) {
1306 return errCode;
1307 }
1308 return -E_RE_SEND_DATA;
1309 }
1310
RunPermissionCheck(SingleVerSyncTaskContext * context,const Message * message,const DataRequestPacket * packet)1311 int SingleVerDataSync::RunPermissionCheck(SingleVerSyncTaskContext *context, const Message *message,
1312 const DataRequestPacket *packet)
1313 {
1314 int mode = SyncOperation::TransferSyncMode(packet->GetMode());
1315 int errCode = SingleVerDataSyncUtils::RunPermissionCheck(context, storage_, label_, packet);
1316 if (errCode != E_OK) {
1317 if (context->GetRemoteSoftwareVersion() > SOFTWARE_VERSION_EARLIEST) { // ver 101 can't handle this errCode
1318 (void)SendDataAck(context, message, -E_NOT_PERMIT, 0);
1319 }
1320 return -E_NOT_PERMIT;
1321 }
1322 const std::vector<SendDataItem> &data = packet->GetData();
1323 WaterMark maxSendDataTime = SingleVerDataSyncUtils::GetMaxSendDataTime(data);
1324 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1325 if (version > SOFTWARE_VERSION_RELEASE_2_0 && (mode != SyncModeType::PULL) &&
1326 !context->GetReceivcPermitCheck()) {
1327 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1328 if (permitReceive) {
1329 context->SetReceivcPermitCheck(true);
1330 } else {
1331 (void)SendDataAck(context, message, -E_SECURITY_OPTION_CHECK_ERROR, maxSendDataTime);
1332 return -E_SECURITY_OPTION_CHECK_ERROR;
1333 }
1334 }
1335 return errCode;
1336 }
1337
1338 // used in pull response
SendResetWatchDogPacket(SingleVerSyncTaskContext * context,uint32_t packetLen)1339 void SingleVerDataSync::SendResetWatchDogPacket(SingleVerSyncTaskContext *context, uint32_t packetLen)
1340 {
1341 // When mtu less than 30k, we send data with bluetooth
1342 // In order not to block the bluetooth channel, we don't send notify packet here
1343 if (mtuSize_ >= packetLen || mtuSize_ < NOTIFY_MIN_MTU_SIZE) {
1344 return;
1345 }
1346 uint64_t data = static_cast<uint64_t>(packetLen) * static_cast<uint64_t>(context->GetTimeoutTime()) / mtuSize_;
1347
1348 Message *ackMessage = new (std::nothrow) Message(DATA_SYNC_MESSAGE);
1349 if (ackMessage == nullptr) {
1350 LOGE("[DataSync][ResetWatchDog] new message failed");
1351 return;
1352 }
1353
1354 DataAckPacket ack;
1355 ack.SetData(data);
1356 ack.SetRecvCode(-E_SAVE_DATA_NOTIFY);
1357 ack.SetVersion(std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT));
1358 int errCode = ackMessage->SetCopiedObject(ack);
1359 if (errCode != E_OK) {
1360 delete ackMessage;
1361 ackMessage = nullptr;
1362 LOGE("[DataSync][ResetWatchDog] set copied object failed, errcode=%d", errCode);
1363 return;
1364 }
1365 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_NOTIFY, context->GetDeviceId(),
1366 context->GetSequenceId(), context->GetResponseSessionId());
1367
1368 errCode = Send(context, ackMessage, nullptr, 0);
1369 if (errCode != E_OK) {
1370 delete ackMessage;
1371 ackMessage = nullptr;
1372 LOGE("[DataSync][ResetWatchDog] Send packet failed,errcode=%d,label=%s,dev=%s", errCode, label_.c_str(),
1373 STR_MASK(GetDeviceId()));
1374 } else {
1375 LOGI("[DataSync][ResetWatchDog] data = %" PRIu64 ",label=%s,dev=%s", data, label_.c_str(),
1376 STR_MASK(GetDeviceId()));
1377 }
1378 }
1379
ReSend(SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1380 int32_t SingleVerDataSync::ReSend(SingleVerSyncTaskContext *context, DataSyncReSendInfo reSendInfo)
1381 {
1382 if (context == nullptr) {
1383 return -E_INVALID_ARGS;
1384 }
1385 SyncEntry syncData;
1386 int errCode = GetReSendData(syncData, context, reSendInfo);
1387 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1388 return errCode;
1389 }
1390 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1391 DataRequestPacket *packet = new (std::nothrow) DataRequestPacket;
1392 if (packet == nullptr) {
1393 LOGE("[DataSync][ReSend] new DataRequestPacket error");
1394 return -E_OUT_OF_MEMORY;
1395 }
1396 FillRequestReSendPacket(context, packet, reSendInfo, syncData, errCode);
1397 errCode = SendReSendPacket(packet, context, reSendInfo.sessionId, reSendInfo.sequenceId);
1398 if (errCode == E_OK && SyncOperation::TransferSyncMode(context->GetMode()) != SyncModeType::PULL) {
1399 // resend.end may not update in localwatermark while E_TIMEOUT occurred in send message last time.
1400 SyncTimeRange dataTime {reSendInfo.start, reSendInfo.deleteDataStart, reSendInfo.end, reSendInfo.deleteDataEnd};
1401 if (reSendInfo.deleteDataEnd > reSendInfo.deleteDataStart && curType == SyncType::QUERY_SYNC_TYPE) {
1402 dataTime.deleteEndTime += 1;
1403 }
1404 if (reSendInfo.end > reSendInfo.start) {
1405 dataTime.endTime += 1;
1406 }
1407 errCode = SaveLocalWaterMark(curType, context, dataTime, true);
1408 if (errCode != E_OK) {
1409 LOGE("[DataSync][ReSend] SaveLocalWaterMark failed.");
1410 }
1411 }
1412 return errCode;
1413 }
1414
SendReSendPacket(const DataRequestPacket * packet,SingleVerSyncTaskContext * context,uint32_t sessionId,uint32_t sequenceId)1415 int SingleVerDataSync::SendReSendPacket(const DataRequestPacket *packet, SingleVerSyncTaskContext *context,
1416 uint32_t sessionId, uint32_t sequenceId)
1417 {
1418 SyncType syncType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1419 Message *message = new (std::nothrow) Message(SingleVerDataSyncUtils::GetMessageId(syncType));
1420 if (message == nullptr) {
1421 LOGE("[DataSync][SendDataPacket] new message error");
1422 delete packet;
1423 packet = nullptr;
1424 return -E_OUT_OF_MEMORY;
1425 }
1426 uint32_t packetLen = packet->CalculateLen(SingleVerDataSyncUtils::GetMessageId(syncType));
1427 int errCode = message->SetExternalObject(packet);
1428 if (errCode != E_OK) {
1429 delete packet;
1430 packet = nullptr;
1431 delete message;
1432 message = nullptr;
1433 LOGE("[DataSync][SendReSendPacket] SetExternalObject failed errCode=%d", errCode);
1434 return errCode;
1435 }
1436 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(), sequenceId, sessionId);
1437 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1438 context, message->GetSessionId());
1439 errCode = Send(context, message, handler, packetLen);
1440 if (errCode != E_OK) {
1441 delete message;
1442 message = nullptr;
1443 }
1444 return errCode;
1445 }
1446
CheckPermitSendData(int inMode,SingleVerSyncTaskContext * context)1447 int SingleVerDataSync::CheckPermitSendData(int inMode, SingleVerSyncTaskContext *context)
1448 {
1449 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1450 int mode = SyncOperation::TransferSyncMode(inMode);
1451 // for pull mode it just need to get data, no need to send data.
1452 if (version <= SOFTWARE_VERSION_RELEASE_2_0 || mode == SyncModeType::PULL) {
1453 return E_OK;
1454 }
1455 if (context->GetSendPermitCheck()) {
1456 return E_OK;
1457 }
1458 bool isPermitSync = true;
1459 std::string deviceId = context->GetDeviceId();
1460 SecurityOption remoteSecOption = context->GetRemoteSeccurityOption();
1461 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL || mode == SyncModeType::RESPONSE_PULL) {
1462 isPermitSync = SingleVerDataSyncUtils::IsPermitRemoteDeviceRecvData(deviceId, remoteSecOption, storage_);
1463 }
1464 LOGI("[DataSync][PermitSendData] mode=%d,dev=%s,label=%d,flag=%d,PermitSync=%d", mode, STR_MASK(deviceId_),
1465 remoteSecOption.securityLabel, remoteSecOption.securityFlag, isPermitSync);
1466 if (isPermitSync) {
1467 context->SetSendPermitCheck(true);
1468 return E_OK;
1469 }
1470 if (mode == SyncModeType::PUSH || mode == SyncModeType::PUSH_AND_PULL) {
1471 context->SetTaskErrCode(-E_SECURITY_OPTION_CHECK_ERROR);
1472 return -E_SECURITY_OPTION_CHECK_ERROR;
1473 }
1474 if (mode == SyncModeType::RESPONSE_PULL) {
1475 SyncEntry syncData;
1476 (void)SendPullResponseDataPkt(-E_SECURITY_OPTION_CHECK_ERROR, syncData, context);
1477 return -E_SECURITY_OPTION_CHECK_ERROR;
1478 }
1479 if (mode == SyncModeType::SUBSCRIBE_QUERY) {
1480 return -E_SECURITY_OPTION_CHECK_ERROR;
1481 }
1482 return E_OK;
1483 }
1484
GetLabel() const1485 std::string SingleVerDataSync::GetLabel() const
1486 {
1487 return label_;
1488 }
1489
GetDeviceId() const1490 std::string SingleVerDataSync::GetDeviceId() const
1491 {
1492 return deviceId_;
1493 }
1494
WaterMarkErrHandle(SyncType syncType,SingleVerSyncTaskContext * context,const Message * message)1495 bool SingleVerDataSync::WaterMarkErrHandle(SyncType syncType, SingleVerSyncTaskContext *context, const Message *message)
1496 {
1497 const DataRequestPacket *packet = message->GetObject<DataRequestPacket>();
1498 if (packet == nullptr) {
1499 LOGE("[WaterMarkErrHandle] get packet object failed");
1500 return -E_INVALID_ARGS;
1501 }
1502 WaterMark packetLocalMark = packet->GetLocalWaterMark();
1503 WaterMark packetDeletedMark = packet->GetDeletedWaterMark();
1504 WaterMark peerMark = 0;
1505 WaterMark deletedMark = 0;
1506 GetPeerWaterMark(syncType, packet->GetQueryId(), context->GetDeviceId(), peerMark);
1507 if (syncType == SyncType::QUERY_SYNC_TYPE) {
1508 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedMark);
1509 }
1510 if (syncType != SyncType::QUERY_SYNC_TYPE && packetLocalMark > peerMark) {
1511 LOGI("[DataSync][DataRequestRecv] packetLocalMark=%" PRIu64 ",current=%" PRIu64, packetLocalMark, peerMark);
1512 context->SetReceiveWaterMarkErr(true);
1513 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1514 return true;
1515 }
1516 if (syncType == SyncType::QUERY_SYNC_TYPE && (packetLocalMark > peerMark || packetDeletedMark > deletedMark)) {
1517 LOGI("[DataSync][DataRequestRecv] packetDeletedMark=%" PRIu64 ",deletedMark=%" PRIu64 ","
1518 "packetLocalMark=%" PRIu64 ",peerMark=%" PRIu64, packetDeletedMark, deletedMark, packetLocalMark,
1519 peerMark);
1520 context->SetReceiveWaterMarkErr(true);
1521 (void)SendDataAck(context, message, LOCAL_WATER_MARK_NOT_INIT, 0);
1522 return true;
1523 }
1524 return false;
1525 }
1526
CheckSchemaStrategy(SingleVerSyncTaskContext * context,const Message * message)1527 int SingleVerDataSync::CheckSchemaStrategy(SingleVerSyncTaskContext *context, const Message *message)
1528 {
1529 auto *packet = message->GetObject<DataRequestPacket>();
1530 if (packet == nullptr) {
1531 return -E_INVALID_ARGS;
1532 }
1533 auto query = packet->GetQuery();
1534 std::pair<bool, bool> schemaSyncStatus = context->GetSchemaSyncStatus(query);
1535 if (!schemaSyncStatus.second) {
1536 LOGE("[DataSync][CheckSchemaStrategy] isSchemaSync=%d check failed", schemaSyncStatus.second);
1537 (void)SendDataAck(context, message, -E_NEED_ABILITY_SYNC, 0);
1538 return -E_NEED_ABILITY_SYNC;
1539 }
1540 if (!schemaSyncStatus.first) {
1541 LOGE("[DataSync][CheckSchemaStrategy] Strategy permitSync=%d check failed", schemaSyncStatus.first);
1542 (void)SendDataAck(context, message, -E_SCHEMA_MISMATCH, 0);
1543 return -E_SCHEMA_MISMATCH;
1544 }
1545 return E_OK;
1546 }
1547
RemotePushFinished(int sendCode,int inMode,uint32_t msgSessionId,uint32_t contextSessionId)1548 void SingleVerDataSync::RemotePushFinished(int sendCode, int inMode, uint32_t msgSessionId, uint32_t contextSessionId)
1549 {
1550 int mode = SyncOperation::TransferSyncMode(inMode);
1551 if ((mode != SyncModeType::PUSH) && (mode != SyncModeType::PUSH_AND_PULL) && (mode != SyncModeType::QUERY_PUSH) &&
1552 (mode != SyncModeType::QUERY_PUSH_PULL)) {
1553 return;
1554 }
1555
1556 if ((sendCode == E_OK) && (msgSessionId != 0) && (msgSessionId != contextSessionId)) {
1557 storage_->NotifyRemotePushFinished(deviceId_);
1558 }
1559 }
1560
SetAckPacket(DataAckPacket & ackPacket,SingleVerSyncTaskContext * context,const DataRequestPacket * packet,int32_t recvCode,WaterMark maxSendDataTime)1561 void SingleVerDataSync::SetAckPacket(DataAckPacket &ackPacket, SingleVerSyncTaskContext *context,
1562 const DataRequestPacket *packet, int32_t recvCode, WaterMark maxSendDataTime)
1563 {
1564 SyncType curType = SyncOperation::GetSyncType(packet->GetMode());
1565 WaterMark localMark = 0;
1566 GetLocalWaterMark(curType, packet->GetQueryId(), context, localMark);
1567 ackPacket.SetRecvCode(recvCode);
1568 // send ack packet
1569 if ((recvCode == E_OK) && (maxSendDataTime != 0)) {
1570 ackPacket.SetData(maxSendDataTime + 1); // + 1 to next start
1571 } else if (recvCode != WATER_MARK_INVALID) {
1572 WaterMark mark = 0;
1573 GetPeerWaterMark(curType, packet->GetQueryId(), context->GetDeviceId(), mark);
1574 ackPacket.SetData(mark);
1575 }
1576 std::vector<uint64_t> reserved {localMark};
1577 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1578 uint64_t packetId = 0;
1579 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1580 packetId = packet->GetPacketId(); // above 102 version data request reserve[0] store packetId value
1581 }
1582 if (version > SOFTWARE_VERSION_RELEASE_2_0 && packetId > 0) {
1583 reserved.push_back(packetId);
1584 }
1585 // while recv is not E_OK, data is peerMark, reserve[2] is deletedPeerMark value
1586 if (curType == SyncType::QUERY_SYNC_TYPE && recvCode != WATER_MARK_INVALID) {
1587 WaterMark deletedPeerMark;
1588 GetPeerDeleteSyncWaterMark(context->GetDeleteSyncId(), deletedPeerMark);
1589 reserved.push_back(deletedPeerMark); // query sync mode, reserve[2] store deletedPeerMark value
1590 }
1591 ackPacket.SetReserved(reserved);
1592 ackPacket.SetVersion(version);
1593 }
1594
GetReSendData(SyncEntry & syncData,SingleVerSyncTaskContext * context,DataSyncReSendInfo reSendInfo)1595 int SingleVerDataSync::GetReSendData(SyncEntry &syncData, SingleVerSyncTaskContext *context,
1596 DataSyncReSendInfo reSendInfo)
1597 {
1598 int mode = SyncOperation::TransferSyncMode(context->GetMode());
1599 if (mode == SyncModeType::PULL) {
1600 return E_OK;
1601 }
1602 ContinueToken token = nullptr;
1603 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1604 size_t packetSize = (version > SOFTWARE_VERSION_RELEASE_2_0) ?
1605 DBConstant::MAX_HPMODE_PACK_ITEM_SIZE : DBConstant::MAX_NORMAL_PACK_ITEM_SIZE;
1606 DataSizeSpecInfo reSendDataSizeInfo = GetDataSizeSpecInfo(packetSize);
1607 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1608 int errCode;
1609 if (curType != SyncType::QUERY_SYNC_TYPE) {
1610 errCode = storage_->GetSyncData(reSendInfo.start, reSendInfo.end + 1, syncData.entries, token,
1611 reSendDataSizeInfo);
1612 } else {
1613 QuerySyncObject queryObj = context->GetQuery();
1614 errCode = storage_->GetSyncData(queryObj, SyncTimeRange { reSendInfo.start, reSendInfo.deleteDataStart,
1615 reSendInfo.end + 1, reSendInfo.deleteDataEnd + 1 }, reSendDataSizeInfo, token, syncData.entries);
1616 }
1617 if (token != nullptr) {
1618 storage_->ReleaseContinueToken(token);
1619 }
1620 if (errCode == -E_BUSY || errCode == -E_EKEYREVOKED) {
1621 context->SetTaskErrCode(errCode);
1622 return errCode;
1623 }
1624 if (!SingleVerDataSyncUtils::IsGetDataSuccessfully(errCode)) {
1625 return errCode;
1626 }
1627
1628 int innerCode = InterceptData(syncData);
1629 if (innerCode != E_OK) {
1630 context->SetTaskErrCode(innerCode);
1631 return innerCode;
1632 }
1633
1634 bool needCompressOnSync = false;
1635 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1636 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1637 CompressAlgorithm remoteAlgo = context->ChooseCompressAlgo();
1638 if (needCompressOnSync && remoteAlgo != CompressAlgorithm::NONE) {
1639 int compressCode = GenericSingleVerKvEntry::Compress(syncData.entries, syncData.compressedEntries,
1640 { remoteAlgo, version });
1641 if (compressCode != E_OK) {
1642 return compressCode;
1643 }
1644 }
1645 return errCode;
1646 }
1647
RemoveDeviceDataIfNeed(SingleVerSyncTaskContext * context)1648 int SingleVerDataSync::RemoveDeviceDataIfNeed(SingleVerSyncTaskContext *context)
1649 {
1650 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_RELEASE_3_0) {
1651 return E_OK;
1652 }
1653 uint64_t clearRemoteDataMark = 0;
1654 std::lock_guard<std::mutex> autoLock(removeDeviceDataLock_);
1655 metadata_->GetRemoveDataMark(context->GetDeviceId(), clearRemoteDataMark);
1656 if (clearRemoteDataMark == 0) {
1657 return E_OK;
1658 }
1659 int errCode = E_OK;
1660 if (context->IsNeedClearRemoteStaleData() && clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1661 errCode = storage_->RemoveDeviceData(context->GetDeviceId(), true);
1662 if (errCode != E_OK) {
1663 LOGE("clear remote %s data failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1664 return errCode;
1665 }
1666 }
1667 if (clearRemoteDataMark == REMOVE_DEVICE_DATA_MARK) {
1668 errCode = metadata_->ResetMetaDataAfterRemoveData(context->GetDeviceId());
1669 if (errCode != E_OK) {
1670 LOGE("set %s removeDataWaterMark to false failed,errCode=%d", STR_MASK(GetDeviceId()), errCode);
1671 return errCode;
1672 }
1673 }
1674 return E_OK;
1675 }
1676
UpdateMtuSize()1677 void SingleVerDataSync::UpdateMtuSize()
1678 {
1679 mtuSize_ = communicateHandle_->GetCommunicatorMtuSize(deviceId_) * 9 / 10; // get the 9/10 of the size
1680 }
1681
FillRequestReSendPacket(const SingleVerSyncTaskContext * context,DataRequestPacket * packet,DataSyncReSendInfo reSendInfo,SyncEntry & syncData,int sendCode)1682 void SingleVerDataSync::FillRequestReSendPacket(const SingleVerSyncTaskContext *context, DataRequestPacket *packet,
1683 DataSyncReSendInfo reSendInfo, SyncEntry &syncData, int sendCode)
1684 {
1685 SyncType curType = (context->IsQuerySync()) ? SyncType::QUERY_SYNC_TYPE : SyncType::MANUAL_FULL_SYNC_TYPE;
1686 WaterMark peerMark = 0;
1687 GetPeerWaterMark(curType, context->GetQuerySyncId(), context->GetDeviceId(),
1688 peerMark);
1689 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1690 // transfer reSend mode, RESPONSE_PULL transfer to push or query push
1691 // PUSH_AND_PULL mode which sequenceId lager than first transfer to push or query push
1692 int reSendMode = SingleVerDataSyncUtils::GetReSendMode(context->GetMode(), reSendInfo.sequenceId, curType);
1693 if (GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) ||
1694 SyncOperation::TransferSyncMode(context->GetMode()) == SyncModeType::PULL) {
1695 LOGI("[DataSync][ReSend] set lastid,label=%s,dev=%s", label_.c_str(), STR_MASK(GetDeviceId()));
1696 packet->SetLastSequence();
1697 }
1698 if (sendCode == E_OK && GetSessionEndTimestamp() == std::max(reSendInfo.end, reSendInfo.deleteDataEnd) &&
1699 context->GetMode() == SyncModeType::RESPONSE_PULL) {
1700 sendCode = SEND_FINISHED;
1701 }
1702 packet->SetData(syncData.entries);
1703 packet->SetCompressData(syncData.compressedEntries);
1704 packet->SetBasicInfo(sendCode, version, reSendMode);
1705 packet->SetExtraConditions(RuntimeContext::GetInstance()->GetPermissionCheckParam(storage_->GetDbProperties()));
1706 packet->SetWaterMark(reSendInfo.start, peerMark, reSendInfo.deleteDataStart);
1707 if (SyncOperation::TransferSyncMode(reSendMode) != SyncModeType::PUSH) {
1708 packet->SetEndWaterMark(context->GetEndMark());
1709 packet->SetQuery(context->GetQuery());
1710 }
1711 packet->SetQueryId(context->GetQuerySyncId());
1712 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
1713 std::vector<uint64_t> reserved {reSendInfo.packetId};
1714 packet->SetReserved(reserved);
1715 }
1716 if (reSendMode == SyncModeType::PULL) {
1717 // resend pull packet dont set compress type
1718 return;
1719 }
1720 bool needCompressOnSync = false;
1721 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1722 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1723 CompressAlgorithm curAlgo = context->ChooseCompressAlgo();
1724 if (needCompressOnSync && curAlgo != CompressAlgorithm::NONE) {
1725 packet->SetCompressDataMark();
1726 packet->SetCompressAlgo(curAlgo);
1727 }
1728 }
1729
GetDataSizeSpecInfo(size_t packetSize)1730 DataSizeSpecInfo SingleVerDataSync::GetDataSizeSpecInfo(size_t packetSize)
1731 {
1732 bool needCompressOnSync = false;
1733 uint8_t compressionRate = DBConstant::DEFAULT_COMPTRESS_RATE;
1734 (void)storage_->GetCompressionOption(needCompressOnSync, compressionRate);
1735 uint32_t blockSize = std::min(static_cast<uint32_t>(DBConstant::MAX_SYNC_BLOCK_SIZE),
1736 mtuSize_ * 100 / compressionRate); // compressionRate max is 100
1737 return {blockSize, packetSize};
1738 }
1739
InterceptData(SyncEntry & syncEntry)1740 int SingleVerDataSync::InterceptData(SyncEntry &syncEntry)
1741 {
1742 if (storage_ == nullptr) {
1743 LOGE("Invalid DB. Can not intercept data.");
1744 return -E_INVALID_DB;
1745 }
1746
1747 // GetLocalDeviceName get local device ID.
1748 // GetDeviceId get remote device ID.
1749 // If intercept data fail, entries will be released.
1750 return storage_->InterceptData(syncEntry.entries, GetLocalDeviceName(), GetDeviceId());
1751 }
1752
ControlCmdStart(SingleVerSyncTaskContext * context)1753 int SingleVerDataSync::ControlCmdStart(SingleVerSyncTaskContext *context)
1754 {
1755 if (context == nullptr) {
1756 return -E_INVALID_ARGS;
1757 }
1758 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1759 if (subManager == nullptr) {
1760 return -E_INVALID_ARGS;
1761 }
1762 int errCode = ControlCmdStartCheck(context);
1763 if (errCode != E_OK) {
1764 return errCode;
1765 }
1766 ControlRequestPacket* packet = new (std::nothrow) SubscribeRequest();
1767 if (packet == nullptr) {
1768 LOGE("[DataSync][ControlCmdStart] new SubscribeRequest error");
1769 return -E_OUT_OF_MEMORY;
1770 }
1771 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1772 errCode = subManager->ReserveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1773 if (errCode != E_OK) {
1774 LOGE("[DataSync][ControlCmdStart] reserve local subscribe query failed,err=%d", errCode);
1775 delete packet;
1776 packet = nullptr;
1777 return errCode;
1778 }
1779 }
1780 SingleVerDataSyncUtils::FillControlRequestPacket(packet, context);
1781 errCode = SendControlPacket(packet, context);
1782 if (errCode != E_OK && context->GetMode() == SyncModeType::SUBSCRIBE_QUERY) {
1783 subManager->DeleteLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1784 }
1785 return errCode;
1786 }
1787
ControlCmdRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1788 int SingleVerDataSync::ControlCmdRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1789 {
1790 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1791 if (packet == nullptr) {
1792 return -E_INVALID_ARGS;
1793 }
1794 LOGI("[SingleVerDataSync] recv control cmd message,label=%s,dev=%s,controlType=%u", label_.c_str(),
1795 STR_MASK(GetDeviceId()), packet->GetcontrolCmdType());
1796 int errCode = ControlCmdRequestRecvPre(context, message);
1797 if (errCode != E_OK) {
1798 return errCode;
1799 }
1800 if (packet->GetcontrolCmdType() == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1801 errCode = SubscribeRequestRecv(context, message);
1802 } else if (packet->GetcontrolCmdType() == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1803 errCode = UnsubscribeRequestRecv(context, message);
1804 }
1805 return errCode;
1806 }
1807
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)1808 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
1809 {
1810 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
1811 if (subManager == nullptr) {
1812 return -E_INVALID_ARGS;
1813 }
1814 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
1815 if (errCode != E_OK) {
1816 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1817 return errCode;
1818 }
1819 const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
1820 if (packet == nullptr) {
1821 return -E_INVALID_ARGS;
1822 }
1823 int32_t recvCode = packet->GetRecvCode();
1824 uint32_t cmdType = packet->GetcontrolCmdType();
1825 if (recvCode != E_OK) {
1826 LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
1827 STR_MASK(GetDeviceId()), cmdType);
1828 // for unsubscribe no need to do something
1829 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
1830 return recvCode;
1831 }
1832 if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1833 errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1834 } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
1835 subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
1836 }
1837 if (errCode != E_OK) {
1838 LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
1839 return errCode;
1840 }
1841 return -E_NO_DATA_SEND; // means control msg send finished
1842 }
1843
ControlCmdStartCheck(SingleVerSyncTaskContext * context)1844 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
1845 {
1846 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
1847 (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
1848 LOGE("[ControlCmdStartCheck] not support controlCmd");
1849 return -E_INVALID_ARGS;
1850 }
1851 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
1852 context->GetQuery().HasInKeys() &&
1853 context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
1854 return -E_NOT_SUPPORT;
1855 }
1856 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
1857 return E_OK;
1858 }
1859 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
1860 if (permitReceive) {
1861 context->SetReceivcPermitCheck(true);
1862 } else {
1863 return -E_SECURITY_OPTION_CHECK_ERROR;
1864 }
1865 return E_OK;
1866 }
1867
SendControlPacket(const ControlRequestPacket * packet,SingleVerSyncTaskContext * context)1868 int SingleVerDataSync::SendControlPacket(const ControlRequestPacket *packet, SingleVerSyncTaskContext *context)
1869 {
1870 Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
1871 if (message == nullptr) {
1872 LOGE("[DataSync][SendControlPacket] new message error");
1873 delete packet;
1874 packet = nullptr;
1875 return -E_OUT_OF_MEMORY;
1876 }
1877 uint32_t packetLen = packet->CalculateLen();
1878 int errCode = message->SetExternalObject(packet);
1879 if (errCode != E_OK) {
1880 delete packet;
1881 packet = nullptr;
1882 delete message;
1883 message = nullptr;
1884 LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
1885 return errCode;
1886 }
1887 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
1888 context->GetSequenceId(), context->GetRequestSessionId());
1889 CommErrHandler handler = std::bind(&SyncTaskContext::CommErrHandlerFunc, std::placeholders::_1,
1890 context, message->GetSessionId());
1891 errCode = Send(context, message, handler, packetLen);
1892 if (errCode != E_OK) {
1893 delete message;
1894 message = nullptr;
1895 }
1896 return errCode;
1897 }
1898
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)1899 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
1900 uint32_t controlCmdType, const CommErrHandler &handler)
1901 {
1902 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
1903 if (ackMessage == nullptr) {
1904 LOGE("[DataSync][SendControlAck] new message error");
1905 return -E_OUT_OF_MEMORY;
1906 }
1907 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
1908 ControlAckPacket ack;
1909 ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
1910 int errCode = ackMessage->SetCopiedObject(ack);
1911 if (errCode != E_OK) {
1912 delete ackMessage;
1913 ackMessage = nullptr;
1914 LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
1915 return errCode;
1916 }
1917 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
1918 message->GetSequenceId(), message->GetSessionId());
1919 errCode = Send(context, ackMessage, handler, 0);
1920 if (errCode != E_OK) {
1921 delete ackMessage;
1922 ackMessage = nullptr;
1923 }
1924 return errCode;
1925 }
1926
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)1927 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
1928 {
1929 if (context == nullptr || message == nullptr) {
1930 return -E_INVALID_ARGS;
1931 }
1932 const ControlRequestPacket *packet = message->GetObject<ControlRequestPacket>();
1933 if (packet == nullptr) {
1934 return -E_INVALID_ARGS;
1935 }
1936 uint32_t controlCmdType = packet->GetcontrolCmdType();
1937 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
1938 return DoAbilitySyncIfNeed(context, message, true);
1939 }
1940 if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
1941 SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
1942 return -E_WAIT_NEXT_MESSAGE;
1943 }
1944 return E_OK;
1945 }
1946
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)1947 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
1948 const Message *message)
1949 {
1950 uint32_t controlCmdType = packet->GetcontrolCmdType();
1951 if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
1952 return E_OK;
1953 }
1954 QuerySyncObject syncQuery = packet->GetQuery();
1955 int errCode;
1956 if (!packet->IsAutoSubscribe()) {
1957 errCode = storage_->CheckAndInitQueryCondition(syncQuery);
1958 if (errCode != E_OK) {
1959 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1960 SendControlAck(context, message, errCode, controlCmdType);
1961 return -E_WAIT_NEXT_MESSAGE;
1962 }
1963 }
1964 int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
1965 static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
1966 if (mode >= SyncModeType::INVALID_MODE) {
1967 LOGE("[SingleVerDataSync] invalid mode");
1968 SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
1969 return -E_WAIT_NEXT_MESSAGE;
1970 }
1971 errCode = CheckPermitSendData(mode, context);
1972 if (errCode != E_OK) {
1973 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
1974 SendControlAck(context, message, errCode, controlCmdType);
1975 }
1976 return errCode;
1977 }
1978
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)1979 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
1980 {
1981 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
1982 if (packet == nullptr) {
1983 return -E_INVALID_ARGS;
1984 }
1985 int errCode = SubscribeRequestRecvPre(context, packet, message);
1986 if (errCode != E_OK) {
1987 return errCode;
1988 }
1989 uint32_t controlCmdType = packet->GetcontrolCmdType();
1990 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
1991 if (subscribeManager == nullptr) {
1992 LOGE("[SingleVerDataSync] subscribeManager check failed");
1993 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
1994 return -E_INVALID_ARGS;
1995 }
1996 errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
1997 if (errCode != E_OK) {
1998 LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
1999 STR_MASK(GetDeviceId()));
2000 SendControlAck(context, message, errCode, controlCmdType);
2001 return errCode;
2002 }
2003 errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2004 if (errCode != E_OK) {
2005 LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2006 STR_MASK(GetDeviceId()));
2007 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2008 SendControlAck(context, message, errCode, controlCmdType);
2009 return errCode;
2010 }
2011 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2012 if (errCode != E_OK) {
2013 subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2014 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
2015 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2016 STR_MASK(GetDeviceId()));
2017 return errCode;
2018 }
2019 subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2020 return errCode;
2021 }
2022
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)2023 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
2024 {
2025 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
2026 if (packet == nullptr) {
2027 return -E_INVALID_ARGS;
2028 }
2029 uint32_t controlCmdType = packet->GetcontrolCmdType();
2030 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
2031 if (subscribeManager == nullptr) {
2032 LOGE("[SingleVerDataSync] subscribeManager check failed");
2033 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
2034 return -E_INVALID_ARGS;
2035 }
2036 int errCode;
2037 std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
2038 if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
2039 errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
2040 if (errCode != E_OK) {
2041 LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2042 STR_MASK(GetDeviceId()));
2043 SendControlAck(context, message, errCode, controlCmdType);
2044 return errCode;
2045 }
2046 }
2047 errCode = SendControlAck(context, message, E_OK, controlCmdType);
2048 if (errCode != E_OK) {
2049 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
2050 STR_MASK(GetDeviceId()));
2051 return errCode;
2052 }
2053 subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
2054 metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
2055 return errCode;
2056 }
2057
PutDataMsg(Message * message)2058 void SingleVerDataSync::PutDataMsg(Message *message)
2059 {
2060 return msgSchedule_.PutMsg(message);
2061 }
2062
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)2063 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
2064 bool &isNeedContinue)
2065 {
2066 return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
2067 }
2068
IsNeedReloadQueue()2069 bool SingleVerDataSync::IsNeedReloadQueue()
2070 {
2071 return msgSchedule_.IsNeedReloadQueue();
2072 }
2073
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)2074 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
2075 {
2076 msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
2077 }
2078
ClearDataMsg()2079 void SingleVerDataSync::ClearDataMsg()
2080 {
2081 msgSchedule_.ClearMsg();
2082 }
2083
QuerySyncCheck(SingleVerSyncTaskContext * context)2084 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
2085 {
2086 if (context == nullptr) {
2087 return -E_INVALID_ARGS;
2088 }
2089 bool isCheckStatus = false;
2090 int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
2091 if (errCode != E_OK) {
2092 return errCode;
2093 }
2094 if (!isCheckStatus) {
2095 context->SetTaskErrCode(-E_NOT_SUPPORT);
2096 return -E_NOT_SUPPORT;
2097 }
2098 return E_OK;
2099 }
2100
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)2101 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
2102 const std::shared_ptr<SubscribeManager> &subscribeManager)
2103 {
2104 if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
2105 storage_->RemoveSubscribe(queryId);
2106 }
2107 }
2108 } // namespace DistributedDB
2109