1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "store_types.h"
31 #include "strategy_factory.h"
32 #include "version.h"
33
34 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)35 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
36 {
37 Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
38 waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
39 RecordWaterMark(taskId, 0u);
40 }
41
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)42 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
43 {
44 std::lock_guard<std::mutex> autoLock(dataLock_);
45 std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
46 cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
47 }
48
ReloadUploadInfoIfNeed(const UploadParam & param,InnerProcessInfo & info)49 void CloudSyncer::ReloadUploadInfoIfNeed(const UploadParam ¶m, InnerProcessInfo &info)
50 {
51 info.upLoadInfo.total = static_cast<uint32_t>(param.count);
52 Info lastUploadInfo;
53 UploadRetryInfo retryInfo;
54 GetLastUploadInfo(info.tableName, lastUploadInfo, retryInfo);
55 info.upLoadInfo.total += lastUploadInfo.successCount;
56 info.upLoadInfo.successCount += lastUploadInfo.successCount;
57 info.upLoadInfo.failCount += lastUploadInfo.failCount;
58 info.upLoadInfo.insertCount += lastUploadInfo.insertCount;
59 info.upLoadInfo.updateCount += lastUploadInfo.updateCount;
60 info.upLoadInfo.deleteCount += lastUploadInfo.deleteCount;
61 info.retryInfo.uploadBatchRetryCount = retryInfo.uploadBatchRetryCount;
62 LOGD("[CloudSyncer] resume upload, last success count %" PRIu32 ", last fail count %" PRIu32,
63 lastUploadInfo.successCount, lastUploadInfo.failCount);
64 }
65
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,UploadRetryInfo & retryInfo)66 void CloudSyncer::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo)
67 {
68 std::lock_guard<std::mutex> autoLock(dataLock_);
69 return currentContext_.notifier->GetLastUploadInfo(tableName, lastUploadInfo, retryInfo);
70 }
71
GetCloudGidAndFillExtend(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,VBucket & extend)72 int CloudSyncer::GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj,
73 VBucket &extend)
74 {
75 int errCode = GetCloudGid(taskId, tableName, obj);
76 if (errCode != E_OK) {
77 LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
78 return errCode;
79 }
80 return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
81 }
82
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)83 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
84 VBucket &extend)
85 {
86 extend = {
87 {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
88 };
89
90 QuerySyncObject obj = GetQuerySyncObject(tableName);
91 if (obj.IsContainQueryNodes()) {
92 return GetCloudGidAndFillExtend(taskId, tableName, obj, extend);
93 }
94 if (IsCompensatedTask(taskId)) {
95 int errCode = GetCloudGid(taskId, tableName, obj);
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 if (obj.IsContainQueryNodes()) {
100 return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
101 }
102 }
103 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
104 return E_OK;
105 }
106
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)107 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
108 {
109 std::vector<std::string> cloudGid;
110 bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
111 int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
112 if (errCode != E_OK) {
113 LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
114 } else if (!cloudGid.empty()) {
115 obj.SetCloudGid(cloudGid);
116 }
117 LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
118 return errCode;
119 }
120
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,std::vector<std::string> & cloudGid)121 int CloudSyncer::GetCloudGid(
122 TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid)
123 {
124 bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
125 int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
126 if (errCode != E_OK) {
127 LOGE("[CloudSyncer] Failed to get cloud gid, taskid:%" PRIu64 ", table name: %s, length: %zu, %d.",
128 taskId, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode);
129 }
130 return errCode;
131 }
132
GetQuerySyncObject(const std::string & tableName)133 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
134 {
135 std::lock_guard<std::mutex> autoLock(dataLock_);
136 for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
137 if (item.GetTableName() == tableName) {
138 return item;
139 }
140 }
141 LOGW("[CloudSyncer] not found query in cache");
142 QuerySyncObject querySyncObject;
143 querySyncObject.SetTableName(tableName);
144 return querySyncObject;
145 }
146
UpdateProcessWhenUploadFailed(InnerProcessInfo & info)147 void CloudSyncer::UpdateProcessWhenUploadFailed(InnerProcessInfo &info)
148 {
149 info.tableStatus = ProcessStatus::FINISHED;
150 std::lock_guard<std::mutex> autoLock(dataLock_);
151 currentContext_.notifier->UpdateProcess(info);
152 currentContext_.notifier->UpdateUploadRetryInfo(info);
153 }
154
NotifyUploadFailed(int errCode,InnerProcessInfo & info)155 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
156 {
157 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
158 LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
159 } else {
160 LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
161 info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
162 }
163 UpdateProcessWhenUploadFailed(info);
164 }
165
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)166 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
167 {
168 return BatchInsertOrUpdate(insertInfo, uploadData, innerProcessInfo, true);
169 }
170
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)171 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
172 {
173 return BatchInsertOrUpdate(updateInfo, uploadData, innerProcessInfo, false);
174 }
175
BatchInsertOrUpdate(Info & uploadInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo,bool isInsert)176 int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
177 bool isInsert)
178 {
179 uint32_t retryCount = 0;
180 int errCode = E_OK;
181 if (isInsert) {
182 errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
183 uploadData.insData.extend, uploadInfo, retryCount);
184 innerProcessInfo.upLoadInfo.insertCount += uploadInfo.successCount;
185 } else {
186 errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
187 uploadData.updData.extend, uploadInfo, retryCount);
188 innerProcessInfo.upLoadInfo.updateCount += uploadInfo.successCount;
189 }
190 innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount;
191 innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount;
192 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
193 ProcessVersionConflictInfo(innerProcessInfo, retryCount);
194 }
195 if (errCode != E_OK) {
196 LOGE("[CloudSyncer][BatchInsertOrUpdate] BatchInsertOrUpdate with error, ret is %d.", errCode);
197 }
198 if (uploadData.isCloudVersionRecord) {
199 return errCode;
200 }
201 return BackFillAfterBatchUpload(uploadData, isInsert, errCode);
202 }
203
BackFillAfterBatchUpload(CloudSyncData & uploadData,bool isInsert,int batchUploadResult)204 int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult)
205 {
206 bool isSharedTable = false;
207 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
208 if (ret != E_OK) {
209 LOGE("[CloudSyncer] BackFillAfterBatchUpload cannot judge the table is shared table. %d", ret);
210 return ret;
211 }
212 int errCode = batchUploadResult;
213 if (!isSharedTable) {
214 CloudWaterType type = isInsert ? CloudWaterType::INSERT : CloudWaterType::UPDATE;
215 CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
216 ret = CloudSyncUtils::FillAssetIdToAssets(data, errCode, type);
217 if (ret != E_OK) {
218 LOGW("[CloudSyncer][BackFillAfterBatchUpload] FillAssetIdToAssets with error, ret is %d.", ret);
219 }
220 }
221 OpType opType = isInsert ? OpType::INSERT : OpType::UPDATE;
222 if (errCode != E_OK) {
223 storageProxy_->FillCloudGidIfSuccess(opType, uploadData);
224 CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
225 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend);
226 if (isSkip) {
227 LOGI("[CloudSyncer][BackFillAfterBatchUpload] Try to FillCloudLogAndAsset when assets missing: %d",
228 errCode);
229 return E_OK;
230 } else {
231 LOGE("[CloudSyncer][BackFillAfterBatchUpload] errCode: %d, can not skip assets missing record.", errCode);
232 return errCode;
233 }
234 }
235 int errorCode = storageProxy_->FillCloudLogAndAsset(opType, uploadData);
236 if ((errorCode != E_OK) || (ret != E_OK)) {
237 LOGE("[CloudSyncer] Failed to fill back when doing upload insData or updData, %d.", errorCode);
238 return ret == E_OK ? errorCode : ret;
239 }
240 return E_OK;
241 }
242
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)243 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
244 std::map<std::string, Assets> &downloadAssets)
245 {
246 bool isSharedTable = false;
247 int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
248 if (errCode != E_OK) {
249 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
250 return errCode;
251 }
252 int transactionCode = E_OK;
253 // shared table don't download, so just begin transaction once
254 if (isSharedTable) {
255 transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
256 }
257 if (transactionCode != E_OK) {
258 LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
259 return transactionCode;
260 }
261 errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
262 if (isSharedTable) {
263 transactionCode = storageProxy_->Commit();
264 if (transactionCode != E_OK) {
265 LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
266 }
267 }
268 return (errCode == E_OK) ? transactionCode : errCode;
269 }
270
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)271 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
272 const DownloadItem &downloadItem, VBucket &dbAssets)
273 {
274 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
275 auto &errCode = res.first;
276 if (!isSharedTable) {
277 errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
278 }
279 if (errCode != E_OK) {
280 LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
281 return res;
282 }
283 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
284 downloadItem.hashKey, dbAssets);
285 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
286 if (errCode != -E_CLOUD_GID_MISMATCH) {
287 LOGE("[CloudSyncer] get assets from db failed %d", errCode);
288 }
289 if (!isSharedTable) {
290 (void)storageProxy_->Rollback();
291 }
292 return res;
293 }
294 if (!isSharedTable) {
295 errCode = storageProxy_->Commit();
296 }
297 if (errCode != E_OK) {
298 LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
299 }
300 return res;
301 }
302
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)303 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
304 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
305 std::map<std::string, Assets> &tmpAssetsToDelete)
306 {
307 std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
308 for (auto &[col, assets] : tmpAssetsToDownload) {
309 int i = 0;
310 for (auto &asset : assets) {
311 asset.flag = tmpFlags[col][i++];
312 if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
313 continue;
314 }
315 if (downloadCode == E_OK) {
316 asset.status = NORMAL;
317 } else {
318 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
319 }
320 }
321 }
322 for (auto &[col, assets] : tmpAssetsToDelete) {
323 for (auto &asset : assets) {
324 asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
325 if (deleteCode == E_OK) {
326 asset.status = NORMAL;
327 } else {
328 asset.status = ABNORMAL;
329 }
330 downloadAssets[col].push_back(asset);
331 }
332 }
333 return downloadAssets;
334 }
335
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)336 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
337 const DownloadItem &downloadItem, VBucket &dbAssets)
338 {
339 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK)};
340 auto &ret = res.first;
341 auto &status = res.second;
342 if (info.isAsyncDownload) {
343 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
344 downloadItem.hashKey, dbAssets);
345 } else {
346 res = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
347 }
348 if (ret == -E_CLOUD_GID_MISMATCH) {
349 LOGW("[CloudSyncer] skip download asset because gid mismatch");
350 errCode = E_OK;
351 return true;
352 }
353 if (CloudStorageUtils::IsDataLocked(status)) {
354 LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
355 errCode = E_OK;
356 return true;
357 }
358 if (ret != E_OK) {
359 LOGE("[CloudSyncer]%s download get assets from DB failed: %d, return errCode: %d",
360 info.isAsyncDownload ? "Async" : "Sync", ret, errCode);
361 errCode = (errCode != E_OK) ? errCode : ret;
362 return true;
363 }
364 return false;
365 }
366
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)367 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
368 {
369 if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
370 downloadItem.recordConflict = true;
371 errCode = E_OK;
372 return false;
373 }
374 errCode = (errCode != E_OK) ? errCode : deleteCode;
375 errCode = (errCode != E_OK) ? errCode : downloadCode;
376 if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
377 return false;
378 }
379 return true;
380 }
381
GetAssetsToDownload(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToDownload,std::map<std::string,std::vector<uint32_t>> & tmpFlags)382 void GetAssetsToDownload(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
383 std::map<std::string, Assets> &assetsToDownload, std::map<std::string, std::vector<uint32_t>> &tmpFlags)
384 {
385 if (isSharedTable) {
386 LOGD("[CloudSyncer] skip download for shared table");
387 return;
388 }
389 for (auto &[col, assets] : downloadAssets) {
390 for (auto &asset : assets) {
391 if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE) &&
392 AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
393 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
394 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
395 static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
396 assetsToDownload[col].push_back(asset);
397 tmpFlags[col].push_back(asset.flag);
398 } else {
399 LOGD("[CloudSyncer] skip download asset...");
400 }
401 }
402 }
403 }
404
GetAssetsToRemove(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToRemove)405 void GetAssetsToRemove(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
406 std::map<std::string, Assets> &assetsToRemove)
407 {
408 if (isSharedTable) {
409 LOGD("[CloudSyncer] skip remove for shared table");
410 return;
411 }
412 for (auto &[col, assets] : downloadAssets) {
413 for (auto &asset : assets) {
414 if (asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) &&
415 AssetOperationUtils::CalAssetRemoveOperation(col, asset, dbAssets) ==
416 AssetOperationUtils::AssetOpType::HANDLE) {
417 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
418 assetsToRemove[col].push_back(asset);
419 }
420 }
421 }
422 }
423
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)424 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
425 DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
426 {
427 int errCode = E_OK;
428 VBucket dbAssets;
429 std::map<std::string, Assets> tmpAssetsToRemove;
430 if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
431 GetAssetsToRemove(downloadAssets, dbAssets, isSharedTable, tmpAssetsToRemove);
432 }
433 auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
434 tmpAssetsToRemove);
435
436 std::map<std::string, Assets> tmpAssetsToDownload;
437 std::map<std::string, std::vector<uint32_t>> tmpFlags;
438 if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
439 GetAssetsToDownload(downloadAssets, dbAssets, isSharedTable, tmpAssetsToDownload, tmpFlags);
440 }
441 auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
442 if (downloadCode != SKIP_ASSET && !CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
443 return errCode;
444 }
445
446 // copy asset back
447 downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
448 tmpAssetsToRemove);
449 return errCode;
450 }
451
SeparateNormalAndFailAssets(const std::map<std::string,Assets> & assetsMap,VBucket & normalAssets,VBucket & failedAssets)452 void CloudSyncer::SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
453 VBucket &failedAssets)
454 {
455 for (auto &[key, assets] : assetsMap) {
456 Assets tempFailedAssets;
457 Assets tempNormalAssets;
458 int32_t otherStatusCnt = 0;
459 for (auto &asset : assets) {
460 if (asset.status == AssetStatus::NORMAL) {
461 tempNormalAssets.push_back(asset);
462 } else if (asset.status == AssetStatus::ABNORMAL) {
463 tempFailedAssets.push_back(asset);
464 } else {
465 otherStatusCnt++;
466 LOGW("[CloudSyncer] download err, statue %d count %d", asset.status, otherStatusCnt);
467 }
468 }
469 LOGI("[CloudSyncer] download asset times, normalCount %d, abnormalCount %d, otherStatusCnt %d",
470 tempNormalAssets.size(), tempFailedAssets.size(), otherStatusCnt);
471 if (tempFailedAssets.size() > 0) {
472 failedAssets[key] = std::move(tempFailedAssets);
473 }
474 if (tempNormalAssets.size() > 0) {
475 normalAssets[key] = std::move(tempNormalAssets);
476 }
477 }
478 }
479
CommitDownloadAssets(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)480 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
481 DownloadCommitList &commitList, uint32_t &successCount)
482 {
483 int errCode = storageProxy_->SetLogTriggerStatus(false);
484 if (errCode != E_OK) {
485 return errCode;
486 }
487 for (auto &item : commitList) {
488 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
489 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
490 std::map<std::string, Assets> assetsMap = std::get<1>(item);
491 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
492 VBucket normalAssets;
493 VBucket failedAssets;
494 normalAssets[CloudDbConstant::GID_FIELD] = gid;
495 failedAssets[CloudDbConstant::GID_FIELD] = gid;
496 if (setAllNormal) {
497 for (auto &[key, asset] : assetsMap) {
498 normalAssets[key] = std::move(asset);
499 }
500 } else {
501 SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
502 }
503 if (!downloadItem.recordConflict) {
504 errCode = FillCloudAssets(info, normalAssets, failedAssets);
505 if (errCode != E_OK) {
506 break;
507 }
508 }
509 LogInfo logInfo;
510 logInfo.cloudGid = gid;
511 // download must contain gid, just set the default value here.
512 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
513 logInfo.hashKey = downloadItem.hashKey;
514 logInfo.timestamp = downloadItem.timestamp;
515 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
516 if (failedAssets.size() > 1) {
517 logInfo.timestamp = 0u;
518 }
519
520 errCode = storageProxy_->UpdateRecordFlag(
521 info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
522 if (errCode != E_OK) {
523 break;
524 }
525 successCount++;
526 }
527 int ret = storageProxy_->SetLogTriggerStatus(true);
528 return errCode == E_OK ? ret : errCode;
529 }
530
FillCloudAssetsForOneRecord(const std::string & gid,const std::map<std::string,Assets> & assetsMap,InnerProcessInfo & info,bool setAllNormal,bool & isExistAssetDownloadFail)531 int CloudSyncer::FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
532 InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail)
533 {
534 VBucket normalAssets;
535 VBucket failedAssets;
536 normalAssets[CloudDbConstant::GID_FIELD] = gid;
537 failedAssets[CloudDbConstant::GID_FIELD] = gid;
538 if (setAllNormal) {
539 for (auto &[key, asset] : assetsMap) {
540 normalAssets[key] = std::move(asset);
541 }
542 } else {
543 SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
544 }
545 isExistAssetDownloadFail = failedAssets.size() > 1; // There is initially one element present
546 return FillCloudAssets(info, normalAssets, failedAssets);
547 }
548
UpdateRecordFlagForOneRecord(const std::string & gid,const DownloadItem & downloadItem,InnerProcessInfo & info,bool isExistAssetDownloadFail)549 int CloudSyncer::UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem,
550 InnerProcessInfo &info, bool isExistAssetDownloadFail)
551 {
552 LogInfo logInfo;
553 logInfo.cloudGid = gid;
554 // download must contain gid, just set the default value here.
555 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
556 logInfo.hashKey = downloadItem.hashKey;
557 logInfo.timestamp = downloadItem.timestamp;
558 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
559 if (isExistAssetDownloadFail) {
560 logInfo.timestamp = 0u;
561 }
562 return storageProxy_->UpdateRecordFlag(info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
563 }
564
CommitDownloadAssetsForAsyncDownload(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)565 int CloudSyncer::CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
566 DownloadCommitList &commitList, uint32_t &successCount)
567 {
568 int errCode = storageProxy_->SetLogTriggerStatus(false, true);
569 if (errCode != E_OK) {
570 return errCode;
571 }
572 for (auto &item : commitList) {
573 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
574 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
575 std::map<std::string, Assets> assetsMap = std::get<1>(item);
576 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
577 LockStatus status = LockStatus::BUTT;
578 errCode = storageProxy_->GetLockStatusByGid(info.tableName, gid, status);
579 if (errCode == E_OK && status != LockStatus::UNLOCK) {
580 continue;
581 }
582
583 bool isExistAssetDownloadFail = false;
584 if (!downloadItem.recordConflict) {
585 errCode = FillCloudAssetsForOneRecord(gid, assetsMap, info, setAllNormal, isExistAssetDownloadFail);
586 if (errCode != E_OK) {
587 break;
588 }
589 }
590
591 errCode = UpdateRecordFlagForOneRecord(gid, downloadItem, info, isExistAssetDownloadFail);
592 if (errCode != E_OK) {
593 break;
594 }
595 successCount++;
596 }
597 int ret = storageProxy_->SetLogTriggerStatus(true, true);
598 return errCode == E_OK ? ret : errCode;
599 }
600
GenerateCompensatedSync(CloudTaskInfo & taskInfo)601 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
602 {
603 std::vector<QuerySyncObject> syncQuery;
604 std::vector<std::string> users;
605 int errCode =
606 CloudSyncUtils::GetQueryAndUsersForCompensatedSync(CanStartAsyncDownload(), storageProxy_, users, syncQuery);
607 if (errCode != E_OK) {
608 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
609 return;
610 }
611 if (syncQuery.empty()) {
612 LOGD("[CloudSyncer] Not need generate compensated sync");
613 return;
614 }
615 for (const auto &it : syncQuery) {
616 CloudTaskInfo compensatedTaskInfo = taskInfo;
617 compensatedTaskInfo.queryList.push_back(it);
618 Sync(compensatedTaskInfo);
619 taskInfo.callback = nullptr;
620 LOGI("[CloudSyncer] Generate compensated sync finished");
621 }
622 }
623
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)624 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
625 {
626 if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
627 return;
628 }
629 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
630 if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
631 return;
632 }
633 if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
634 return;
635 }
636 info.tableStatus = ProcessStatus::FINISHED;
637 info.upLoadInfo.batchIndex++;
638 NotifyInBatchUpload(uploadParam, info, true);
639 }
640
SaveCursorIfNeed(const std::string & tableName)641 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
642 {
643 std::string cursor = "";
644 int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
645 if (errCode != E_OK) {
646 LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
647 return errCode;
648 }
649 if (!cursor.empty()) {
650 return E_OK;
651 }
652 auto res = cloudDB_.GetEmptyCursor(tableName);
653 if (res.first != E_OK) {
654 LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
655 return res.first;
656 }
657 if (res.second.empty()) {
658 LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
659 return -E_CLOUD_ERROR;
660 }
661 errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
662 if (errCode != E_OK) {
663 LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
664 }
665 return errCode;
666 }
667
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)668 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
669 {
670 std::string hashDev;
671 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
672 if (errCode != E_OK) {
673 LOGE("[CloudSyncer] Failed to get local identity.");
674 return errCode;
675 }
676 errCode = SaveCursorIfNeed(table);
677 if (errCode != E_OK) {
678 return errCode;
679 }
680 errCode = CheckTaskIdValid(taskInfo.taskId);
681 if (errCode != E_OK) {
682 LOGW("[CloudSyncer] task is invalid, abort sync");
683 return errCode;
684 }
685 errCode = DoDownload(taskInfo.taskId, isFirstDownload);
686 if (errCode != E_OK) {
687 LOGE("[CloudSyncer] download failed %d", errCode);
688 }
689 return errCode;
690 }
691
IsClosed() const692 bool CloudSyncer::IsClosed() const
693 {
694 return closed_ || IsKilled();
695 }
696
UpdateFlagForSavedRecord(const SyncParam & param)697 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam ¶m)
698 {
699 DownloadList downloadList;
700 {
701 std::lock_guard<std::mutex> autoLock(dataLock_);
702 downloadList = currentContext_.assetDownloadList;
703 }
704 std::set<std::string> downloadGid;
705 std::set<std::string> consistentGid;
706 for (const auto &tuple : downloadList) {
707 if (CloudSyncUtils::IsContainDownloading(tuple)) {
708 downloadGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
709 }
710 consistentGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
711 }
712 if (IsCurrentAsyncDownloadTask()) {
713 int errCode = storageProxy_->MarkFlagAsAssetAsyncDownload(param.tableName, param.downloadData, downloadGid);
714 if (errCode != E_OK) {
715 LOGE("[CloudSyncer] Failed to mark flag consistent errCode %d", errCode);
716 return errCode;
717 }
718 }
719 return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, consistentGid);
720 }
721
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)722 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
723 {
724 uint32_t retryCount = 0;
725 int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
726 uploadData.delData.extend, deleteInfo, retryCount);
727 innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
728 innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
729 innerProcessInfo.upLoadInfo.failCount += deleteInfo.failCount;
730 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
731 ProcessVersionConflictInfo(innerProcessInfo, retryCount);
732 }
733 if (errCode != E_OK) {
734 LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
735 storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
736 return errCode;
737 }
738 errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
739 if (errCode != E_OK) {
740 LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
741 }
742 return errCode;
743 }
744
IsCompensatedTask(TaskId taskId)745 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
746 {
747 std::lock_guard<std::mutex> autoLock(dataLock_);
748 return cloudTaskInfos_[taskId].compensatedTask;
749 }
750
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)751 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
752 {
753 return cloudDB_.SetCloudDB(cloudDBs);
754 }
755
CleanAllWaterMark()756 void CloudSyncer::CleanAllWaterMark()
757 {
758 storageProxy_->CleanAllWaterMark();
759 }
760
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)761 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
762 {
763 downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
764 downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
765 downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
766 downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
767 downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
768 downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
769 downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
770 }
771
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)772 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
773 const bool isFirstDownload)
774 {
775 bool isNeedNotify = false;
776 {
777 std::lock_guard<std::mutex> autoLock(dataLock_);
778 // only when the first download and the task no need upload actually, notify the process, otherwise,
779 // the process will notify in the upload procedure, which can guarantee the notify order of the tables
780 isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
781 }
782 if (!isNeedNotify) {
783 return;
784 }
785 for (size_t i = 0; i < needNotifyTables.size(); ++i) {
786 UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
787 }
788 }
789
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)790 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
791 {
792 std::string tableName;
793 int ret = GetCurrentTableName(tableName);
794 if (ret != E_OK) {
795 LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
796 return ret;
797 }
798
799 ret = storageProxy_->StartTransaction();
800 if (ret != E_OK) {
801 LOGE("[CloudSyncer] start transaction failed before getting upload count.");
802 return ret;
803 }
804
805 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
806 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
807 if (ret != E_OK) {
808 // GetUploadCount will return E_OK when upload count is zero.
809 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
810 }
811 // No need Rollback when GetUploadCount failed
812 storageProxy_->Commit();
813 return ret;
814 }
815
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)816 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
817 bool needNotify)
818 {
819 LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
820 InnerProcessInfo innerProcessInfo;
821 innerProcessInfo.tableName = tableName;
822 innerProcessInfo.upLoadInfo.total = 0; // count is zero
823 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
824 {
825 std::lock_guard<std::mutex> autoLock(dataLock_);
826 if (!needNotify) {
827 currentContext_.notifier->UpdateProcess(innerProcessInfo);
828 } else {
829 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
830 }
831 }
832 }
833
SetNeedUpload(bool isNeedUpload)834 void CloudSyncer::SetNeedUpload(bool isNeedUpload)
835 {
836 std::lock_guard<std::mutex> autoLock(dataLock_);
837 currentContext_.isNeedUpload = isNeedUpload;
838 }
839
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)840 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
841 {
842 std::vector<std::string> needNotifyTables;
843 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
844 std::string table;
845 {
846 std::lock_guard<std::mutex> autoLock(dataLock_);
847 if (currentContext_.processRecorder == nullptr) {
848 LOGE("[CloudSyncer] process recorder of current context is nullptr.");
849 return -E_INTERNAL_ERROR;
850 }
851 if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
852 taskInfo.table[i])) {
853 continue;
854 }
855 LOGD("[CloudSyncer] try download table, index: %zu, table name: %s, length: %u",
856 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
857 currentContext_.tableName = taskInfo.table[i];
858 table = currentContext_.tableName;
859 }
860 int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
861 if (errCode != E_OK) {
862 return errCode;
863 }
864 CheckDataAfterDownload(table);
865 MarkDownloadFinishIfNeed(table);
866 // needUpload indicate that the syncMode need push
867 if (needUpload) {
868 int64_t count = 0;
869 errCode = GetUploadCountByTable(taskInfo.taskId, count);
870 if (errCode != E_OK) {
871 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
872 return errCode;
873 }
874 // count > 0 means current table need upload actually
875 if (count > 0) {
876 SetNeedUpload(true);
877 continue;
878 }
879 needNotifyTables.emplace_back(table);
880 }
881 errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
882 if (errCode != E_OK) {
883 LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
884 return errCode;
885 }
886 }
887 DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
888 TriggerAsyncDownloadAssetsInTaskIfNeed(isFirstDownload);
889 return E_OK;
890 }
891
IsNeedGetLocalWater(TaskId taskId)892 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
893 {
894 return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
895 !IsCompensatedTask(taskId);
896 }
897
TryToAddSyncTask(CloudTaskInfo && taskInfo)898 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
899 {
900 if (closed_) {
901 LOGW("[CloudSyncer] syncer is closed, should not sync now");
902 return -E_DB_CLOSED;
903 }
904 std::shared_ptr<DataBaseSchema> cloudSchema;
905 int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
906 if (errCode != E_OK) {
907 LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
908 return errCode;
909 }
910 std::lock_guard<std::mutex> autoLock(dataLock_);
911 taskInfo.priorityLevel = (!taskInfo.priorityTask || taskInfo.compensatedTask)
912 ? CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL
913 : taskInfo.priorityLevel;
914 errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
915 if (errCode != E_OK) {
916 return errCode;
917 }
918 errCode = GenerateTaskIdIfNeed(taskInfo);
919 if (errCode != E_OK) {
920 return errCode;
921 }
922 auto taskId = taskInfo.taskId;
923 cloudTaskInfos_[taskId] = std::move(taskInfo);
924 taskQueue_.insert({cloudTaskInfos_[taskId].priorityLevel, taskId});
925 LOGI("[CloudSyncer]Add task ok, storeId %.3s, priority %d, priorityLevel %" PRId32 ", taskId %" PRIu64 " async %d",
926 cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].priorityTask,
927 cloudTaskInfos_[taskId].priorityLevel, cloudTaskInfos_[taskId].taskId,
928 static_cast<int>(cloudTaskInfos_[taskId].asyncDownloadAssets));
929 MarkCurrentTaskPausedIfNeed(taskInfo);
930 if (!cloudTaskInfos_[taskId].priorityTask) {
931 MergeTaskInfo(cloudSchema, taskId);
932 }
933 return E_OK;
934 }
935
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)936 void CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
937 {
938 if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
939 return;
940 }
941 bool isMerge = false;
942 TaskId checkTaskId = taskId;
943 do {
944 std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
945 } while (isMerge);
946 }
947
RemoveTaskFromQueue(int32_t priorityLevel,TaskId taskId)948 void CloudSyncer::RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId)
949 {
950 for (auto it = taskQueue_.find(priorityLevel); it != taskQueue_.end(); ++it) {
951 if (it->second == taskId) {
952 taskQueue_.erase(it);
953 return;
954 }
955 }
956 }
957
// Tries to merge the task tryTaskId with one task queued at COMMON_TASK_PRIORITY_LEVEL.
// Two tasks are merged when IsTasksCanMerge accepts them and the tables of one are all contained in
// the tables of the other; the task whose tables are contained is the one that gets merged away.
// The merged-away task is finished with -E_CLOUD_SYNC_TASK_MERGED, its process is notified, and it is
// removed from both the queue and cloudTaskInfos_.
// Returns {true, id of the surviving task} when a merge happened, otherwise a default-constructed pair
// (first == false).
// NOTE(review): this reads and mutates cloudTaskInfos_/taskQueue_ without locking; the call chain visible
// in this file (TryToAddSyncTask -> MergeTaskInfo) holds dataLock_ - confirm for other callers.
std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
{
    std::pair<bool, TaskId> res;
    // merge / nextTryTask alias res.first / res.second, so assigning them fills in the return value.
    auto &[merge, nextTryTask] = res;
    TaskId beMergeTask = INVALID_TASK_ID;
    TaskId runningTask = currentContext_.currentTaskId;
    auto commonLevelTask = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
    for (auto it = commonLevelTask.first; it != commonLevelTask.second; ++it) {
        TaskId taskId = it->second;
        // Never merge with the running task or with the task itself.
        if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
            continue;
        }
        if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
            continue;
        }
        // Tables of taskId are all contained in tryTaskId: taskId is merged into tryTaskId.
        if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
            beMergeTask = taskId;
            nextTryTask = tryTaskId;
            merge = true;
            break;
        }
        // Otherwise try the opposite direction: tryTaskId is merged into taskId.
        if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
            beMergeTask = tryTaskId;
            nextTryTask = taskId;
            merge = true;
            break;
        }
    }
    if (!merge) { // LCOV_EXCL_BR_LINE
        return res;
    }
    // Keep the task with the larger id: copy the winner's tables/queries/users into it and swap the roles.
    if (beMergeTask > nextTryTask) { // LCOV_EXCL_BR_LINE
        std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
    }
    AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
    // Finish the merged-away task and notify its process before dropping it.
    auto processNotifier = std::make_shared<ProcessNotifier>(this);
    processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
        cloudTaskInfos_[beMergeTask].users);
    cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
    cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
    processNotifier->SetAllTableFinish();
    processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
    RemoveTaskFromQueue(cloudTaskInfos_[beMergeTask].priorityLevel, beMergeTask);
    cloudTaskInfos_.erase(beMergeTask);
    LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
    return res;
}
1005
IsTaskCanMerge(const CloudTaskInfo & taskInfo)1006 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
1007 {
1008 return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
1009 taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
1010 }
1011
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)1012 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
1013 {
1014 const auto &taskInfo = cloudTaskInfos_[taskId];
1015 const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
1016 return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
1017 taskInfo.devices == tryMergeTaskInfo.devices &&
1018 taskInfo.asyncDownloadAssets == tryMergeTaskInfo.asyncDownloadAssets;
1019 }
1020
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)1021 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
1022 {
1023 const auto &source = cloudTaskInfos_[sourceId];
1024 const auto &target = cloudTaskInfos_[targetId];
1025 bool isMerge = true;
1026 for (const auto &table : source.table) {
1027 if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
1028 isMerge = false;
1029 break;
1030 }
1031 }
1032 return isMerge;
1033 }
1034
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)1035 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
1036 CloudTaskInfo &taskInfo)
1037 {
1038 std::vector<std::string> tmpTables = taskInfo.table;
1039 taskInfo.table.clear();
1040 taskInfo.queryList.clear();
1041 for (const auto &table : cloudSchema->tables) {
1042 if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
1043 taskInfo.table.push_back(table.name);
1044 QuerySyncObject querySyncObject;
1045 querySyncObject.SetTableName(table.name);
1046 taskInfo.queryList.push_back(querySyncObject);
1047 }
1048 }
1049 }
1050
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)1051 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
1052 {
1053 cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
1054 cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
1055 std::set<std::string> users;
1056 users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
1057 users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
1058 cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
1059 return {target, source};
1060 }
1061
IsQueryListEmpty(TaskId taskId)1062 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
1063 {
1064 std::lock_guard<std::mutex> autoLock(dataLock_);
1065 return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
1066 [](const auto &item) {
1067 return item.IsContainQueryNodes();
1068 });
1069 }
1070
IsNeedLock(const UploadParam & param)1071 bool CloudSyncer::IsNeedLock(const UploadParam ¶m)
1072 {
1073 return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
1074 }
1075
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)1076 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
1077 {
1078 std::pair<int, Timestamp> res = { E_OK, 0u };
1079 if (IsNeedGetLocalWater(uploadParam.taskId)) {
1080 res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
1081 }
1082 uploadParam.localMark = res.second;
1083 return res;
1084 }
1085
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken,std::vector<ReviseModTimeInfo> & revisedData)1086 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
1087 CloudSyncData &uploadData, ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData)
1088 {
1089 int ret = E_OK;
1090 uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
1091 bool isLocked = false;
1092 while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
1093 revisedData.insert(revisedData.end(), uploadData.revisedData.begin(), uploadData.revisedData.end());
1094 ret = PreProcessBatchUpload(uploadParam, info, uploadData);
1095 if (ret != E_OK) {
1096 break;
1097 }
1098 info.upLoadInfo.batchIndex = ++batchIndex;
1099 if (IsNeedLock(uploadParam) && !isLocked) {
1100 ret = LockCloudIfNeed(uploadParam.taskId);
1101 if (ret != E_OK) {
1102 break;
1103 }
1104 isLocked = true;
1105 }
1106 ret = DoBatchUpload(uploadData, uploadParam, info);
1107 if (ret != E_OK) {
1108 break;
1109 }
1110 uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
1111 if (continueStmtToken == nullptr) {
1112 break;
1113 }
1114 SetUploadDataFlag(uploadParam.taskId, uploadData);
1115 LOGI("[CloudSyncer] Write local water after upload one batch, table[%s length[%u]], water[%llu]",
1116 DBCommon::StringMiddleMasking(uploadData.tableName).c_str(), uploadData.tableName.length(),
1117 uploadParam.localMark);
1118 RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
1119 ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1120 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1121 LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1122 break;
1123 }
1124 ChkIgnoredProcess(info, uploadData, uploadParam);
1125 }
1126 if ((ret != E_OK) && (ret != -E_UNFINISHED) && (ret != -E_TASK_PAUSED)) {
1127 NotifyUploadFailed(ret, info);
1128 }
1129 if (isLocked && IsNeedLock(uploadParam)) {
1130 UnlockIfNeed();
1131 }
1132 return ret;
1133 }
1134
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1135 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1136 {
1137 InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
1138 static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
1139 int errCode = E_OK;
1140 for (const auto &waterType: waterTypes) {
1141 uploadParam.mode = waterType;
1142 errCode = DoUploadByMode(tableName, uploadParam, info);
1143 if (errCode != E_OK) {
1144 break;
1145 }
1146 }
1147 int ret = E_OK;
1148 if (info.upLoadInfo.successCount > 0) {
1149 ret = UploadVersionRecordIfNeed(uploadParam);
1150 }
1151 return errCode != E_OK ? errCode : ret;
1152 }
1153
UploadVersionRecordIfNeed(const UploadParam & uploadParam)1154 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
1155 {
1156 if (uploadParam.count == 0) {
1157 // no record upload
1158 return E_OK;
1159 }
1160 if (!cloudDB_.IsExistCloudVersionCallback()) {
1161 return E_OK;
1162 }
1163 auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
1164 if (errCode != E_OK) {
1165 return errCode;
1166 }
1167 bool isInsert = !uploadData.insData.record.empty();
1168 CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
1169 if (batchData.record.empty()) {
1170 LOGE("[CloudSyncer] Get invalid cloud version record");
1171 return -E_INTERNAL_ERROR;
1172 }
1173 std::string oriVersion;
1174 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1175 std::string newVersion;
1176 std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1177 if (errCode != E_OK) {
1178 LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1179 return errCode;
1180 }
1181 batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1182 InnerProcessInfo processInfo;
1183 Info info;
1184 std::vector<VBucket> copyRecord = batchData.record;
1185 WaterMark waterMark;
1186 CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1187 errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1188 batchData.record = copyRecord;
1189 CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1190 auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1191 return errCode != E_OK ? errCode : ret;
1192 }
1193
TagUploadAssets(CloudSyncData & uploadData)1194 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1195 {
1196 if (!IsDataContainAssets()) {
1197 return;
1198 }
1199 std::vector<Field> assetFields;
1200 {
1201 std::lock_guard<std::mutex> autoLock(dataLock_);
1202 assetFields = currentContext_.assetFields[currentContext_.tableName];
1203 }
1204
1205 for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1206 for (const Field &assetField : assetFields) {
1207 (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1208 }
1209 }
1210 for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1211 for (const Field &assetField : assetFields) {
1212 (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1213 }
1214 }
1215 }
1216
IsLockInDownload()1217 bool CloudSyncer::IsLockInDownload()
1218 {
1219 std::lock_guard<std::mutex> autoLock(dataLock_);
1220 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1221 return false;
1222 }
1223 auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1224 return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1225 }
1226
SetCurrentTaskFailedInMachine(int errCode)1227 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1228 {
1229 std::lock_guard<std::mutex> autoLock(dataLock_);
1230 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1231 return CloudSyncEvent::ERROR_EVENT;
1232 }
1233
InitCloudSyncStateMachine()1234 void CloudSyncer::InitCloudSyncStateMachine()
1235 {
1236 CloudSyncStateMachine::Initialize();
1237 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1238 return SyncMachineDoDownload();
1239 });
1240 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1241 return SyncMachineDoUpload();
1242 });
1243 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1244 return SyncMachineDoFinished();
1245 });
1246 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1247 return SyncMachineDoRepeatCheck();
1248 });
1249 }
1250
SyncMachineDoRepeatCheck()1251 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1252 {
1253 auto config = storageProxy_->GetCloudSyncConfig();
1254 {
1255 std::lock_guard<std::mutex> autoLock(dataLock_);
1256 if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1257 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1258 }
1259 currentContext_.repeatCount++;
1260 if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1261 LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1262 config.maxRetryConflictTimes);
1263 SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1264 return CloudSyncEvent::ERROR_EVENT;
1265 }
1266 LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1267 currentContext_.repeatCount);
1268 }
1269 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1270 }
1271
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1272 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1273 {
1274 // table exist reference should download every times
1275 if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1276 return;
1277 }
1278 std::lock_guard<std::mutex> autoLock(dataLock_);
1279 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1280 }
1281
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1282 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1283 {
1284 CloudSyncData uploadData(tableName, uploadParam.mode);
1285 SetUploadDataFlag(uploadParam.taskId, uploadData);
1286 auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1287 LOGI("[CloudSyncer] Get local water before upload result: %d, table[%s length[%u]], water[%llu]", err,
1288 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), localWater);
1289 if (err != E_OK) {
1290 return err;
1291 }
1292 ContinueToken continueStmtToken = nullptr;
1293 int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1294 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1295 LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1296 return ret;
1297 }
1298 uploadParam.count -= uploadData.ignoredCount;
1299 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1300 std::vector<ReviseModTimeInfo> revisedData;
1301 ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken, revisedData);
1302 if (ret != -E_TASK_PAUSED) {
1303 // reset watermark to zero when task no paused
1304 RecordWaterMark(uploadParam.taskId, 0u);
1305 }
1306 if (continueStmtToken != nullptr) {
1307 storageProxy_->ReleaseContinueToken(continueStmtToken);
1308 }
1309 if (!revisedData.empty()) {
1310 int errCode = storageProxy_->ReviseLocalModTime(tableName, revisedData);
1311 if (errCode != E_OK) {
1312 LOGE("[CloudSyncer] Failed to revise local modify time: %d, table: %s",
1313 errCode, DBCommon::StringMiddleMasking(tableName).c_str());
1314 }
1315 }
1316 return ret;
1317 }
1318
IsTableFinishInUpload(const std::string & table)1319 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1320 {
1321 std::lock_guard<std::mutex> autoLock(dataLock_);
1322 return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1323 }
1324
MarkUploadFinishIfNeed(const std::string & table)1325 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1326 {
1327 // table exist reference or reference by should upload every times
1328 if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1329 return;
1330 }
1331 std::lock_guard<std::mutex> autoLock(dataLock_);
1332 currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1333 }
1334
GetCloudTaskStatus(uint64_t taskId) const1335 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1336 {
1337 std::lock_guard<std::mutex> autoLock(dataLock_);
1338 auto iter = cloudTaskInfos_.find(taskId);
1339 SyncProcess syncProcess;
1340 if (iter == cloudTaskInfos_.end()) {
1341 syncProcess.process = ProcessStatus::FINISHED;
1342 syncProcess.errCode = NOT_FOUND;
1343 LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1344 return syncProcess;
1345 }
1346 syncProcess.process = iter->second.status;
1347 syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1348 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1349 if (currentContext_.currentTaskId == taskId) {
1350 notifier = currentContext_.notifier;
1351 }
1352 bool hasNotifier = notifier != nullptr;
1353 if (hasNotifier) {
1354 syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1355 }
1356 LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1357 iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1358 return syncProcess;
1359 }
1360
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1361 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1362 {
1363 if (taskInfo.taskId != INVALID_TASK_ID) {
1364 if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1365 LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1366 taskInfo.storeId.c_str());
1367 return -E_INVALID_ARGS;
1368 }
1369 LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1370 return E_OK;
1371 }
1372 lastTaskId_--;
1373 if (lastTaskId_ == INVALID_TASK_ID) {
1374 lastTaskId_ = UINT64_MAX;
1375 }
1376 taskInfo.taskId = lastTaskId_;
1377 return E_OK;
1378 }
1379
ProcessVersionConflictInfo(InnerProcessInfo & innerProcessInfo,uint32_t retryCount)1380 void CloudSyncer::ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount)
1381 {
1382 innerProcessInfo.retryInfo.uploadBatchRetryCount = retryCount;
1383 CloudSyncConfig config = storageProxy_->GetCloudSyncConfig();
1384 {
1385 std::lock_guard<std::mutex> autoLock(dataLock_);
1386 if (config.maxRetryConflictTimes >= 0 &&
1387 currentContext_.repeatCount + 1 > config.maxRetryConflictTimes) {
1388 innerProcessInfo.upLoadInfo.failCount =
1389 innerProcessInfo.upLoadInfo.total - innerProcessInfo.upLoadInfo.successCount;
1390 }
1391 }
1392 }
1393
GetStoreIdByTask(TaskId taskId)1394 std::string CloudSyncer::GetStoreIdByTask(TaskId taskId)
1395 {
1396 std::lock_guard<std::mutex> autoLock(dataLock_);
1397 return cloudTaskInfos_[taskId].storeId;
1398 }
1399
CleanKvCloudData(std::function<int (void)> & removeFunc)1400 int CloudSyncer::CleanKvCloudData(std::function<int(void)> &removeFunc)
1401 {
1402 hasKvRemoveTask = true;
1403 CloudSyncer::TaskId currentTask;
1404 {
1405 // stop task if exist
1406 std::lock_guard<std::mutex> autoLock(dataLock_);
1407 currentTask = currentContext_.currentTaskId;
1408 }
1409 if (currentTask != INVALID_TASK_ID) {
1410 StopAllTasks(-E_CLOUD_ERROR);
1411 }
1412 int errCode = E_OK;
1413 {
1414 std::lock_guard<std::mutex> lock(syncMutex_);
1415 errCode = removeFunc();
1416 hasKvRemoveTask = false;
1417 }
1418 if (errCode != E_OK) {
1419 LOGE("[CloudSyncer] removeFunc execute failed errCode: %d.", errCode);
1420 }
1421 return errCode;
1422 }
1423
StopAllTasks(int errCode)1424 void CloudSyncer::StopAllTasks(int errCode)
1425 {
1426 CloudSyncer::TaskId currentTask;
1427 {
1428 std::lock_guard<std::mutex> autoLock(dataLock_);
1429 currentTask = currentContext_.currentTaskId;
1430 }
1431 // mark current task user_change
1432 SetTaskFailed(currentTask, errCode);
1433 UnlockIfNeed();
1434 WaitCurTaskFinished();
1435
1436 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
1437 for (auto &info: infoList) {
1438 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with errCode %d, isPriority %d.", info.taskId, errCode,
1439 info.priorityTask);
1440 auto processNotifier = std::make_shared<ProcessNotifier>(this);
1441 processNotifier->Init(info.table, info.devices, info.users);
1442 info.errCode = errCode;
1443 info.status = ProcessStatus::FINISHED;
1444 processNotifier->NotifyProcess(info, {}, true);
1445 }
1446 }
1447
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)1448 int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
1449 {
1450 OpType strategyOpResult = OpType::NOT_HANDLE;
1451 int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
1452 if (errCode != E_OK) {
1453 return errCode;
1454 }
1455 param.downloadData.opType[idx] = strategyOpResult;
1456 if (!IsDataContainAssets()) {
1457 return E_OK;
1458 }
1459 Key hashKey;
1460 if (isExist) {
1461 hashKey = dataInfo.localInfo.logInfo.hashKey;
1462 }
1463 if (param.isAssetsOnly) {
1464 return strategyOpResult == OpType::LOCKED_NOT_HANDLE ?
1465 E_OK : TagDownloadAssetsForAssetOnly(hashKey, idx, param, dataInfo, localAssetInfo);
1466 }
1467 return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
1468 }
1469
CloudDbBatchDownloadAssets(TaskId taskId,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets)1470 int CloudSyncer::CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList,
1471 const std::set<Key> &dupHashKeySet, InnerProcessInfo &info, ChangedData &changedAssets)
1472 {
1473 int errorCode = CheckTaskIdValid(taskId);
1474 if (errorCode != E_OK) {
1475 return errorCode;
1476 }
1477 bool isSharedTable = false;
1478 errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1479 if (errorCode != E_OK) {
1480 LOGE("[CloudSyncer] check is shared table failed %d", errorCode);
1481 return errorCode;
1482 }
1483 // prepare download data
1484 auto [downloadRecord, removeAssets, downloadAssets] =
1485 GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, IsAsyncDownloadAssets(taskId), info);
1486 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1487 std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1488 };
1489 return BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1490 }
1491
FillDownloadItem(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,const InnerProcessInfo & info,bool isSharedTable,DownloadItems & record)1492 void CloudSyncer::FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1493 const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record)
1494 {
1495 CloudStorageUtils::EraseNoChangeAsset(record.downloadItem.assets);
1496 if (record.downloadItem.assets.empty()) { // Download data (include deleting)
1497 return;
1498 }
1499 if (isSharedTable) {
1500 // share table will not download asset, need to reset the status
1501 for (auto &entry: record.downloadItem.assets) {
1502 for (auto &asset: entry.second) {
1503 asset.status = AssetStatus::NORMAL;
1504 }
1505 }
1506 return;
1507 }
1508 int errCode = E_OK;
1509 VBucket dbAssets;
1510 if (!IsNeedSkipDownload(isSharedTable, errCode, info, record.downloadItem, dbAssets)) {
1511 GetAssetsToRemove(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToRemove);
1512 GetAssetsToDownload(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToDownload, record.flags);
1513 }
1514 }
1515
GetDownloadRecords(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,bool isSharedTable,bool isAsyncDownloadAssets,const InnerProcessInfo & info)1516 CloudSyncer::DownloadAssetDetail CloudSyncer::GetDownloadRecords(const DownloadList &downloadList,
1517 const std::set<Key> &dupHashKeySet, bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info)
1518 {
1519 DownloadItemRecords downloadRecord;
1520 RemoveAssetsRecords removeAssets;
1521 DownloadAssetsRecords downloadAssets;
1522 for (size_t i = 0; i < downloadList.size(); i++) {
1523 DownloadItems record;
1524 GetDownloadItem(downloadList, i, record.downloadItem);
1525 FillDownloadItem(dupHashKeySet, downloadList, info, isSharedTable, record);
1526
1527 IAssetLoader::AssetRecord removeAsset = {
1528 record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToRemove)
1529 };
1530 removeAssets.push_back(std::move(removeAsset));
1531 if (isAsyncDownloadAssets) {
1532 record.assetsToDownload.clear();
1533 }
1534 IAssetLoader::AssetRecord downloadAsset = {
1535 record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToDownload)
1536 };
1537 downloadAssets.push_back(std::move(downloadAsset));
1538 downloadRecord.push_back(std::move(record));
1539 }
1540 return {downloadRecord, removeAssets, downloadAssets};
1541 }
1542
BatchDownloadAndCommitRes(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets,std::tuple<DownloadItemRecords,RemoveAssetsRecords,DownloadAssetsRecords,bool> & downloadDetail)1543 int CloudSyncer::BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
1544 InnerProcessInfo &info, ChangedData &changedAssets,
1545 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail)
1546 {
1547 auto &[downloadRecord, removeAssets, downloadAssets, isSharedTable] = downloadDetail;
1548 // download and remove in batch
1549 auto deleteRes = cloudDB_.BatchRemoveLocalAssets(info.tableName, removeAssets);
1550 auto downloadRes = cloudDB_.BatchDownload(info.tableName, downloadAssets);
1551 if (deleteRes == -E_NOT_SET || downloadRes == -E_NOT_SET) {
1552 return -E_NOT_SET;
1553 }
1554 int errorCode = E_OK;
1555 int index = 0;
1556 for (auto &item : downloadRecord) {
1557 auto deleteCode = removeAssets[index].status == OK ? E_OK : -E_REMOVE_ASSETS_FAILED;
1558 auto downloadCode = CloudDBProxy::GetInnerErrorCode(downloadAssets[index].status);
1559 downloadCode = downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT ? E_OK : downloadCode;
1560 if (!isSharedTable) {
1561 item.downloadItem.assets = BackFillAssetsAfterDownload(downloadCode, deleteCode, item.flags,
1562 downloadAssets[index].assets, removeAssets[index].assets);
1563 }
1564 StatisticDownloadRes(downloadAssets[index], removeAssets[index], info, item.downloadItem);
1565 AddNotifyDataFromDownloadAssets(dupHashKeySet, item.downloadItem, changedAssets);
1566 if (item.downloadItem.strategy == OpType::DELETE) {
1567 item.downloadItem.assets = {};
1568 item.downloadItem.gid = "";
1569 }
1570 // commit download res
1571 DownloadCommitList commitList;
1572 // Process result of each asset
1573 downloadCode = downloadAssets[index].status == SKIP_ASSET ? E_OK : downloadCode;
1574 commitList.push_back(std::make_tuple(item.downloadItem.gid, std::move(item.downloadItem.assets),
1575 deleteCode == E_OK && downloadCode == E_OK));
1576 errorCode = (errorCode != E_OK) ? errorCode : deleteCode;
1577 errorCode = (errorCode != E_OK) ? errorCode : downloadCode;
1578 int currErrorCode = (deleteCode != E_OK) ? deleteCode : downloadCode;
1579 int ret = CommitDownloadResult(item.downloadItem, info, commitList, currErrorCode);
1580 if (ret != E_OK) {
1581 errorCode = errorCode == E_OK ? ret : errorCode;
1582 }
1583 index++;
1584 }
1585 storageProxy_->PrintCursorChange(info.tableName);
1586 return errorCode;
1587 }
1588
StatisticDownloadRes(const IAssetLoader::AssetRecord & downloadRecord,const IAssetLoader::AssetRecord & removeRecord,InnerProcessInfo & info,DownloadItem & downloadItem)1589 void CloudSyncer::StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
1590 const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem)
1591 {
1592 if ((downloadRecord.status == OK || downloadRecord.status == SKIP_ASSET) && (removeRecord.status == OK)) {
1593 return;
1594 }
1595 if ((downloadRecord.status == CLOUD_RECORD_EXIST_CONFLICT) ||
1596 (removeRecord.status == CLOUD_RECORD_EXIST_CONFLICT)) {
1597 downloadItem.recordConflict = true;
1598 return;
1599 }
1600 info.downLoadInfo.failCount += 1;
1601 if (info.downLoadInfo.successCount == 0) {
1602 LOGW("[CloudSyncer] Invalid successCount");
1603 } else {
1604 info.downLoadInfo.successCount -= 1;
1605 }
1606 }
1607
AddNotifyDataFromDownloadAssets(const std::set<Key> & dupHashKeySet,DownloadItem & downloadItem,ChangedData & changedAssets)1608 void CloudSyncer::AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
1609 ChangedData &changedAssets)
1610 {
1611 if (downloadItem.assets.empty()) {
1612 return;
1613 }
1614 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1615 if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
1616 LOGW("[CloudSyncer] [AddNotifyDataFromDownloadAssets] strategy is invalid.");
1617 } else {
1618 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1619 downloadItem.primaryKeyValList);
1620 }
1621 } else if (downloadItem.strategy == OpType::INSERT) {
1622 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1623 }
1624 }
1625
CheckDataAfterDownload(const std::string & tableName)1626 void CloudSyncer::CheckDataAfterDownload(const std::string &tableName)
1627 {
1628 int dataCount = 0;
1629 int logicDeleteDataCount = 0;
1630 int errCode = storageProxy_->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
1631 if (errCode == E_OK) {
1632 LOGI("[CloudSyncer] Check local data after download[%s[%u]], data count: %d, logic delete data count: %d",
1633 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), dataCount, logicDeleteDataCount);
1634 } else {
1635 LOGW("[CloudSyncer] Get local data after download fail: %d", errCode);
1636 }
1637 }
1638
WaitCurTaskFinished()1639 void CloudSyncer::WaitCurTaskFinished()
1640 {
1641 CancelBackgroundDownloadAssetsTask();
1642 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1643 if (currentContext_.currentTaskId != INVALID_TASK_ID) {
1644 LOGI("[CloudSyncer] begin wait current task %" PRIu64 " finished", currentContext_.currentTaskId);
1645 contextCv_.wait(uniqueLock, [this]() {
1646 return currentContext_.currentTaskId == INVALID_TASK_ID;
1647 });
1648 LOGI("[CloudSyncer] current task has been finished");
1649 }
1650 }
1651
IsAsyncDownloadAssets(TaskId taskId)1652 bool CloudSyncer::IsAsyncDownloadAssets(TaskId taskId)
1653 {
1654 std::lock_guard<std::mutex> autoLock(dataLock_);
1655 return cloudTaskInfos_[taskId].asyncDownloadAssets;
1656 }
1657
TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)1658 void CloudSyncer::TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)
1659 {
1660 {
1661 std::lock_guard<std::mutex> autoLock(dataLock_);
1662 if (!cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets) {
1663 return;
1664 }
1665 }
1666 if (isFirstDownload) {
1667 std::lock_guard<std::mutex> autoLock(dataLock_);
1668 if (currentContext_.isNeedUpload) {
1669 return;
1670 }
1671 }
1672 TriggerAsyncDownloadAssetsIfNeed();
1673 }
1674
TriggerAsyncDownloadAssetsIfNeed()1675 void CloudSyncer::TriggerAsyncDownloadAssetsIfNeed()
1676 {
1677 if (!storageProxy_->IsExistTableContainAssets()) {
1678 LOGD("[CloudSyncer] No exist table contain assets, skip async download asset check");
1679 return;
1680 }
1681 TaskId taskId = INVALID_TASK_ID;
1682 {
1683 std::lock_guard<std::mutex> autoLock(dataLock_);
1684 if (asyncTaskId_ != INVALID_TASK_ID || closed_) {
1685 LOGI("[CloudSyncer] No need generate async task now asyncTaskId %" PRIu64 " closed %d",
1686 static_cast<int>(asyncTaskId_), static_cast<int>(closed_));
1687 return;
1688 }
1689 lastTaskId_--;
1690 if (lastTaskId_ == INVALID_TASK_ID) {
1691 lastTaskId_ = UINT64_MAX;
1692 }
1693 asyncTaskId_ = lastTaskId_;
1694 taskId = asyncTaskId_;
1695 IncObjRef(this);
1696 }
1697 int errCode = RuntimeContext::GetInstance()->ScheduleTask([taskId, this]() {
1698 LOGI("[CloudSyncer] Exec asyncTaskId %" PRIu64 " begin", taskId);
1699 BackgroundDownloadAssetsTask();
1700 LOGI("[CloudSyncer] Exec asyncTaskId %" PRIu64 " finished", taskId);
1701 {
1702 std::lock_guard<std::mutex> autoLock(dataLock_);
1703 asyncTaskId_ = INVALID_TASK_ID;
1704 }
1705 asyncTaskCv_.notify_all();
1706 DecObjRef(this);
1707 });
1708 if (errCode == E_OK) {
1709 LOGI("[CloudSyncer] Schedule asyncTaskId %" PRIu64 " success", taskId);
1710 } else {
1711 LOGW("[CloudSyncer] Schedule BackgroundDownloadAssetsTask failed %d", errCode);
1712 DecObjRef(this);
1713 }
1714 }
1715
BackgroundDownloadAssetsTask()1716 void CloudSyncer::BackgroundDownloadAssetsTask()
1717 {
1718 // remove listener first
1719 CancelDownloadListener();
1720 // add download count and register listener if failed
1721 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
1722 IncObjRef(this);
1723 auto [errCode, listener] = manager->BeginDownloadWithListener([this](void *) {
1724 TriggerAsyncDownloadAssetsIfNeed();
1725 }, [this]() {
1726 DecObjRef(this);
1727 });
1728 if (errCode == E_OK) {
1729 // increase download count success
1730 CancelDownloadListener();
1731 DoBackgroundDownloadAssets();
1732 RuntimeContext::GetInstance()->GetAssetsDownloadManager()->FinishDownload();
1733 DecObjRef(this);
1734 return;
1735 }
1736 if (listener != nullptr) {
1737 std::lock_guard<std::mutex> autoLock(listenerMutex_);
1738 waitDownloadListener_ = listener;
1739 return;
1740 }
1741 LOGW("[CloudSyncer] BeginDownloadWithListener failed %d", errCode);
1742 DecObjRef(this);
1743 }
1744
CancelDownloadListener()1745 void CloudSyncer::CancelDownloadListener()
1746 {
1747 NotificationChain::Listener *waitDownloadListener = nullptr;
1748 {
1749 std::lock_guard<std::mutex> autoLock(listenerMutex_);
1750 if (waitDownloadListener_ != nullptr) {
1751 waitDownloadListener = waitDownloadListener_;
1752 waitDownloadListener_ = nullptr;
1753 }
1754 }
1755 if (waitDownloadListener != nullptr) {
1756 waitDownloadListener->Drop(true);
1757 waitDownloadListener = nullptr;
1758 }
1759 }
1760
DoBackgroundDownloadAssets()1761 void CloudSyncer::DoBackgroundDownloadAssets()
1762 {
1763 bool allDownloadFinish = true;
1764 std::map<std::string, int64_t> downloadBeginTime;
1765 do {
1766 auto [errCode, tables] = storageProxy_->GetDownloadAssetTable();
1767 if (errCode != E_OK) {
1768 LOGE("[CloudSyncer] Get download asset table failed %d", errCode);
1769 return;
1770 }
1771 allDownloadFinish = true;
1772 std::list<std::string> tableQueue(tables.begin(), tables.end());
1773 while (!tableQueue.empty()) {
1774 if (cancelAsyncTask_ || closed_) {
1775 LOGW("[CloudSyncer] exit task by cancel %d closed %d", static_cast<int>(cancelAsyncTask_),
1776 static_cast<int>(closed_));
1777 return;
1778 }
1779 errCode = BackgroundDownloadAssetsByTable(tableQueue.front(), downloadBeginTime);
1780 if (errCode == E_OK) {
1781 allDownloadFinish = false;
1782 } else if (errCode == -E_FINISHED) {
1783 tableQueue.pop_front();
1784 errCode = E_OK;
1785 } else {
1786 LOGW("[CloudSyncer] BackgroundDownloadAssetsByTable table %s failed %d",
1787 DBCommon::StringMiddleMasking(tableQueue.front()).c_str(), errCode);
1788 allDownloadFinish = false;
1789 }
1790 }
1791 } while (!allDownloadFinish);
1792 }
1793
CancelBackgroundDownloadAssetsTaskIfNeed()1794 void CloudSyncer::CancelBackgroundDownloadAssetsTaskIfNeed()
1795 {
1796 bool cancelDownload = true;
1797 {
1798 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1799 if (cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets || asyncTaskId_ == INVALID_TASK_ID) {
1800 return;
1801 }
1802 if (cloudTaskInfos_[currentContext_.currentTaskId].compensatedTask) {
1803 cancelDownload = false;
1804 }
1805 }
1806 CancelBackgroundDownloadAssetsTask(cancelDownload);
1807 }
1808
CancelBackgroundDownloadAssetsTask(bool cancelDownload)1809 void CloudSyncer::CancelBackgroundDownloadAssetsTask(bool cancelDownload)
1810 {
1811 cancelAsyncTask_ = true;
1812 if (cancelDownload) {
1813 cloudDB_.CancelDownload();
1814 }
1815 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1816 if (asyncTaskId_ != INVALID_TASK_ID) {
1817 LOGI("[CloudSyncer] begin wait async download task % " PRIu64 " finished", asyncTaskId_);
1818 asyncTaskCv_.wait(uniqueLock, [this]() {
1819 return asyncTaskId_ == INVALID_TASK_ID;
1820 });
1821 LOGI("[CloudSyncer] async download task has been finished");
1822 }
1823 cancelAsyncTask_ = false;
1824 }
1825
BackgroundDownloadAssetsByTable(const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)1826 int CloudSyncer::BackgroundDownloadAssetsByTable(const std::string &table,
1827 std::map<std::string, int64_t> &downloadBeginTime)
1828 {
1829 auto [errCode, downloadData] = storageProxy_->GetDownloadAssetRecords(table, downloadBeginTime[table]);
1830 if (errCode != E_OK) {
1831 return errCode;
1832 }
1833 if (downloadData.empty()) {
1834 LOGD("[CloudSyncer] table %s async download finished", DBCommon::StringMiddleMasking(table).c_str());
1835 return -E_FINISHED;
1836 }
1837
1838 bool isSharedTable = false;
1839 errCode = storageProxy_->IsSharedTable(table, isSharedTable);
1840 if (errCode != E_OK) {
1841 LOGE("[CloudSyncer] check is shared table failed %d", errCode);
1842 return errCode;
1843 }
1844 DownloadList downloadList;
1845 ChangedData changedAssets;
1846 std::tie(errCode, downloadList, changedAssets) = CloudSyncUtils::GetDownloadListByGid(storageProxy_, downloadData,
1847 table);
1848 if (errCode != E_OK) {
1849 return errCode;
1850 }
1851 CloudSyncUtils::UpdateMaxTimeWithDownloadList(downloadList, table, downloadBeginTime);
1852 std::set<Key> dupHashKeySet;
1853 InnerProcessInfo info;
1854 info.tableName = table;
1855 info.isAsyncDownload = true;
1856
1857 // prepare download data
1858 auto [downloadRecord, removeAssets, downloadAssets] =
1859 GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, false, info);
1860 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1861 std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1862 };
1863 errCode = BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1864 NotifyChangedDataWithDefaultDev(std::move(changedAssets));
1865 return errCode;
1866 }
1867
TagDownloadAssetsForAssetOnly(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)1868 int CloudSyncer::TagDownloadAssetsForAssetOnly(
1869 const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, VBucket &localAssetInfo)
1870 {
1871 Type prefix;
1872 std::vector<Type> pkVals;
1873 int ret = CloudSyncUtils::GetCloudPkVals(
1874 param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
1875 if (ret != E_OK) {
1876 // if get pk vals failed, mean cloud data is deteled.
1877 LOGE("[CloudSyncer] TagDownloadAssetsForAssetOnly cannot get primary key value list. %d",
1878 -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1879 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1880 }
1881 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
1882 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
1883 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
1884 return -E_INTERNAL_ERROR;
1885 }
1886 std::map<std::string, Assets> downloadAssetsMap{};
1887 ret = CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(idx, param, downloadAssetsMap);
1888 if (ret != E_OK) {
1889 return ret;
1890 }
1891 param.assetsDownloadList.push_back(
1892 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, OpType::UPDATE, downloadAssetsMap, hashKey,
1893 pkVals, dataInfo.cloudLogInfo.timestamp));
1894 return ret;
1895 }
1896
PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam & param,std::vector<VBucket> & localInfo)1897 int CloudSyncer::PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam ¶m, std::vector<VBucket> &localInfo)
1898 {
1899 int ret = E_OK;
1900 if (param.isAssetsOnly) {
1901 // download and save only asset, ignore other data.
1902 for (auto &item : localInfo) {
1903 ret = storageProxy_->UpdateAssetStatusForAssetOnly(param.tableName, item);
1904 if (ret != E_OK) {
1905 LOGE("[CloudSyncer] Cannot save asset data due to error code %d", ret);
1906 return ret;
1907 }
1908 }
1909 } else {
1910 ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
1911 if (ret != E_OK) {
1912 param.info.downLoadInfo.failCount += param.downloadData.data.size();
1913 LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
1914 }
1915 }
1916 return ret;
1917 }
1918
QueryCloudGidForAssetsOnly(TaskId taskId,SyncParam & param,int64_t groupIdx,std::vector<std::string> & cloudGid)1919 int CloudSyncer::QueryCloudGidForAssetsOnly(
1920 TaskId taskId, SyncParam ¶m, int64_t groupIdx, std::vector<std::string> &cloudGid)
1921 {
1922 auto tableName = param.info.tableName;
1923 QuerySyncObject syncObj = GetQuerySyncObject(tableName);
1924 VBucket extend = {{CloudDbConstant::CURSOR_FIELD, param.cloudWaterMarkForAssetsOnly}};
1925 QuerySyncObject obj;
1926 int ret = syncObj.GetQuerySyncObjectFromGroup(groupIdx, obj);
1927 if (ret != E_OK) {
1928 LOGE("Get query obj from group fail, errCode = %d", ret);
1929 return ret;
1930 }
1931 ret = GetCloudGid(taskId, tableName, obj, cloudGid);
1932 if (ret != E_OK) {
1933 LOGE("Get cloud gid fail, errCode = %d", ret);
1934 }
1935 return ret;
1936 }
1937
GetEmptyGidAssetsMapFromDownloadData(const std::vector<VBucket> & data,std::map<std::string,AssetsMap> & gidAssetsMap)1938 int CloudSyncer::GetEmptyGidAssetsMapFromDownloadData(
1939 const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap)
1940 {
1941 for (uint32_t i = 0; i < data.size(); i++) {
1942 std::string gidStr;
1943 int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, data[i], gidStr);
1944 if (errCode != E_OK) {
1945 LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1946 return errCode;
1947 }
1948 gidAssetsMap[gidStr] = AssetsMap{};
1949 }
1950 return E_OK;
1951 }
1952
SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId,SyncParam & param,std::vector<VBucket> & downloadData,std::map<std::string,AssetsMap> & gidAssetsMap)1953 int CloudSyncer::SetAssetsMapAndEraseDataForAssetsOnly(
1954 TaskId taskId, SyncParam ¶m, std::vector<VBucket> &downloadData, std::map<std::string, AssetsMap> &gidAssetsMap)
1955 {
1956 for (uint32_t i = 0; i < param.groupNum; i++) {
1957 std::vector<std::string> cloudGid;
1958 int ret = QueryCloudGidForAssetsOnly(taskId, param, i, cloudGid);
1959 if ((ret != E_OK && ret != -E_QUERY_END) || cloudGid.empty()) {
1960 LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1961 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1962 }
1963 if (!CloudSyncUtils::SetAssetsMapByCloudGid(cloudGid, param.assetsGroupMap[i], gidAssetsMap)) {
1964 // if group no match data, return error code.
1965 LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1966 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1967 }
1968 }
1969 for (auto iter = downloadData.begin(); iter != downloadData.end();) {
1970 std::string gidStr;
1971 int ret = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, *iter, gidStr);
1972 if (ret != E_OK) {
1973 LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", ret);
1974 return ret;
1975 }
1976
1977 if (DBCommon::IsRecordDelete(*iter)) {
1978 iter = downloadData.erase(iter);
1979 gidAssetsMap.erase(gidStr);
1980 continue;
1981 }
1982
1983 auto assetsMap = gidAssetsMap[gidStr];
1984 if (!CloudSyncUtils::IsAssetOnlyData(*iter, assetsMap, false)) {
1985 iter = downloadData.erase(iter);
1986 gidAssetsMap.erase(gidStr);
1987 continue;
1988 }
1989 ++iter;
1990 }
1991 return E_OK;
1992 }
1993
CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId,SyncParam & param)1994 int CloudSyncer::CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam ¶m)
1995 {
1996 {
1997 std::lock_guard<std::mutex> autoLock(dataLock_);
1998 if (!param.isAssetsOnly || cloudTaskInfos_[taskId].compensatedTask) {
1999 return E_OK;
2000 }
2001 if (!param.isVaildForAssetsOnly) {
2002 param.downloadData.data.clear();
2003 return E_OK;
2004 }
2005 cloudTaskInfos_[taskId].isAssetsOnly = param.isAssetsOnly;
2006 cloudTaskInfos_[taskId].groupNum = param.groupNum;
2007 cloudTaskInfos_[taskId].assetsGroupMap = param.assetsGroupMap;
2008 }
2009
2010 std::vector<VBucket> &downloadData = param.downloadData.data;
2011 auto &gidAssetsMap = param.gidAssetsMap;
2012 gidAssetsMap.clear();
2013 int ret = GetEmptyGidAssetsMapFromDownloadData(downloadData, gidAssetsMap);
2014 if (ret != E_OK) {
2015 return ret;
2016 }
2017
2018 // set assets map for every record and erase not match data.
2019 ret = SetAssetsMapAndEraseDataForAssetsOnly(taskId, param, downloadData, gidAssetsMap);
2020 if (ret != E_OK) {
2021 return ret;
2022 }
2023
2024 for (uint32_t i = 0; i < param.groupNum; i++) {
2025 bool isEmpty = CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(gidAssetsMap, param.assetsGroupMap[i]);
2026 if (isEmpty) {
2027 LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2028 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2029 }
2030 }
2031 return E_OK;
2032 }
2033
CheckLocalQueryAssetsOnlyIfNeed(VBucket & localAssetInfo,SyncParam & param,DataInfoWithLog & logInfo)2034 int CloudSyncer::CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam ¶m, DataInfoWithLog &logInfo)
2035 {
2036 if (!param.isAssetsOnly) {
2037 return E_OK;
2038 }
2039 std::string gid = logInfo.logInfo.cloudGid;
2040 auto iter = param.gidAssetsMap.find(gid);
2041 if (iter == param.gidAssetsMap.end()) {
2042 LOGE("query assets failed, error code:%d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2043 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2044 }
2045
2046 auto assetsMap = iter->second;
2047 if (!CloudSyncUtils::IsAssetOnlyData(localAssetInfo, assetsMap, true)) {
2048 LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2049 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2050 }
2051 return E_OK;
2052 }
2053
IsCurrentAsyncDownloadTask()2054 bool CloudSyncer::IsCurrentAsyncDownloadTask()
2055 {
2056 std::lock_guard<std::mutex> autoLock(dataLock_);
2057 return cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets;
2058 }
2059
CanStartAsyncDownload() const2060 bool CloudSyncer::CanStartAsyncDownload() const
2061 {
2062 if (!RuntimeContext::GetInstance()->GetAssetsDownloadManager()->CanStartNewTask()) {
2063 LOGW("[CloudSyncer] Too many download tasks");
2064 return false;
2065 }
2066 return asyncTaskId_ == INVALID_TASK_ID;
2067 }
2068
NotifyChangedDataWithDefaultDev(ChangedData && changedData)2069 void CloudSyncer::NotifyChangedDataWithDefaultDev(ChangedData &&changedData)
2070 {
2071 auto table = changedData.tableName;
2072 int ret = CloudSyncUtils::NotifyChangeData(CloudDbConstant::DEFAULT_CLOUD_DEV, storageProxy_,
2073 std::move(changedData));
2074 if (ret != E_OK) {
2075 LOGW("[CloudSyncer] Notify %s change data failed %d", DBCommon::StringMiddleMasking(table).c_str(), ret);
2076 }
2077 }
2078
// Forwards the cloud version generation callback to cloudDB_ unchanged.
void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
{
    cloudDB_.SetGenCloudVersionCallback(callback);
}
2083
GetDownloadAssetIndex(TaskId taskId)2084 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2085 {
2086 size_t index = 0u;
2087 std::lock_guard<std::mutex> autoLock(dataLock_);
2088 if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2089 index = resumeTaskInfos_[taskId].lastDownloadIndex;
2090 resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2091 }
2092 return index;
2093 }
2094
GetCurrentTableUploadBatchIndex()2095 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2096 {
2097 std::lock_guard<std::mutex> autoLock(dataLock_);
2098 return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2099 }
2100
ResetCurrentTableUploadBatchIndex()2101 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2102 {
2103 std::lock_guard<std::mutex> autoLock(dataLock_);
2104 if (currentContext_.notifier == nullptr) {
2105 return;
2106 }
2107 currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2108 }
2109
RecordWaterMark(TaskId taskId,Timestamp waterMark)2110 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2111 {
2112 std::lock_guard<std::mutex> autoLock(dataLock_);
2113 resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2114 }
2115
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2116 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2117 {
2118 InnerProcessInfo info;
2119 info.tableName = tableName;
2120 info.tableStatus = ProcessStatus::PROCESSING;
2121 ReloadUploadInfoIfNeed(uploadParam, info);
2122 return info;
2123 }
2124
CopyAndClearTaskInfos()2125 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2126 {
2127 std::vector<CloudTaskInfo> infoList;
2128 std::lock_guard<std::mutex> autoLock(dataLock_);
2129 for (const auto &item: cloudTaskInfos_) {
2130 infoList.push_back(item.second);
2131 }
2132 taskQueue_.clear();
2133 cloudTaskInfos_.clear();
2134 resumeTaskInfos_.clear();
2135 currentContext_.notifier = nullptr;
2136 return infoList;
2137 }
2138
TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)2139 bool CloudSyncer::TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)
2140 {
2141 std::vector<QuerySyncObject> syncQuery;
2142 std::vector<std::string> users;
2143 int errCode =
2144 CloudSyncUtils::GetQueryAndUsersForCompensatedSync(CanStartAsyncDownload(), storageProxy_, users, syncQuery);
2145 if (errCode != E_OK) {
2146 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
2147 // if failed, finshed the task directly.
2148 DoFinished(triggerTaskId, errCode);
2149 return false;
2150 }
2151 if (syncQuery.empty()) {
2152 // if quey is empty, finshed the task directly.
2153 DoFinished(triggerTaskId, E_OK);
2154 return false;
2155 }
2156 std::vector<std::string> userList;
2157 CloudSyncUtils::GetUserListForCompensatedSync(cloudDB_, users, userList);
2158 std::lock_guard<std::mutex> autoLock(dataLock_);
2159 if (cloudTaskInfos_.find(triggerTaskId) == cloudTaskInfos_.end()) {
2160 return false;
2161 }
2162 cloudTaskInfos_[triggerTaskId].users = userList;
2163 cloudTaskInfos_[triggerTaskId].table.clear();
2164 cloudTaskInfos_[triggerTaskId].queryList.clear();
2165 cloudTaskInfos_[triggerTaskId].table.push_back(syncQuery[0].GetRelationTableName());
2166 cloudTaskInfos_[triggerTaskId].queryList.push_back(syncQuery[0]);
2167 return true;
2168 }
2169 }