1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "store_types.h"
31 #include "strategy_factory.h"
32 #include "version.h"
33
34 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)35 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
36 {
37 Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
38 waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
39 RecordWaterMark(taskId, 0u);
40 }
41
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)42 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
43 {
44 std::lock_guard<std::mutex> autoLock(dataLock_);
45 std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
46 cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
47 }
48
ReloadUploadInfoIfNeed(const UploadParam & param,InnerProcessInfo & info)49 void CloudSyncer::ReloadUploadInfoIfNeed(const UploadParam ¶m, InnerProcessInfo &info)
50 {
51 info.upLoadInfo.total = static_cast<uint32_t>(param.count);
52 Info lastUploadInfo;
53 UploadRetryInfo retryInfo;
54 GetLastUploadInfo(info.tableName, lastUploadInfo, retryInfo);
55 info.upLoadInfo.total += lastUploadInfo.successCount;
56 info.upLoadInfo.successCount += lastUploadInfo.successCount;
57 info.upLoadInfo.failCount += lastUploadInfo.failCount;
58 info.upLoadInfo.insertCount += lastUploadInfo.insertCount;
59 info.upLoadInfo.updateCount += lastUploadInfo.updateCount;
60 info.upLoadInfo.deleteCount += lastUploadInfo.deleteCount;
61 info.retryInfo.uploadBatchRetryCount = retryInfo.uploadBatchRetryCount;
62 LOGD("[CloudSyncer] resume upload, last success count %" PRIu32 ", last fail count %" PRIu32,
63 lastUploadInfo.successCount, lastUploadInfo.failCount);
64 }
65
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,UploadRetryInfo & retryInfo)66 void CloudSyncer::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo, UploadRetryInfo &retryInfo)
67 {
68 std::lock_guard<std::mutex> autoLock(dataLock_);
69 return currentContext_.notifier->GetLastUploadInfo(tableName, lastUploadInfo, retryInfo);
70 }
71
GetCloudGidAndFillExtend(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,VBucket & extend)72 int CloudSyncer::GetCloudGidAndFillExtend(TaskId taskId, const std::string &tableName, QuerySyncObject &obj,
73 VBucket &extend)
74 {
75 int errCode = GetCloudGid(taskId, tableName, obj);
76 if (errCode != E_OK) {
77 LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
78 return errCode;
79 }
80 return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
81 }
82
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)83 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
84 VBucket &extend)
85 {
86 extend = {
87 {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
88 };
89
90 QuerySyncObject obj = GetQuerySyncObject(tableName);
91 if (obj.IsContainQueryNodes()) {
92 return GetCloudGidAndFillExtend(taskId, tableName, obj, extend);
93 }
94 if (IsCompensatedTask(taskId)) {
95 int errCode = GetCloudGid(taskId, tableName, obj);
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 if (obj.IsContainQueryNodes()) {
100 return CloudStorageUtils::FillCloudQueryToExtend(obj, extend);
101 }
102 }
103 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
104 return E_OK;
105 }
106
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)107 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
108 {
109 std::vector<std::string> cloudGid;
110 int errCode = storageProxy_->GetCloudGid(obj, IsCloudForcePush(taskId), IsCompensatedTask(taskId), cloudGid);
111 if (errCode != E_OK) {
112 LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
113 } else if (!cloudGid.empty()) {
114 obj.SetCloudGid(cloudGid);
115 }
116 LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
117 return errCode;
118 }
119
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj,std::vector<std::string> & cloudGid)120 int CloudSyncer::GetCloudGid(
121 TaskId taskId, const std::string &tableName, QuerySyncObject &obj, std::vector<std::string> &cloudGid)
122 {
123 int errCode = storageProxy_->GetCloudGid(obj, IsCloudForcePush(taskId), IsCompensatedTask(taskId), cloudGid);
124 if (errCode != E_OK) {
125 LOGE("[CloudSyncer] Failed to get cloud gid, taskid:%" PRIu64 ", table name: %s, length: %zu, %d.",
126 taskId, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), errCode);
127 }
128 return errCode;
129 }
130
GetQuerySyncObject(const std::string & tableName)131 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
132 {
133 std::lock_guard<std::mutex> autoLock(dataLock_);
134 for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
135 if (item.GetTableName() == tableName) {
136 return item;
137 }
138 }
139 LOGW("[CloudSyncer] not found query in cache");
140 QuerySyncObject querySyncObject;
141 querySyncObject.SetTableName(tableName);
142 return querySyncObject;
143 }
144
UpdateProcessWhenUploadFailed(InnerProcessInfo & info)145 void CloudSyncer::UpdateProcessWhenUploadFailed(InnerProcessInfo &info)
146 {
147 info.tableStatus = ProcessStatus::FINISHED;
148 std::lock_guard<std::mutex> autoLock(dataLock_);
149 currentContext_.notifier->UpdateProcess(info);
150 currentContext_.notifier->UpdateUploadRetryInfo(info);
151 }
152
NotifyUploadFailed(int errCode,InnerProcessInfo & info)153 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
154 {
155 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
156 LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
157 } else {
158 LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
159 info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
160 }
161 UpdateProcessWhenUploadFailed(info);
162 }
163
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)164 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
165 {
166 return BatchInsertOrUpdate(insertInfo, uploadData, innerProcessInfo, true);
167 }
168
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)169 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
170 {
171 return BatchInsertOrUpdate(updateInfo, uploadData, innerProcessInfo, false);
172 }
173
BatchInsertOrUpdate(Info & uploadInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo,bool isInsert)174 int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo,
175 bool isInsert)
176 {
177 uint32_t retryCount = 0;
178 int errCode = E_OK;
179 if (isInsert) {
180 errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
181 uploadData.insData.extend, uploadInfo, retryCount);
182 innerProcessInfo.upLoadInfo.insertCount += uploadInfo.successCount;
183 } else {
184 errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
185 uploadData.updData.extend, uploadInfo, retryCount);
186 innerProcessInfo.upLoadInfo.updateCount += uploadInfo.successCount;
187 }
188 innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount;
189 innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount;
190 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
191 ProcessVersionConflictInfo(innerProcessInfo, retryCount);
192 }
193 if (errCode != E_OK) {
194 LOGE("[CloudSyncer][BatchInsertOrUpdate] BatchInsertOrUpdate with error, ret is %d.", errCode);
195 }
196 if (uploadData.isCloudVersionRecord) {
197 return errCode;
198 }
199 return BackFillAfterBatchUpload(uploadData, isInsert, errCode);
200 }
201
BackFillAfterBatchUpload(CloudSyncData & uploadData,bool isInsert,int batchUploadResult)202 int CloudSyncer::BackFillAfterBatchUpload(CloudSyncData &uploadData, bool isInsert, int batchUploadResult)
203 {
204 bool isSharedTable = false;
205 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
206 if (ret != E_OK) {
207 LOGE("[CloudSyncer] BackFillAfterBatchUpload cannot judge the table is shared table. %d", ret);
208 return ret;
209 }
210 int errCode = batchUploadResult;
211 if (!isSharedTable) {
212 CloudWaterType type = isInsert ? CloudWaterType::INSERT : CloudWaterType::UPDATE;
213 CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
214 ret = CloudSyncUtils::FillAssetIdToAssets(data, errCode, type);
215 if (ret != E_OK) {
216 LOGW("[CloudSyncer][BackFillAfterBatchUpload] FillAssetIdToAssets with error, ret is %d.", ret);
217 }
218 }
219 OpType opType = isInsert ? OpType::INSERT : OpType::UPDATE;
220 if (errCode != E_OK) {
221 storageProxy_->FillCloudGidIfSuccess(opType, uploadData);
222 CloudSyncBatch &data = isInsert ? uploadData.insData : uploadData.updData;
223 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(data.extend);
224 if (isSkip) {
225 LOGI("[CloudSyncer][BackFillAfterBatchUpload] Try to FillCloudLogAndAsset when assets missing: %d",
226 errCode);
227 return E_OK;
228 } else {
229 LOGE("[CloudSyncer][BackFillAfterBatchUpload] errCode: %d, can not skip assets missing record.", errCode);
230 return errCode;
231 }
232 }
233 int errorCode = storageProxy_->FillCloudLogAndAsset(opType, uploadData);
234 if ((errorCode != E_OK) || (ret != E_OK)) {
235 LOGE("[CloudSyncer] Failed to fill back when doing upload insData or updData, %d.", errorCode);
236 return ret == E_OK ? errorCode : ret;
237 }
238 return E_OK;
239 }
240
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)241 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
242 std::map<std::string, Assets> &downloadAssets)
243 {
244 bool isSharedTable = false;
245 int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
246 if (errCode != E_OK) {
247 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
248 return errCode;
249 }
250 int transactionCode = E_OK;
251 // shared table don't download, so just begin transaction once
252 if (isSharedTable) {
253 transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
254 }
255 if (transactionCode != E_OK) {
256 LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
257 return transactionCode;
258 }
259 errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
260 if (isSharedTable) {
261 transactionCode = storageProxy_->Commit();
262 if (transactionCode != E_OK) {
263 LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
264 }
265 }
266 return (errCode == E_OK) ? transactionCode : errCode;
267 }
268
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)269 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
270 const DownloadItem &downloadItem, VBucket &dbAssets)
271 {
272 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
273 auto &errCode = res.first;
274 if (!isSharedTable) {
275 errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
276 }
277 if (errCode != E_OK) {
278 LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
279 return res;
280 }
281 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
282 downloadItem.hashKey, dbAssets);
283 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
284 if (errCode != -E_CLOUD_GID_MISMATCH) {
285 LOGE("[CloudSyncer] get assets from db failed %d", errCode);
286 }
287 if (!isSharedTable) {
288 (void)storageProxy_->Rollback();
289 }
290 return res;
291 }
292 if (!isSharedTable) {
293 errCode = storageProxy_->Commit();
294 }
295 if (errCode != E_OK) {
296 LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
297 }
298 return res;
299 }
300
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)301 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
302 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
303 std::map<std::string, Assets> &tmpAssetsToDelete)
304 {
305 std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
306 for (auto &[col, assets] : tmpAssetsToDownload) {
307 int i = 0;
308 for (auto &asset : assets) {
309 asset.flag = tmpFlags[col][i++];
310 if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
311 continue;
312 }
313 if (downloadCode == E_OK) {
314 asset.status = NORMAL;
315 } else {
316 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
317 }
318 }
319 }
320 for (auto &[col, assets] : tmpAssetsToDelete) {
321 for (auto &asset : assets) {
322 asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
323 if (deleteCode == E_OK) {
324 asset.status = NORMAL;
325 } else {
326 asset.status = ABNORMAL;
327 }
328 downloadAssets[col].push_back(asset);
329 }
330 }
331 return downloadAssets;
332 }
333
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)334 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
335 const DownloadItem &downloadItem, VBucket &dbAssets)
336 {
337 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK)};
338 auto &ret = res.first;
339 auto &status = res.second;
340 if (info.isAsyncDownload) {
341 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, info.isAsyncDownload, downloadItem.gid,
342 downloadItem.hashKey, dbAssets);
343 } else {
344 res = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
345 }
346 if (ret == -E_CLOUD_GID_MISMATCH) {
347 LOGW("[CloudSyncer] skip download asset because gid mismatch");
348 errCode = E_OK;
349 return true;
350 }
351 if (CloudStorageUtils::IsDataLocked(status)) {
352 LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
353 errCode = E_OK;
354 return true;
355 }
356 if (ret != E_OK) {
357 LOGE("[CloudSyncer]%s download get assets from DB failed: %d, return errCode: %d",
358 info.isAsyncDownload ? "Async" : "Sync", ret, errCode);
359 errCode = (errCode != E_OK) ? errCode : ret;
360 return true;
361 }
362 return false;
363 }
364
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)365 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
366 {
367 if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
368 downloadItem.recordConflict = true;
369 errCode = E_OK;
370 return false;
371 }
372 errCode = (errCode != E_OK) ? errCode : deleteCode;
373 errCode = (errCode != E_OK) ? errCode : downloadCode;
374 if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
375 return false;
376 }
377 return true;
378 }
379
GetAssetsToDownload(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToDownload,std::map<std::string,std::vector<uint32_t>> & tmpFlags)380 void GetAssetsToDownload(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
381 std::map<std::string, Assets> &assetsToDownload, std::map<std::string, std::vector<uint32_t>> &tmpFlags)
382 {
383 if (isSharedTable) {
384 LOGD("[CloudSyncer] skip download for shared table");
385 return;
386 }
387 for (auto &[col, assets] : downloadAssets) {
388 for (auto &asset : assets) {
389 if (asset.flag != static_cast<uint32_t>(AssetOpType::DELETE) &&
390 AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
391 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
392 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
393 static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
394 assetsToDownload[col].push_back(asset);
395 tmpFlags[col].push_back(asset.flag);
396 } else {
397 LOGD("[CloudSyncer] skip download asset...");
398 }
399 }
400 }
401 }
402
GetAssetsToRemove(std::map<std::string,Assets> & downloadAssets,VBucket & dbAssets,bool isSharedTable,std::map<std::string,Assets> & assetsToRemove)403 void GetAssetsToRemove(std::map<std::string, Assets> &downloadAssets, VBucket &dbAssets, bool isSharedTable,
404 std::map<std::string, Assets> &assetsToRemove)
405 {
406 if (isSharedTable) {
407 LOGD("[CloudSyncer] skip remove for shared table");
408 return;
409 }
410 for (auto &[col, assets] : downloadAssets) {
411 for (auto &asset : assets) {
412 if (asset.flag == static_cast<uint32_t>(AssetOpType::DELETE) &&
413 AssetOperationUtils::CalAssetRemoveOperation(col, asset, dbAssets) ==
414 AssetOperationUtils::AssetOpType::HANDLE) {
415 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
416 assetsToRemove[col].push_back(asset);
417 }
418 }
419 }
420 }
421
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)422 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
423 DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
424 {
425 int errCode = E_OK;
426 VBucket dbAssets;
427 std::map<std::string, Assets> tmpAssetsToRemove;
428 if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
429 GetAssetsToRemove(downloadAssets, dbAssets, isSharedTable, tmpAssetsToRemove);
430 }
431 auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
432 tmpAssetsToRemove);
433
434 std::map<std::string, Assets> tmpAssetsToDownload;
435 std::map<std::string, std::vector<uint32_t>> tmpFlags;
436 if (!IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
437 GetAssetsToDownload(downloadAssets, dbAssets, isSharedTable, tmpAssetsToDownload, tmpFlags);
438 }
439 auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
440 if (downloadCode != SKIP_ASSET && !CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
441 return errCode;
442 }
443
444 // copy asset back
445 downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
446 tmpAssetsToRemove);
447 return errCode;
448 }
449
SeparateNormalAndFailAssets(const std::map<std::string,Assets> & assetsMap,VBucket & normalAssets,VBucket & failedAssets)450 void CloudSyncer::SeparateNormalAndFailAssets(const std::map<std::string, Assets> &assetsMap, VBucket &normalAssets,
451 VBucket &failedAssets)
452 {
453 for (auto &[key, assets] : assetsMap) {
454 Assets tempFailedAssets;
455 Assets tempNormalAssets;
456 int32_t otherStatusCnt = 0;
457 for (auto &asset : assets) {
458 if (asset.status == AssetStatus::NORMAL) {
459 tempNormalAssets.push_back(asset);
460 } else if (asset.status == AssetStatus::ABNORMAL) {
461 tempFailedAssets.push_back(asset);
462 } else {
463 otherStatusCnt++;
464 LOGW("[CloudSyncer] download err, statue %d count %d", asset.status, otherStatusCnt);
465 }
466 }
467 LOGI("[CloudSyncer] download asset times, normalCount %zu, abnormalCount %zu, otherStatusCnt %d",
468 tempNormalAssets.size(), tempFailedAssets.size(), otherStatusCnt);
469 if (tempFailedAssets.size() > 0) {
470 failedAssets[key] = std::move(tempFailedAssets);
471 }
472 if (tempNormalAssets.size() > 0) {
473 normalAssets[key] = std::move(tempNormalAssets);
474 }
475 }
476 }
477
CommitDownloadAssets(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)478 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, InnerProcessInfo &info,
479 DownloadCommitList &commitList, uint32_t &successCount)
480 {
481 int errCode = storageProxy_->SetLogTriggerStatus(false);
482 if (errCode != E_OK) {
483 return errCode;
484 }
485 for (auto &item : commitList) {
486 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
487 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
488 std::map<std::string, Assets> assetsMap = std::get<1>(item);
489 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
490 VBucket normalAssets;
491 VBucket failedAssets;
492 normalAssets[CloudDbConstant::GID_FIELD] = gid;
493 failedAssets[CloudDbConstant::GID_FIELD] = gid;
494 if (setAllNormal) {
495 for (auto &[key, asset] : assetsMap) {
496 normalAssets[key] = std::move(asset);
497 }
498 } else {
499 SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
500 }
501 if (!downloadItem.recordConflict) {
502 errCode = FillCloudAssets(info, normalAssets, failedAssets);
503 if (errCode != E_OK) {
504 break;
505 }
506 }
507 LogInfo logInfo;
508 logInfo.cloudGid = gid;
509 // download must contain gid, just set the default value here.
510 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
511 logInfo.hashKey = downloadItem.hashKey;
512 logInfo.timestamp = downloadItem.timestamp;
513 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
514 if (failedAssets.size() > 1) {
515 logInfo.timestamp = 0u;
516 }
517
518 errCode = storageProxy_->UpdateRecordFlag(
519 info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
520 if (errCode != E_OK) {
521 break;
522 }
523 successCount++;
524 }
525 int ret = storageProxy_->SetLogTriggerStatus(true);
526 return errCode == E_OK ? ret : errCode;
527 }
528
FillCloudAssetsForOneRecord(const std::string & gid,const std::map<std::string,Assets> & assetsMap,InnerProcessInfo & info,bool setAllNormal,bool & isExistAssetDownloadFail)529 int CloudSyncer::FillCloudAssetsForOneRecord(const std::string &gid, const std::map<std::string, Assets> &assetsMap,
530 InnerProcessInfo &info, bool setAllNormal, bool &isExistAssetDownloadFail)
531 {
532 VBucket normalAssets;
533 VBucket failedAssets;
534 normalAssets[CloudDbConstant::GID_FIELD] = gid;
535 failedAssets[CloudDbConstant::GID_FIELD] = gid;
536 if (setAllNormal) {
537 for (auto &[key, asset] : assetsMap) {
538 normalAssets[key] = std::move(asset);
539 }
540 } else {
541 SeparateNormalAndFailAssets(assetsMap, normalAssets, failedAssets);
542 }
543 isExistAssetDownloadFail = failedAssets.size() > 1; // There is initially one element present
544 return FillCloudAssets(info, normalAssets, failedAssets);
545 }
546
UpdateRecordFlagForOneRecord(const std::string & gid,const DownloadItem & downloadItem,InnerProcessInfo & info,bool isExistAssetDownloadFail)547 int CloudSyncer::UpdateRecordFlagForOneRecord(const std::string &gid, const DownloadItem &downloadItem,
548 InnerProcessInfo &info, bool isExistAssetDownloadFail)
549 {
550 LogInfo logInfo;
551 logInfo.cloudGid = gid;
552 // download must contain gid, just set the default value here.
553 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
554 logInfo.hashKey = downloadItem.hashKey;
555 logInfo.timestamp = downloadItem.timestamp;
556 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
557 if (isExistAssetDownloadFail) {
558 logInfo.timestamp = 0u;
559 }
560 return storageProxy_->UpdateRecordFlag(info.tableName, info.isAsyncDownload, downloadItem.recordConflict, logInfo);
561 }
562
CommitDownloadAssetsForAsyncDownload(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)563 int CloudSyncer::CommitDownloadAssetsForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info,
564 DownloadCommitList &commitList, uint32_t &successCount)
565 {
566 int errCode = storageProxy_->SetLogTriggerStatus(false, true);
567 if (errCode != E_OK) {
568 return errCode;
569 }
570 for (auto &item : commitList) {
571 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
572 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
573 std::map<std::string, Assets> assetsMap = std::get<1>(item);
574 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
575 LockStatus status = LockStatus::BUTT;
576 errCode = storageProxy_->GetLockStatusByGid(info.tableName, gid, status);
577 if (errCode == E_OK && status != LockStatus::UNLOCK) {
578 continue;
579 }
580
581 bool isExistAssetDownloadFail = false;
582 if (!downloadItem.recordConflict) {
583 errCode = FillCloudAssetsForOneRecord(gid, assetsMap, info, setAllNormal, isExistAssetDownloadFail);
584 if (errCode != E_OK) {
585 break;
586 }
587 }
588
589 errCode = UpdateRecordFlagForOneRecord(gid, downloadItem, info, isExistAssetDownloadFail);
590 if (errCode != E_OK) {
591 break;
592 }
593 successCount++;
594 }
595 int ret = storageProxy_->SetLogTriggerStatus(true, true);
596 return errCode == E_OK ? ret : errCode;
597 }
598
GenerateCompensatedSync(CloudTaskInfo & taskInfo)599 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
600 {
601 std::vector<QuerySyncObject> syncQuery;
602 std::vector<std::string> users;
603 int errCode = CloudSyncUtils::GetQueryAndUsersForCompensatedSync(
604 CloudSyncUtils::CanStartAsyncDownload(scheduleTaskCount_), storageProxy_, users, syncQuery);
605 if (errCode != E_OK) {
606 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
607 return;
608 }
609 if (syncQuery.empty()) {
610 LOGD("[CloudSyncer] Not need generate compensated sync");
611 return;
612 }
613 for (const auto &it : syncQuery) {
614 CloudTaskInfo compensatedTaskInfo = taskInfo;
615 compensatedTaskInfo.queryList.push_back(it);
616 Sync(compensatedTaskInfo);
617 taskInfo.callback = nullptr;
618 LOGI("[CloudSyncer] Generate compensated sync finished");
619 }
620 }
621
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)622 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
623 {
624 if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
625 return;
626 }
627 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
628 if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
629 return;
630 }
631 if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
632 return;
633 }
634 info.tableStatus = ProcessStatus::FINISHED;
635 info.upLoadInfo.batchIndex++;
636 NotifyInBatchUpload(uploadParam, info, true);
637 }
638
SaveCursorIfNeed(const std::string & tableName)639 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
640 {
641 std::string cursor = "";
642 int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
643 if (errCode != E_OK) {
644 LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
645 return errCode;
646 }
647 if (!cursor.empty()) {
648 return E_OK;
649 }
650 auto res = cloudDB_.GetEmptyCursor(tableName);
651 if (res.first != E_OK) {
652 LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
653 return res.first;
654 }
655 if (res.second.empty()) {
656 LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
657 return -E_CLOUD_ERROR;
658 }
659 errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
660 if (errCode != E_OK) {
661 LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
662 }
663 return errCode;
664 }
665
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)666 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
667 {
668 std::string hashDev;
669 int errCode = RuntimeContext::GetInstance()->GetLocalIdentity(hashDev);
670 if (errCode != E_OK) {
671 LOGE("[CloudSyncer] Failed to get local identity.");
672 return errCode;
673 }
674 errCode = SaveCursorIfNeed(table);
675 if (errCode != E_OK) {
676 return errCode;
677 }
678 errCode = CheckTaskIdValid(taskInfo.taskId);
679 if (errCode != E_OK) {
680 LOGW("[CloudSyncer] task is invalid, abort sync");
681 return errCode;
682 }
683 errCode = DoDownload(taskInfo.taskId, isFirstDownload);
684 if (errCode != E_OK) {
685 LOGE("[CloudSyncer] download failed %d", errCode);
686 }
687 return errCode;
688 }
689
IsClosed() const690 bool CloudSyncer::IsClosed() const
691 {
692 return closed_ || IsKilled();
693 }
694
UpdateFlagForSavedRecord(const SyncParam & param)695 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam ¶m)
696 {
697 DownloadList downloadList;
698 {
699 std::lock_guard<std::mutex> autoLock(dataLock_);
700 downloadList = currentContext_.assetDownloadList;
701 }
702 std::set<std::string> downloadGid;
703 for (const auto &tuple : downloadList) {
704 if (CloudSyncUtils::IsContainDownloading(tuple)) {
705 downloadGid.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
706 }
707 }
708 if (IsCurrentAsyncDownloadTask()) {
709 int errCode = storageProxy_->MarkFlagAsAssetAsyncDownload(param.tableName, param.downloadData, downloadGid);
710 if (errCode != E_OK) {
711 LOGE("[CloudSyncer] Failed to mark flag consistent errCode %d", errCode);
712 return errCode;
713 }
714 }
715 return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, downloadGid);
716 }
717
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)718 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
719 {
720 uint32_t retryCount = 0;
721 int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
722 uploadData.delData.extend, deleteInfo, retryCount);
723 innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
724 innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
725 innerProcessInfo.upLoadInfo.failCount += deleteInfo.failCount;
726 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
727 ProcessVersionConflictInfo(innerProcessInfo, retryCount);
728 }
729 if (errCode != E_OK) {
730 LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
731 storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
732 return errCode;
733 }
734 errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
735 if (errCode != E_OK) {
736 LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
737 }
738 return errCode;
739 }
740
IsCompensatedTask(TaskId taskId)741 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
742 {
743 std::lock_guard<std::mutex> autoLock(dataLock_);
744 return cloudTaskInfos_[taskId].compensatedTask;
745 }
746
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)747 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
748 {
749 return cloudDB_.SetCloudDB(cloudDBs);
750 }
751
CleanAllWaterMark()752 void CloudSyncer::CleanAllWaterMark()
753 {
754 storageProxy_->CleanAllWaterMark();
755 }
756
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)757 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
758 {
759 downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
760 downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
761 downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
762 downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
763 downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
764 downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
765 downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
766 }
767
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)768 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
769 const bool isFirstDownload)
770 {
771 bool isNeedNotify = false;
772 bool isLockAction = IsLockInDownload();
773 {
774 std::lock_guard<std::mutex> autoLock(dataLock_);
775 // only when the first download and the task no need upload actually, notify the process, otherwise,
776 // the process will notify in the upload procedure, which can guarantee the notify order of the tables
777 isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload && isLockAction;
778 }
779 if (!isNeedNotify) {
780 return;
781 }
782 for (size_t i = 0; i < needNotifyTables.size(); ++i) {
783 UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
784 }
785 }
786
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)787 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
788 {
789 std::string tableName;
790 int ret = GetCurrentTableName(tableName);
791 if (ret != E_OK) {
792 LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
793 return ret;
794 }
795
796 ret = storageProxy_->StartTransaction();
797 if (ret != E_OK) {
798 LOGE("[CloudSyncer] start transaction failed before getting upload count.");
799 return ret;
800 }
801
802 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
803 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
804 if (ret != E_OK) {
805 // GetUploadCount will return E_OK when upload count is zero.
806 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
807 }
808 // No need Rollback when GetUploadCount failed
809 storageProxy_->Commit();
810 return ret;
811 }
812
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)813 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
814 bool needNotify)
815 {
816 LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
817 InnerProcessInfo innerProcessInfo;
818 innerProcessInfo.tableName = tableName;
819 innerProcessInfo.upLoadInfo.total = 0; // count is zero
820 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
821 {
822 std::lock_guard<std::mutex> autoLock(dataLock_);
823 if (!needNotify) {
824 currentContext_.notifier->UpdateProcess(innerProcessInfo);
825 } else {
826 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
827 }
828 }
829 }
830
SetNeedUpload(bool isNeedUpload)831 void CloudSyncer::SetNeedUpload(bool isNeedUpload)
832 {
833 std::lock_guard<std::mutex> autoLock(dataLock_);
834 currentContext_.isNeedUpload = isNeedUpload;
835 }
836
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)837 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
838 {
839 std::vector<std::string> needNotifyTables;
840 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
841 std::string table;
842 {
843 std::lock_guard<std::mutex> autoLock(dataLock_);
844 if (currentContext_.processRecorder == nullptr) {
845 LOGE("[CloudSyncer] process recorder of current context is nullptr.");
846 return -E_INTERNAL_ERROR;
847 }
848 if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
849 taskInfo.table[i])) {
850 continue;
851 }
852 LOGD("[CloudSyncer] try download table, index: %zu, table name: %s, length: %zu",
853 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
854 currentContext_.tableName = taskInfo.table[i];
855 table = currentContext_.tableName;
856 }
857 int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
858 if (errCode != E_OK) {
859 return errCode;
860 }
861 CheckDataAfterDownload(table);
862 MarkDownloadFinishIfNeed(table);
863 // needUpload indicate that the syncMode need push
864 if (needUpload) {
865 int64_t count = 0;
866 errCode = GetUploadCountByTable(taskInfo.taskId, count);
867 if (errCode != E_OK) {
868 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
869 return errCode;
870 }
871 // count > 0 means current table need upload actually
872 if (count > 0) {
873 SetNeedUpload(true);
874 continue;
875 }
876 needNotifyTables.emplace_back(table);
877 }
878 errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
879 if (errCode != E_OK) {
880 LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
881 return errCode;
882 }
883 }
884 DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
885 TriggerAsyncDownloadAssetsInTaskIfNeed(isFirstDownload);
886 return E_OK;
887 }
888
IsNeedGetLocalWater(TaskId taskId)889 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
890 {
891 return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
892 !IsCompensatedTask(taskId);
893 }
894
TryToAddSyncTask(CloudTaskInfo && taskInfo)895 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
896 {
897 if (closed_) {
898 LOGW("[CloudSyncer] syncer is closed, should not sync now");
899 return -E_DB_CLOSED;
900 }
901 std::shared_ptr<DataBaseSchema> cloudSchema;
902 int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
903 if (errCode != E_OK) {
904 LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
905 return errCode;
906 }
907 std::lock_guard<std::mutex> autoLock(dataLock_);
908 taskInfo.priorityLevel = (!taskInfo.priorityTask || taskInfo.compensatedTask)
909 ? CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL
910 : taskInfo.priorityLevel;
911 errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
912 if (errCode != E_OK) {
913 return errCode;
914 }
915 errCode = GenerateTaskIdIfNeed(taskInfo);
916 if (errCode != E_OK) {
917 return errCode;
918 }
919 auto taskId = taskInfo.taskId;
920 cloudTaskInfos_[taskId] = std::move(taskInfo);
921 taskQueue_.insert({cloudTaskInfos_[taskId].priorityLevel, taskId});
922 LOGI("[CloudSyncer]Add task ok, storeId %.3s, priority %d, priorityLevel %" PRId32 ", taskId %" PRIu64 " async %d",
923 cloudTaskInfos_[taskId].storeId.c_str(), cloudTaskInfos_[taskId].priorityTask,
924 cloudTaskInfos_[taskId].priorityLevel, cloudTaskInfos_[taskId].taskId,
925 static_cast<int>(cloudTaskInfos_[taskId].asyncDownloadAssets));
926 MarkCurrentTaskPausedIfNeed(taskInfo);
927 if (!cloudTaskInfos_[taskId].priorityTask) {
928 MergeTaskInfo(cloudSchema, taskId);
929 }
930 return E_OK;
931 }
932
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)933 void CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
934 {
935 if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
936 return;
937 }
938 bool isMerge = false;
939 TaskId checkTaskId = taskId;
940 do {
941 std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
942 } while (isMerge);
943 }
944
RemoveTaskFromQueue(int32_t priorityLevel,TaskId taskId)945 void CloudSyncer::RemoveTaskFromQueue(int32_t priorityLevel, TaskId taskId)
946 {
947 for (auto it = taskQueue_.find(priorityLevel); it != taskQueue_.end(); ++it) {
948 if (it->second == taskId) {
949 taskQueue_.erase(it);
950 return;
951 }
952 }
953 }
954
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)955 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
956 {
957 std::pair<bool, TaskId> res;
958 auto &[merge, nextTryTask] = res;
959 TaskId beMergeTask = INVALID_TASK_ID;
960 TaskId runningTask = currentContext_.currentTaskId;
961 auto commonLevelTask = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
962 for (auto it = commonLevelTask.first; it != commonLevelTask.second; ++it) {
963 TaskId taskId = it->second;
964 if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
965 continue;
966 }
967 if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
968 continue;
969 }
970 if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
971 beMergeTask = taskId;
972 nextTryTask = tryTaskId;
973 merge = true;
974 break;
975 }
976 if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
977 beMergeTask = tryTaskId;
978 nextTryTask = taskId;
979 merge = true;
980 break;
981 }
982 }
983 if (!merge) { // LCOV_EXCL_BR_LINE
984 return res;
985 }
986 if (beMergeTask > nextTryTask) { // LCOV_EXCL_BR_LINE
987 std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
988 }
989 AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
990 auto processNotifier = std::make_shared<ProcessNotifier>(this);
991 processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
992 cloudTaskInfos_[beMergeTask].users);
993 cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
994 cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
995 processNotifier->SetAllTableFinish();
996 processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
997 RemoveTaskFromQueue(cloudTaskInfos_[beMergeTask].priorityLevel, beMergeTask);
998 cloudTaskInfos_.erase(beMergeTask);
999 LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
1000 return res;
1001 }
1002
IsTaskCanMerge(const CloudTaskInfo & taskInfo)1003 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
1004 {
1005 return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
1006 taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
1007 }
1008
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)1009 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
1010 {
1011 const auto &taskInfo = cloudTaskInfos_[taskId];
1012 const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
1013 return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
1014 taskInfo.devices == tryMergeTaskInfo.devices &&
1015 taskInfo.asyncDownloadAssets == tryMergeTaskInfo.asyncDownloadAssets;
1016 }
1017
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)1018 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
1019 {
1020 const auto &source = cloudTaskInfos_[sourceId];
1021 const auto &target = cloudTaskInfos_[targetId];
1022 bool isMerge = true;
1023 for (const auto &table : source.table) {
1024 if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
1025 isMerge = false;
1026 break;
1027 }
1028 }
1029 return isMerge;
1030 }
1031
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)1032 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
1033 CloudTaskInfo &taskInfo)
1034 {
1035 std::vector<std::string> tmpTables = taskInfo.table;
1036 taskInfo.table.clear();
1037 taskInfo.queryList.clear();
1038 for (const auto &table : cloudSchema->tables) {
1039 if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
1040 taskInfo.table.push_back(table.name);
1041 QuerySyncObject querySyncObject;
1042 querySyncObject.SetTableName(table.name);
1043 taskInfo.queryList.push_back(querySyncObject);
1044 }
1045 }
1046 }
1047
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)1048 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
1049 {
1050 cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
1051 cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
1052 std::set<std::string> users;
1053 users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
1054 users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
1055 cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
1056 return {target, source};
1057 }
1058
IsQueryListEmpty(TaskId taskId)1059 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
1060 {
1061 std::lock_guard<std::mutex> autoLock(dataLock_);
1062 return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
1063 [](const auto &item) {
1064 return item.IsContainQueryNodes();
1065 });
1066 }
1067
IsNeedProcessCloudCursor(TaskId taskId)1068 bool CloudSyncer::IsNeedProcessCloudCursor(TaskId taskId)
1069 {
1070 // Compensated task no need to save/get cloud cursor
1071 return IsQueryListEmpty(taskId) && !IsCompensatedTask(taskId);
1072 }
1073
IsNeedLock(const UploadParam & param)1074 bool CloudSyncer::IsNeedLock(const UploadParam ¶m)
1075 {
1076 return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
1077 }
1078
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)1079 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
1080 {
1081 std::pair<int, Timestamp> res = { E_OK, 0u };
1082 if (IsNeedGetLocalWater(uploadParam.taskId)) {
1083 res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
1084 }
1085 uploadParam.localMark = res.second;
1086 return res;
1087 }
1088
ChangeProcessStatusAndNotifyIfNeed(UploadParam & uploadParam,InnerProcessInfo & info)1089 void CloudSyncer::ChangeProcessStatusAndNotifyIfNeed(UploadParam &uploadParam, InnerProcessInfo &info)
1090 {
1091 if (info.tableStatus == ProcessStatus::FINISHED) {
1092 // if process here, the process should't be finished and notify.
1093 info.tableStatus = ProcessStatus::PROCESSING;
1094 NotifyInBatchUpload(uploadParam, info, false);
1095 }
1096 }
1097
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken,std::vector<ReviseModTimeInfo> & revisedData)1098 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
1099 CloudSyncData &uploadData, ContinueToken &continueStmtToken, std::vector<ReviseModTimeInfo> &revisedData)
1100 {
1101 int ret = E_OK;
1102 uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
1103 bool isLocked = false;
1104 while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
1105 ChangeProcessStatusAndNotifyIfNeed(uploadParam, info);
1106 revisedData.insert(revisedData.end(), uploadData.revisedData.begin(), uploadData.revisedData.end());
1107 ret = PreProcessBatchUpload(uploadParam, info, uploadData);
1108 if (ret != E_OK) {
1109 break;
1110 }
1111 info.upLoadInfo.batchIndex = ++batchIndex;
1112 if (IsNeedLock(uploadParam) && !isLocked) {
1113 ret = LockCloudIfNeed(uploadParam.taskId);
1114 if (ret != E_OK) {
1115 break;
1116 }
1117 isLocked = true;
1118 }
1119 ret = DoBatchUpload(uploadData, uploadParam, info);
1120 if (ret != E_OK) {
1121 break;
1122 }
1123 uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
1124 if (continueStmtToken == nullptr) {
1125 break;
1126 }
1127 SetUploadDataFlag(uploadParam.taskId, uploadData);
1128 LOGI("[CloudSyncer] Write local water after upload one batch, table[%s length[%zu]], water[%llu]",
1129 DBCommon::StringMiddleMasking(uploadData.tableName).c_str(), uploadData.tableName.length(),
1130 uploadParam.localMark);
1131 RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
1132 ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1133 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1134 LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1135 break;
1136 }
1137 ChkIgnoredProcess(info, uploadData, uploadParam);
1138 }
1139 if ((ret != E_OK) && (ret != -E_UNFINISHED) && (ret != -E_TASK_PAUSED)) {
1140 NotifyUploadFailed(ret, info);
1141 }
1142 if (isLocked && IsNeedLock(uploadParam)) {
1143 UnlockIfNeed();
1144 }
1145 return ret;
1146 }
1147
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1148 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1149 {
1150 InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
1151 static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
1152 int errCode = E_OK;
1153 for (const auto &waterType: waterTypes) {
1154 uploadParam.mode = waterType;
1155 errCode = DoUploadByMode(tableName, uploadParam, info);
1156 if (errCode != E_OK) {
1157 break;
1158 }
1159 }
1160 int ret = E_OK;
1161 if (info.upLoadInfo.successCount > 0) {
1162 ret = UploadVersionRecordIfNeed(uploadParam);
1163 }
1164 return errCode != E_OK ? errCode : ret;
1165 }
1166
UploadVersionRecordIfNeed(const UploadParam & uploadParam)1167 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
1168 {
1169 if (uploadParam.count == 0) {
1170 // no record upload
1171 return E_OK;
1172 }
1173 if (!cloudDB_.IsExistCloudVersionCallback()) {
1174 return E_OK;
1175 }
1176 auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
1177 if (errCode != E_OK) {
1178 return errCode;
1179 }
1180 bool isInsert = !uploadData.insData.record.empty();
1181 CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
1182 if (batchData.record.empty()) {
1183 LOGE("[CloudSyncer] Get invalid cloud version record");
1184 return -E_INTERNAL_ERROR;
1185 }
1186 std::string oriVersion;
1187 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
1188 std::string newVersion;
1189 std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
1190 if (errCode != E_OK) {
1191 LOGE("[CloudSyncer] Get cloud version error %d", errCode);
1192 return errCode;
1193 }
1194 batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
1195 InnerProcessInfo processInfo;
1196 Info info;
1197 std::vector<VBucket> copyRecord = batchData.record;
1198 WaterMark waterMark;
1199 CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
1200 errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
1201 batchData.record = copyRecord;
1202 CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
1203 auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1204 return errCode != E_OK ? errCode : ret;
1205 }
1206
TagUploadAssets(CloudSyncData & uploadData)1207 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1208 {
1209 if (!IsDataContainAssets()) {
1210 return;
1211 }
1212 std::vector<Field> assetFields;
1213 {
1214 std::lock_guard<std::mutex> autoLock(dataLock_);
1215 assetFields = currentContext_.assetFields[currentContext_.tableName];
1216 }
1217
1218 for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1219 for (const Field &assetField : assetFields) {
1220 (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1221 }
1222 }
1223 for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1224 for (const Field &assetField : assetFields) {
1225 (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1226 }
1227 }
1228 }
1229
IsLockInDownload()1230 bool CloudSyncer::IsLockInDownload()
1231 {
1232 std::lock_guard<std::mutex> autoLock(dataLock_);
1233 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1234 return false;
1235 }
1236 auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1237 return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1238 }
1239
SetCurrentTaskFailedInMachine(int errCode)1240 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1241 {
1242 std::lock_guard<std::mutex> autoLock(dataLock_);
1243 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1244 return CloudSyncEvent::ERROR_EVENT;
1245 }
1246
InitCloudSyncStateMachine()1247 void CloudSyncer::InitCloudSyncStateMachine()
1248 {
1249 CloudSyncStateMachine::Initialize();
1250 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1251 return SyncMachineDoDownload();
1252 });
1253 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1254 return SyncMachineDoUpload();
1255 });
1256 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1257 return SyncMachineDoFinished();
1258 });
1259 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1260 return SyncMachineDoRepeatCheck();
1261 });
1262 }
1263
SyncMachineDoRepeatCheck()1264 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1265 {
1266 auto config = storageProxy_->GetCloudSyncConfig();
1267 {
1268 std::lock_guard<std::mutex> autoLock(dataLock_);
1269 if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1270 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1271 }
1272 currentContext_.repeatCount++;
1273 if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1274 LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1275 config.maxRetryConflictTimes);
1276 SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1277 return CloudSyncEvent::ERROR_EVENT;
1278 }
1279 LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1280 currentContext_.repeatCount);
1281 }
1282 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1283 }
1284
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1285 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1286 {
1287 // table exist reference should download every times
1288 if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1289 return;
1290 }
1291 std::lock_guard<std::mutex> autoLock(dataLock_);
1292 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1293 }
1294
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1295 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1296 {
1297 CloudSyncData uploadData(tableName, uploadParam.mode);
1298 SetUploadDataFlag(uploadParam.taskId, uploadData);
1299 auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1300 LOGI("[CloudSyncer] Get local water before upload result: %d, table[%s length[%zu]], water[%llu]", err,
1301 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), localWater);
1302 if (err != E_OK) {
1303 return err;
1304 }
1305 ContinueToken continueStmtToken = nullptr;
1306 int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1307 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1308 LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1309 return ret;
1310 }
1311 uploadParam.count -= uploadData.ignoredCount;
1312 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1313 std::vector<ReviseModTimeInfo> revisedData;
1314 ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken, revisedData);
1315 if (ret != -E_TASK_PAUSED) {
1316 // reset watermark to zero when task no paused
1317 RecordWaterMark(uploadParam.taskId, 0u);
1318 }
1319 if (continueStmtToken != nullptr) {
1320 storageProxy_->ReleaseContinueToken(continueStmtToken);
1321 }
1322 if (!revisedData.empty()) {
1323 int errCode = storageProxy_->ReviseLocalModTime(tableName, revisedData);
1324 if (errCode != E_OK) {
1325 LOGE("[CloudSyncer] Failed to revise local modify time: %d, table: %s",
1326 errCode, DBCommon::StringMiddleMasking(tableName).c_str());
1327 }
1328 }
1329 return ret;
1330 }
1331
IsTableFinishInUpload(const std::string & table)1332 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1333 {
1334 std::lock_guard<std::mutex> autoLock(dataLock_);
1335 return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1336 }
1337
MarkUploadFinishIfNeed(const std::string & table)1338 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1339 {
1340 // table exist reference or reference by should upload every times
1341 if (storageProxy_->IsTableExistReferenceOrReferenceBy(table)) {
1342 return;
1343 }
1344 std::lock_guard<std::mutex> autoLock(dataLock_);
1345 currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1346 }
1347
GetCloudTaskStatus(uint64_t taskId) const1348 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1349 {
1350 std::lock_guard<std::mutex> autoLock(dataLock_);
1351 auto iter = cloudTaskInfos_.find(taskId);
1352 SyncProcess syncProcess;
1353 if (iter == cloudTaskInfos_.end()) {
1354 syncProcess.process = ProcessStatus::FINISHED;
1355 syncProcess.errCode = NOT_FOUND;
1356 LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1357 return syncProcess;
1358 }
1359 syncProcess.process = iter->second.status;
1360 syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1361 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1362 if (currentContext_.currentTaskId == taskId) {
1363 notifier = currentContext_.notifier;
1364 }
1365 bool hasNotifier = notifier != nullptr;
1366 if (hasNotifier) {
1367 syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1368 }
1369 LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1370 iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1371 return syncProcess;
1372 }
1373
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1374 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1375 {
1376 if (taskInfo.taskId != INVALID_TASK_ID) {
1377 if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1378 LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1379 taskInfo.storeId.c_str());
1380 return -E_INVALID_ARGS;
1381 }
1382 LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1383 return E_OK;
1384 }
1385 lastTaskId_--;
1386 if (lastTaskId_ == INVALID_TASK_ID) {
1387 lastTaskId_ = UINT64_MAX;
1388 }
1389 taskInfo.taskId = lastTaskId_;
1390 return E_OK;
1391 }
1392
ProcessVersionConflictInfo(InnerProcessInfo & innerProcessInfo,uint32_t retryCount)1393 void CloudSyncer::ProcessVersionConflictInfo(InnerProcessInfo &innerProcessInfo, uint32_t retryCount)
1394 {
1395 innerProcessInfo.retryInfo.uploadBatchRetryCount = retryCount;
1396 CloudSyncConfig config = storageProxy_->GetCloudSyncConfig();
1397 {
1398 std::lock_guard<std::mutex> autoLock(dataLock_);
1399 if (config.maxRetryConflictTimes >= 0 &&
1400 currentContext_.repeatCount + 1 > config.maxRetryConflictTimes) {
1401 innerProcessInfo.upLoadInfo.failCount =
1402 innerProcessInfo.upLoadInfo.total - innerProcessInfo.upLoadInfo.successCount;
1403 }
1404 }
1405 }
1406
GetStoreIdByTask(TaskId taskId)1407 std::string CloudSyncer::GetStoreIdByTask(TaskId taskId)
1408 {
1409 std::lock_guard<std::mutex> autoLock(dataLock_);
1410 return cloudTaskInfos_[taskId].storeId;
1411 }
1412
StopSyncTask(std::function<int (void)> & removeFunc)1413 int CloudSyncer::StopSyncTask(std::function<int(void)> &removeFunc)
1414 {
1415 hasKvRemoveTask = true;
1416 CloudSyncer::TaskId currentTask;
1417 {
1418 // stop task if exist
1419 std::lock_guard<std::mutex> autoLock(dataLock_);
1420 currentTask = currentContext_.currentTaskId;
1421 }
1422 if (currentTask != INVALID_TASK_ID || asyncTaskId_ != INVALID_TASK_ID) {
1423 StopAllTasks(-E_CLOUD_ERROR);
1424 }
1425 int errCode = E_OK;
1426 {
1427 std::lock_guard<std::mutex> lock(syncMutex_);
1428 errCode = removeFunc();
1429 hasKvRemoveTask = false;
1430 }
1431 if (errCode != E_OK) {
1432 LOGE("[CloudSyncer] removeFunc execute failed errCode: %d.", errCode);
1433 }
1434 return errCode;
1435 }
1436
StopAllTasks(int errCode)1437 void CloudSyncer::StopAllTasks(int errCode)
1438 {
1439 CloudSyncer::TaskId currentTask;
1440 {
1441 std::lock_guard<std::mutex> autoLock(dataLock_);
1442 currentTask = currentContext_.currentTaskId;
1443 }
1444 // mark current task user_change
1445 SetTaskFailed(currentTask, errCode);
1446 UnlockIfNeed();
1447 WaitCurTaskFinished();
1448
1449 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
1450 for (auto &info: infoList) {
1451 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with errCode %d, isPriority %d.", info.taskId, errCode,
1452 info.priorityTask);
1453 auto processNotifier = std::make_shared<ProcessNotifier>(this);
1454 processNotifier->Init(info.table, info.devices, info.users);
1455 info.errCode = errCode;
1456 info.status = ProcessStatus::FINISHED;
1457 processNotifier->NotifyProcess(info, {}, true);
1458 }
1459 }
1460
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)1461 int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
1462 {
1463 OpType strategyOpResult = OpType::NOT_HANDLE;
1464 int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
1465 if (errCode != E_OK) {
1466 return errCode;
1467 }
1468 param.downloadData.opType[idx] = strategyOpResult;
1469 if (!IsDataContainAssets()) {
1470 return E_OK;
1471 }
1472 Key hashKey;
1473 if (isExist) {
1474 hashKey = dataInfo.localInfo.logInfo.hashKey;
1475 }
1476 if (param.isAssetsOnly) {
1477 return strategyOpResult == OpType::LOCKED_NOT_HANDLE ?
1478 E_OK : TagDownloadAssetsForAssetOnly(hashKey, idx, param, dataInfo, localAssetInfo);
1479 }
1480 return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
1481 }
1482
CloudDbBatchDownloadAssets(TaskId taskId,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets)1483 int CloudSyncer::CloudDbBatchDownloadAssets(TaskId taskId, const DownloadList &downloadList,
1484 const std::set<Key> &dupHashKeySet, InnerProcessInfo &info, ChangedData &changedAssets)
1485 {
1486 int errorCode = CheckTaskIdValid(taskId);
1487 if (errorCode != E_OK) {
1488 return errorCode;
1489 }
1490 bool isSharedTable = false;
1491 errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1492 if (errorCode != E_OK) {
1493 LOGE("[CloudSyncer] check is shared table failed %d", errorCode);
1494 return errorCode;
1495 }
1496 // prepare download data
1497 auto [downloadRecord, removeAssets, downloadAssets] =
1498 GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, IsAsyncDownloadAssets(taskId), info);
1499 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1500 std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1501 };
1502 return BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1503 }
1504
FillDownloadItem(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,const InnerProcessInfo & info,bool isSharedTable,DownloadItems & record)1505 void CloudSyncer::FillDownloadItem(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1506 const InnerProcessInfo &info, bool isSharedTable, DownloadItems &record)
1507 {
1508 CloudStorageUtils::EraseNoChangeAsset(record.downloadItem.assets);
1509 if (record.downloadItem.assets.empty()) { // Download data (include deleting)
1510 return;
1511 }
1512 if (isSharedTable) {
1513 // share table will not download asset, need to reset the status
1514 for (auto &entry: record.downloadItem.assets) {
1515 for (auto &asset: entry.second) {
1516 asset.status = AssetStatus::NORMAL;
1517 }
1518 }
1519 return;
1520 }
1521 int errCode = E_OK;
1522 VBucket dbAssets;
1523 if (!IsNeedSkipDownload(isSharedTable, errCode, info, record.downloadItem, dbAssets)) {
1524 GetAssetsToRemove(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToRemove);
1525 GetAssetsToDownload(record.downloadItem.assets, dbAssets, isSharedTable, record.assetsToDownload, record.flags);
1526 }
1527 }
1528
GetDownloadRecords(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,bool isSharedTable,bool isAsyncDownloadAssets,const InnerProcessInfo & info)1529 CloudSyncer::DownloadAssetDetail CloudSyncer::GetDownloadRecords(const DownloadList &downloadList,
1530 const std::set<Key> &dupHashKeySet, bool isSharedTable, bool isAsyncDownloadAssets, const InnerProcessInfo &info)
1531 {
1532 DownloadItemRecords downloadRecord;
1533 RemoveAssetsRecords removeAssets;
1534 DownloadAssetsRecords downloadAssets;
1535 for (size_t i = 0; i < downloadList.size(); i++) {
1536 DownloadItems record;
1537 GetDownloadItem(downloadList, i, record.downloadItem);
1538 FillDownloadItem(dupHashKeySet, downloadList, info, isSharedTable, record);
1539
1540 IAssetLoader::AssetRecord removeAsset = {
1541 record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToRemove)
1542 };
1543 removeAssets.push_back(std::move(removeAsset));
1544 if (isAsyncDownloadAssets) {
1545 record.assetsToDownload.clear();
1546 }
1547 IAssetLoader::AssetRecord downloadAsset = {
1548 record.downloadItem.gid, record.downloadItem.prefix, std::move(record.assetsToDownload)
1549 };
1550 downloadAssets.push_back(std::move(downloadAsset));
1551 downloadRecord.push_back(std::move(record));
1552 }
1553 return {downloadRecord, removeAssets, downloadAssets};
1554 }
1555
BatchDownloadAndCommitRes(const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,InnerProcessInfo & info,ChangedData & changedAssets,std::tuple<DownloadItemRecords,RemoveAssetsRecords,DownloadAssetsRecords,bool> & downloadDetail)1556 int CloudSyncer::BatchDownloadAndCommitRes(const DownloadList &downloadList, const std::set<Key> &dupHashKeySet,
1557 InnerProcessInfo &info, ChangedData &changedAssets,
1558 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> &downloadDetail)
1559 {
1560 auto &[downloadRecord, removeAssets, downloadAssets, isSharedTable] = downloadDetail;
1561 // download and remove in batch
1562 auto deleteRes = cloudDB_.BatchRemoveLocalAssets(info.tableName, removeAssets);
1563 auto downloadRes = cloudDB_.BatchDownload(info.tableName, downloadAssets);
1564 if (deleteRes == -E_NOT_SET || downloadRes == -E_NOT_SET) {
1565 return -E_NOT_SET;
1566 }
1567 int errorCode = E_OK;
1568 int index = 0;
1569 for (auto &item : downloadRecord) {
1570 auto deleteCode = removeAssets[index].status == OK ? E_OK : -E_REMOVE_ASSETS_FAILED;
1571 auto downloadCode = CloudDBProxy::GetInnerErrorCode(downloadAssets[index].status);
1572 downloadCode = downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT ? E_OK : downloadCode;
1573 if (!isSharedTable) {
1574 item.downloadItem.assets = BackFillAssetsAfterDownload(downloadCode, deleteCode, item.flags,
1575 downloadAssets[index].assets, removeAssets[index].assets);
1576 }
1577 StatisticDownloadRes(downloadAssets[index], removeAssets[index], info, item.downloadItem);
1578 AddNotifyDataFromDownloadAssets(dupHashKeySet, item.downloadItem, changedAssets);
1579 if (item.downloadItem.strategy == OpType::DELETE) {
1580 item.downloadItem.assets = {};
1581 item.downloadItem.gid = "";
1582 }
1583 // commit download res
1584 DownloadCommitList commitList;
1585 // Process result of each asset
1586 downloadCode = downloadAssets[index].status == SKIP_ASSET ? E_OK : downloadCode;
1587 commitList.push_back(std::make_tuple(item.downloadItem.gid, std::move(item.downloadItem.assets),
1588 deleteCode == E_OK && downloadCode == E_OK));
1589 errorCode = (errorCode != E_OK) ? errorCode : deleteCode;
1590 errorCode = (errorCode != E_OK) ? errorCode : downloadCode;
1591 int currErrorCode = (deleteCode != E_OK) ? deleteCode : downloadCode;
1592 int ret = CommitDownloadResult(item.downloadItem, info, commitList, currErrorCode);
1593 if (ret != E_OK) {
1594 errorCode = errorCode == E_OK ? ret : errorCode;
1595 }
1596 index++;
1597 }
1598 storageProxy_->PrintCursorChange(info.tableName);
1599 return errorCode;
1600 }
1601
StatisticDownloadRes(const IAssetLoader::AssetRecord & downloadRecord,const IAssetLoader::AssetRecord & removeRecord,InnerProcessInfo & info,DownloadItem & downloadItem)1602 void CloudSyncer::StatisticDownloadRes(const IAssetLoader::AssetRecord &downloadRecord,
1603 const IAssetLoader::AssetRecord &removeRecord, InnerProcessInfo &info, DownloadItem &downloadItem)
1604 {
1605 if ((downloadRecord.status == OK || downloadRecord.status == SKIP_ASSET) && (removeRecord.status == OK)) {
1606 return;
1607 }
1608 if ((downloadRecord.status == CLOUD_RECORD_EXIST_CONFLICT) ||
1609 (removeRecord.status == CLOUD_RECORD_EXIST_CONFLICT)) {
1610 downloadItem.recordConflict = true;
1611 return;
1612 }
1613 info.downLoadInfo.failCount += 1;
1614 if (info.downLoadInfo.successCount == 0) {
1615 LOGW("[CloudSyncer] Invalid successCount");
1616 } else {
1617 info.downLoadInfo.successCount -= 1;
1618 }
1619 }
1620
AddNotifyDataFromDownloadAssets(const std::set<Key> & dupHashKeySet,DownloadItem & downloadItem,ChangedData & changedAssets)1621 void CloudSyncer::AddNotifyDataFromDownloadAssets(const std::set<Key> &dupHashKeySet, DownloadItem &downloadItem,
1622 ChangedData &changedAssets)
1623 {
1624 if (downloadItem.assets.empty()) {
1625 return;
1626 }
1627 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1628 if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
1629 LOGW("[CloudSyncer] [AddNotifyDataFromDownloadAssets] strategy is invalid.");
1630 } else {
1631 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1632 downloadItem.primaryKeyValList);
1633 }
1634 } else if (downloadItem.strategy == OpType::INSERT) {
1635 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1636 }
1637 }
1638
CheckDataAfterDownload(const std::string & tableName)1639 void CloudSyncer::CheckDataAfterDownload(const std::string &tableName)
1640 {
1641 int dataCount = 0;
1642 int logicDeleteDataCount = 0;
1643 int errCode = storageProxy_->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
1644 if (errCode == E_OK) {
1645 LOGI("[CloudSyncer] Check local data after download[%s[%zu]], data count: %d, logic delete data count: %d",
1646 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length(), dataCount, logicDeleteDataCount);
1647 } else {
1648 LOGW("[CloudSyncer] Get local data after download fail: %d", errCode);
1649 }
1650 }
1651
WaitCurTaskFinished()1652 void CloudSyncer::WaitCurTaskFinished()
1653 {
1654 CancelBackgroundDownloadAssetsTask();
1655 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1656 if (currentContext_.currentTaskId != INVALID_TASK_ID) {
1657 LOGI("[CloudSyncer] begin wait current task %" PRIu64 " finished", currentContext_.currentTaskId);
1658 contextCv_.wait(uniqueLock, [this]() {
1659 return currentContext_.currentTaskId == INVALID_TASK_ID;
1660 });
1661 LOGI("[CloudSyncer] current task has been finished");
1662 }
1663 }
1664
IsAsyncDownloadAssets(TaskId taskId)1665 bool CloudSyncer::IsAsyncDownloadAssets(TaskId taskId)
1666 {
1667 std::lock_guard<std::mutex> autoLock(dataLock_);
1668 return cloudTaskInfos_[taskId].asyncDownloadAssets;
1669 }
1670
TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)1671 void CloudSyncer::TriggerAsyncDownloadAssetsInTaskIfNeed(bool isFirstDownload)
1672 {
1673 {
1674 std::lock_guard<std::mutex> autoLock(dataLock_);
1675 if (!cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets) {
1676 return;
1677 }
1678 }
1679 if (isFirstDownload) {
1680 bool isLockAction = IsLockInDownload();
1681 std::lock_guard<std::mutex> autoLock(dataLock_);
1682 if (currentContext_.isNeedUpload && isLockAction) {
1683 return;
1684 }
1685 }
1686 TriggerAsyncDownloadAssetsIfNeed();
1687 }
1688
BackgroundDownloadAssetsTask()1689 void CloudSyncer::BackgroundDownloadAssetsTask()
1690 {
1691 // remove listener first
1692 CancelDownloadListener();
1693 // add download count and register listener if failed
1694 auto manager = RuntimeContext::GetInstance()->GetAssetsDownloadManager();
1695 IncObjRef(this);
1696 auto [errCode, listener] = manager->BeginDownloadWithListener([this](void *) {
1697 TriggerAsyncDownloadAssetsIfNeed();
1698 }, [this]() {
1699 DecObjRef(this);
1700 });
1701 if (errCode == E_OK) {
1702 // increase download count success
1703 CancelDownloadListener();
1704 DoBackgroundDownloadAssets();
1705 RuntimeContext::GetInstance()->GetAssetsDownloadManager()->FinishDownload();
1706 DecObjRef(this);
1707 return;
1708 }
1709 if (listener != nullptr) {
1710 std::lock_guard<std::mutex> autoLock(listenerMutex_);
1711 waitDownloadListener_ = listener;
1712 return;
1713 }
1714 LOGW("[CloudSyncer] BeginDownloadWithListener failed %d", errCode);
1715 DecObjRef(this);
1716 }
1717
CancelDownloadListener()1718 void CloudSyncer::CancelDownloadListener()
1719 {
1720 NotificationChain::Listener *waitDownloadListener = nullptr;
1721 {
1722 std::lock_guard<std::mutex> autoLock(listenerMutex_);
1723 if (waitDownloadListener_ != nullptr) {
1724 waitDownloadListener = waitDownloadListener_;
1725 waitDownloadListener_ = nullptr;
1726 }
1727 }
1728 if (waitDownloadListener != nullptr) {
1729 waitDownloadListener->Drop(true);
1730 waitDownloadListener = nullptr;
1731 }
1732 }
1733
DoBackgroundDownloadAssets()1734 void CloudSyncer::DoBackgroundDownloadAssets()
1735 {
1736 bool allDownloadFinish = true;
1737 std::map<std::string, int64_t> downloadBeginTime;
1738 do {
1739 auto [errCode, tables] = storageProxy_->GetDownloadAssetTable();
1740 if (errCode != E_OK) {
1741 LOGE("[CloudSyncer] Get download asset table failed %d", errCode);
1742 return;
1743 }
1744 allDownloadFinish = true;
1745 std::list<std::string> tableQueue(tables.begin(), tables.end());
1746 while (!tableQueue.empty()) {
1747 if (cancelAsyncTask_ || closed_) {
1748 LOGW("[CloudSyncer] exit task by cancel %d closed %d", static_cast<int>(cancelAsyncTask_),
1749 static_cast<int>(closed_));
1750 return;
1751 }
1752 errCode = BackgroundDownloadAssetsByTable(tableQueue.front(), downloadBeginTime);
1753 if (errCode == E_OK) {
1754 allDownloadFinish = false;
1755 } else if (errCode == -E_FINISHED) {
1756 tableQueue.pop_front();
1757 errCode = E_OK;
1758 } else {
1759 LOGW("[CloudSyncer] BackgroundDownloadAssetsByTable table %s failed %d",
1760 DBCommon::StringMiddleMasking(tableQueue.front()).c_str(), errCode);
1761 allDownloadFinish = false;
1762 }
1763 }
1764 } while (!allDownloadFinish);
1765 }
1766
CancelBackgroundDownloadAssetsTaskIfNeed()1767 void CloudSyncer::CancelBackgroundDownloadAssetsTaskIfNeed()
1768 {
1769 bool cancelDownload = true;
1770 {
1771 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1772 if (cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets || asyncTaskId_ == INVALID_TASK_ID) {
1773 return;
1774 }
1775 if (cloudTaskInfos_[currentContext_.currentTaskId].compensatedTask) {
1776 cancelDownload = false;
1777 }
1778 }
1779 CancelBackgroundDownloadAssetsTask(cancelDownload);
1780 }
1781
CancelBackgroundDownloadAssetsTask(bool cancelDownload)1782 void CloudSyncer::CancelBackgroundDownloadAssetsTask(bool cancelDownload)
1783 {
1784 cancelAsyncTask_ = true;
1785 if (cancelDownload) {
1786 cloudDB_.CancelDownload();
1787 }
1788 std::unique_lock<std::mutex> uniqueLock(dataLock_);
1789 if (asyncTaskId_ != INVALID_TASK_ID) {
1790 LOGI("[CloudSyncer] begin wait async download task % " PRIu64 " finished", asyncTaskId_);
1791 asyncTaskCv_.wait(uniqueLock, [this]() {
1792 return asyncTaskId_ == INVALID_TASK_ID;
1793 });
1794 LOGI("[CloudSyncer] async download task has been finished");
1795 }
1796 cancelAsyncTask_ = false;
1797 }
1798
BackgroundDownloadAssetsByTable(const std::string & table,std::map<std::string,int64_t> & downloadBeginTime)1799 int CloudSyncer::BackgroundDownloadAssetsByTable(const std::string &table,
1800 std::map<std::string, int64_t> &downloadBeginTime)
1801 {
1802 auto [errCode, downloadData] = storageProxy_->GetDownloadAssetRecords(table, downloadBeginTime[table]);
1803 if (errCode != E_OK) {
1804 return errCode;
1805 }
1806 if (downloadData.empty()) {
1807 LOGD("[CloudSyncer] table %s async download finished", DBCommon::StringMiddleMasking(table).c_str());
1808 return -E_FINISHED;
1809 }
1810
1811 bool isSharedTable = false;
1812 errCode = storageProxy_->IsSharedTable(table, isSharedTable);
1813 if (errCode != E_OK) {
1814 LOGE("[CloudSyncer] check is shared table failed %d", errCode);
1815 return errCode;
1816 }
1817 DownloadList downloadList;
1818 ChangedData changedAssets;
1819 std::tie(errCode, downloadList, changedAssets) = CloudSyncUtils::GetDownloadListByGid(storageProxy_, downloadData,
1820 table);
1821 if (errCode != E_OK) {
1822 return errCode;
1823 }
1824 CloudSyncUtils::UpdateMaxTimeWithDownloadList(downloadList, table, downloadBeginTime);
1825 std::set<Key> dupHashKeySet;
1826 InnerProcessInfo info;
1827 info.tableName = table;
1828 info.isAsyncDownload = true;
1829
1830 // prepare download data
1831 auto [downloadRecord, removeAssets, downloadAssets] =
1832 GetDownloadRecords(downloadList, dupHashKeySet, isSharedTable, false, info);
1833 std::tuple<DownloadItemRecords, RemoveAssetsRecords, DownloadAssetsRecords, bool> detail = {
1834 std::move(downloadRecord), std::move(removeAssets), std::move(downloadAssets), isSharedTable
1835 };
1836 errCode = BatchDownloadAndCommitRes(downloadList, dupHashKeySet, info, changedAssets, detail);
1837 NotifyChangedDataWithDefaultDev(std::move(changedAssets));
1838 return errCode;
1839 }
1840
TagDownloadAssetsForAssetOnly(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)1841 int CloudSyncer::TagDownloadAssetsForAssetOnly(
1842 const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, VBucket &localAssetInfo)
1843 {
1844 Type prefix;
1845 std::vector<Type> pkVals;
1846 int ret = CloudSyncUtils::GetCloudPkVals(
1847 param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
1848 if (ret != E_OK) {
1849 // if get pk vals failed, mean cloud data is deteled.
1850 LOGE("[CloudSyncer] TagDownloadAssetsForAssetOnly cannot get primary key value list. %d",
1851 -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1852 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1853 }
1854 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
1855 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
1856 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
1857 return -E_INTERNAL_ERROR;
1858 }
1859 std::map<std::string, Assets> downloadAssetsMap{};
1860 ret = CloudSyncUtils::GetDownloadAssetsOnlyMapFromDownLoadData(idx, param, downloadAssetsMap);
1861 if (ret != E_OK) {
1862 return ret;
1863 }
1864 param.assetsDownloadList.push_back(
1865 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, OpType::UPDATE, downloadAssetsMap, hashKey,
1866 pkVals, dataInfo.cloudLogInfo.timestamp));
1867 return ret;
1868 }
1869
PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam & param,std::vector<VBucket> & localInfo)1870 int CloudSyncer::PutCloudSyncDataOrUpdateStatusForAssetOnly(SyncParam ¶m, std::vector<VBucket> &localInfo)
1871 {
1872 int ret = E_OK;
1873 if (param.isAssetsOnly) {
1874 // download and save only asset, ignore other data.
1875 for (auto &item : localInfo) {
1876 ret = storageProxy_->UpdateAssetStatusForAssetOnly(param.tableName, item);
1877 if (ret != E_OK) {
1878 LOGE("[CloudSyncer] Cannot save asset data due to error code %d", ret);
1879 return ret;
1880 }
1881 }
1882 } else {
1883 ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
1884 if (ret != E_OK) {
1885 param.info.downLoadInfo.failCount += param.downloadData.data.size();
1886 LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
1887 }
1888 }
1889 return ret;
1890 }
1891
QueryCloudGidForAssetsOnly(TaskId taskId,SyncParam & param,int64_t groupIdx,std::vector<std::string> & cloudGid)1892 int CloudSyncer::QueryCloudGidForAssetsOnly(
1893 TaskId taskId, SyncParam ¶m, int64_t groupIdx, std::vector<std::string> &cloudGid)
1894 {
1895 auto tableName = param.info.tableName;
1896 QuerySyncObject syncObj = GetQuerySyncObject(tableName);
1897 VBucket extend = {{CloudDbConstant::CURSOR_FIELD, param.cloudWaterMarkForAssetsOnly}};
1898 QuerySyncObject obj;
1899 int ret = syncObj.GetQuerySyncObjectFromGroup(groupIdx, obj);
1900 if (ret != E_OK) {
1901 LOGE("Get query obj from group fail, errCode = %d", ret);
1902 return ret;
1903 }
1904 ret = GetCloudGid(taskId, tableName, obj, cloudGid);
1905 if (ret != E_OK) {
1906 LOGE("Get cloud gid fail, errCode = %d", ret);
1907 }
1908 return ret;
1909 }
1910
GetEmptyGidAssetsMapFromDownloadData(const std::vector<VBucket> & data,std::map<std::string,AssetsMap> & gidAssetsMap)1911 int CloudSyncer::GetEmptyGidAssetsMapFromDownloadData(
1912 const std::vector<VBucket> &data, std::map<std::string, AssetsMap> &gidAssetsMap)
1913 {
1914 for (uint32_t i = 0; i < data.size(); i++) {
1915 std::string gidStr;
1916 int errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, data[i], gidStr);
1917 if (errCode != E_OK) {
1918 LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", errCode);
1919 return errCode;
1920 }
1921 gidAssetsMap[gidStr] = AssetsMap{};
1922 }
1923 return E_OK;
1924 }
1925
SetAssetsMapAndEraseDataForAssetsOnly(TaskId taskId,SyncParam & param,std::vector<VBucket> & downloadData,std::map<std::string,AssetsMap> & gidAssetsMap)1926 int CloudSyncer::SetAssetsMapAndEraseDataForAssetsOnly(
1927 TaskId taskId, SyncParam ¶m, std::vector<VBucket> &downloadData, std::map<std::string, AssetsMap> &gidAssetsMap)
1928 {
1929 for (uint32_t i = 0; i < param.groupNum; i++) {
1930 std::vector<std::string> cloudGid;
1931 int ret = QueryCloudGidForAssetsOnly(taskId, param, i, cloudGid);
1932 if ((ret != E_OK && ret != -E_QUERY_END) || cloudGid.empty()) {
1933 LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1934 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1935 }
1936 if (!CloudSyncUtils::SetAssetsMapByCloudGid(cloudGid, param.assetsGroupMap[i], gidAssetsMap)) {
1937 // if group no match data, return error code.
1938 LOGE("[CloudSyncer] Cannot get the %u group data, error code: %d.", i, -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
1939 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
1940 }
1941 }
1942 for (auto iter = downloadData.begin(); iter != downloadData.end();) {
1943 std::string gidStr;
1944 int ret = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, *iter, gidStr);
1945 if (ret != E_OK) {
1946 LOGE("Get gid from bucket fail when mark flag as consistent, errCode = %d", ret);
1947 return ret;
1948 }
1949
1950 if (DBCommon::IsRecordDelete(*iter)) {
1951 iter = downloadData.erase(iter);
1952 gidAssetsMap.erase(gidStr);
1953 continue;
1954 }
1955
1956 auto assetsMap = gidAssetsMap[gidStr];
1957 if (!CloudSyncUtils::IsAssetOnlyData(*iter, assetsMap, false)) {
1958 iter = downloadData.erase(iter);
1959 gidAssetsMap.erase(gidStr);
1960 continue;
1961 }
1962 ++iter;
1963 }
1964 return E_OK;
1965 }
1966
CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId,SyncParam & param)1967 int CloudSyncer::CheckCloudQueryAssetsOnlyIfNeed(TaskId taskId, SyncParam ¶m)
1968 {
1969 {
1970 std::lock_guard<std::mutex> autoLock(dataLock_);
1971 if (!param.isAssetsOnly || cloudTaskInfos_[taskId].compensatedTask) {
1972 return E_OK;
1973 }
1974 if (!param.isVaildForAssetsOnly) {
1975 param.downloadData.data.clear();
1976 return E_OK;
1977 }
1978 cloudTaskInfos_[taskId].isAssetsOnly = param.isAssetsOnly;
1979 cloudTaskInfos_[taskId].groupNum = param.groupNum;
1980 cloudTaskInfos_[taskId].assetsGroupMap = param.assetsGroupMap;
1981 }
1982
1983 std::vector<VBucket> &downloadData = param.downloadData.data;
1984 auto &gidAssetsMap = param.gidAssetsMap;
1985 gidAssetsMap.clear();
1986 int ret = GetEmptyGidAssetsMapFromDownloadData(downloadData, gidAssetsMap);
1987 if (ret != E_OK) {
1988 return ret;
1989 }
1990
1991 // set assets map for every record and erase not match data.
1992 ret = SetAssetsMapAndEraseDataForAssetsOnly(taskId, param, downloadData, gidAssetsMap);
1993 if (ret != E_OK) {
1994 return ret;
1995 }
1996
1997 for (uint32_t i = 0; i < param.groupNum; i++) {
1998 bool isEmpty = CloudSyncUtils::CheckAssetsOnlyIsEmptyInGroup(gidAssetsMap, param.assetsGroupMap[i]);
1999 if (isEmpty) {
2000 LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2001 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2002 }
2003 }
2004 return E_OK;
2005 }
2006
CheckLocalQueryAssetsOnlyIfNeed(VBucket & localAssetInfo,SyncParam & param,DataInfoWithLog & logInfo)2007 int CloudSyncer::CheckLocalQueryAssetsOnlyIfNeed(VBucket &localAssetInfo, SyncParam ¶m, DataInfoWithLog &logInfo)
2008 {
2009 if (!param.isAssetsOnly) {
2010 return E_OK;
2011 }
2012 std::string gid = logInfo.logInfo.cloudGid;
2013 auto iter = param.gidAssetsMap.find(gid);
2014 if (iter == param.gidAssetsMap.end()) {
2015 LOGE("query assets failed, error code:%d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2016 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2017 }
2018
2019 auto assetsMap = iter->second;
2020 if (!CloudSyncUtils::IsAssetOnlyData(localAssetInfo, assetsMap, true)) {
2021 LOGE("[CloudSyncer] query assets failed, error code: %d", -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY);
2022 return -E_ASSET_NOT_FOUND_FOR_DOWN_ONLY;
2023 }
2024 return E_OK;
2025 }
2026
IsCurrentAsyncDownloadTask()2027 bool CloudSyncer::IsCurrentAsyncDownloadTask()
2028 {
2029 std::lock_guard<std::mutex> autoLock(dataLock_);
2030 return cloudTaskInfos_[currentContext_.currentTaskId].asyncDownloadAssets;
2031 }
2032
NotifyChangedDataWithDefaultDev(ChangedData && changedData)2033 void CloudSyncer::NotifyChangedDataWithDefaultDev(ChangedData &&changedData)
2034 {
2035 auto table = changedData.tableName;
2036 int ret = CloudSyncUtils::NotifyChangeData(CloudDbConstant::DEFAULT_CLOUD_DEV, storageProxy_,
2037 std::move(changedData));
2038 if (ret != E_OK) {
2039 LOGW("[CloudSyncer] Notify %s change data failed %d", DBCommon::StringMiddleMasking(table).c_str(), ret);
2040 }
2041 }
2042
// Forwards the cloud version generation callback to cloudDB_.
void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
{
    cloudDB_.SetGenCloudVersionCallback(callback);
}
2047
GetDownloadAssetIndex(TaskId taskId)2048 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2049 {
2050 size_t index = 0u;
2051 std::lock_guard<std::mutex> autoLock(dataLock_);
2052 if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2053 index = resumeTaskInfos_[taskId].lastDownloadIndex;
2054 resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2055 }
2056 return index;
2057 }
2058
GetCurrentTableUploadBatchIndex()2059 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2060 {
2061 std::lock_guard<std::mutex> autoLock(dataLock_);
2062 return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2063 }
2064
ResetCurrentTableUploadBatchIndex()2065 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2066 {
2067 std::lock_guard<std::mutex> autoLock(dataLock_);
2068 if (currentContext_.notifier == nullptr) {
2069 return;
2070 }
2071 currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2072 }
2073
RecordWaterMark(TaskId taskId,Timestamp waterMark)2074 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2075 {
2076 std::lock_guard<std::mutex> autoLock(dataLock_);
2077 resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2078 }
2079
GetResumeWaterMark(TaskId taskId)2080 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2081 {
2082 std::lock_guard<std::mutex> autoLock(dataLock_);
2083 return resumeTaskInfos_[taskId].lastLocalWatermark;
2084 }
2085
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2086 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2087 {
2088 InnerProcessInfo info;
2089 info.tableName = tableName;
2090 info.tableStatus = ProcessStatus::PROCESSING;
2091 ReloadUploadInfoIfNeed(uploadParam, info);
2092 return info;
2093 }
2094
CopyAndClearTaskInfos()2095 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2096 {
2097 std::vector<CloudTaskInfo> infoList;
2098 std::lock_guard<std::mutex> autoLock(dataLock_);
2099 for (const auto &item: cloudTaskInfos_) {
2100 infoList.push_back(item.second);
2101 }
2102 taskQueue_.clear();
2103 cloudTaskInfos_.clear();
2104 resumeTaskInfos_.clear();
2105 currentContext_.notifier = nullptr;
2106 return infoList;
2107 }
2108
TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)2109 bool CloudSyncer::TryToInitQueryAndUserListForCompensatedSync(TaskId triggerTaskId)
2110 {
2111 std::vector<QuerySyncObject> syncQuery;
2112 std::vector<std::string> users;
2113 int errCode = CloudSyncUtils::GetQueryAndUsersForCompensatedSync(
2114 CloudSyncUtils::CanStartAsyncDownload(scheduleTaskCount_), storageProxy_, users, syncQuery);
2115 if (errCode != E_OK) {
2116 LOGW("[CloudSyncer] get query for compensated sync failed! errCode = %d", errCode);
2117 // if failed, finshed the task directly.
2118 DoFinished(triggerTaskId, errCode);
2119 return false;
2120 }
2121 if (syncQuery.empty()) {
2122 // if quey is empty, finshed the task directly.
2123 DoFinished(triggerTaskId, E_OK);
2124 return false;
2125 }
2126 std::vector<std::string> userList;
2127 CloudSyncUtils::GetUserListForCompensatedSync(cloudDB_, users, userList);
2128 std::lock_guard<std::mutex> autoLock(dataLock_);
2129 if (cloudTaskInfos_.find(triggerTaskId) == cloudTaskInfos_.end()) {
2130 return false;
2131 }
2132 cloudTaskInfos_[triggerTaskId].users = userList;
2133 cloudTaskInfos_[triggerTaskId].table.clear();
2134 cloudTaskInfos_[triggerTaskId].queryList.clear();
2135 cloudTaskInfos_[triggerTaskId].table.push_back(syncQuery[0].GetRelationTableName());
2136 cloudTaskInfos_[triggerTaskId].queryList.push_back(syncQuery[0]);
2137 return true;
2138 }
2139
ClearCloudWatermark(std::function<int (void)> & clearFunc)2140 int CloudSyncer::ClearCloudWatermark(std::function<int(void)> &clearFunc)
2141 {
2142 std::lock_guard<std::mutex> lock(syncMutex_);
2143 return clearFunc();
2144 }
2145
IsCloudForcePush(TaskId taskId)2146 bool CloudSyncer::IsCloudForcePush(TaskId taskId)
2147 {
2148 std::lock_guard<std::mutex> autoLock(dataLock_);
2149 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
2150 }
2151 }