1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud_sync_tag_assets.h"
25 #include "cloud_sync_utils.h"
26 #include "db_errno.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "runtime_context.h"
30 #include "storage_proxy.h"
31 #include "store_types.h"
32 #include "strategy_factory.h"
33 #include "version.h"
34
35 namespace DistributedDB {
ReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)36 void CloudSyncer::ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
37 {
38 Timestamp cacheWaterMark = GetResumeWaterMark(taskId);
39 waterMark = cacheWaterMark == 0u ? waterMark : cacheWaterMark;
40 RecordWaterMark(taskId, 0u);
41 }
42
ReloadCloudWaterMarkIfNeed(const std::string & tableName,std::string & cloudWaterMark)43 void CloudSyncer::ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark)
44 {
45 std::lock_guard<std::mutex> autoLock(dataLock_);
46 std::string cacheCloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
47 cloudWaterMark = cacheCloudWaterMark.empty() ? cloudWaterMark : cacheCloudWaterMark;
48 }
49
ReloadUploadInfoIfNeed(TaskId taskId,const UploadParam & param,InnerProcessInfo & info)50 void CloudSyncer::ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam ¶m, InnerProcessInfo &info)
51 {
52 info.upLoadInfo.total = static_cast<uint32_t>(param.count);
53 {
54 std::lock_guard<std::mutex> autoLock(dataLock_);
55 if (!cloudTaskInfos_[taskId].resume) {
56 return;
57 }
58 }
59 uint32_t lastSuccessCount = GetLastUploadSuccessCount(info.tableName);
60 if (lastSuccessCount == 0) {
61 return;
62 }
63 info.upLoadInfo.total += lastSuccessCount;
64 info.upLoadInfo.successCount += lastSuccessCount;
65 LOGD("[CloudSyncer] resume upload, last success count %" PRIu32, lastSuccessCount);
66 }
67
GetLastUploadSuccessCount(const std::string & tableName)68 uint32_t CloudSyncer::GetLastUploadSuccessCount(const std::string &tableName)
69 {
70 std::lock_guard<std::mutex> autoLock(dataLock_);
71 return currentContext_.notifier->GetLastUploadSuccessCount(tableName);
72 }
73
FillDownloadExtend(TaskId taskId,const std::string & tableName,const std::string & cloudWaterMark,VBucket & extend)74 int CloudSyncer::FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
75 VBucket &extend)
76 {
77 extend = {
78 {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
79 };
80
81 QuerySyncObject obj = GetQuerySyncObject(tableName);
82 if (obj.IsContainQueryNodes()) {
83 int errCode = GetCloudGid(taskId, tableName, obj);
84 if (errCode != E_OK) {
85 LOGE("[CloudSyncer] Failed to get cloud gid when fill extend, %d.", errCode);
86 return errCode;
87 }
88 Bytes bytes;
89 bytes.resize(obj.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
90 Parcel parcel(bytes.data(), bytes.size());
91 errCode = obj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
92 if (errCode != E_OK) {
93 LOGE("[CloudSyncer] Query serialize failed %d", errCode);
94 return errCode;
95 }
96 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::QUERY_FIELD);
97 extend[CloudDbConstant::QUERY_FIELD] = bytes;
98 } else {
99 extend[CloudDbConstant::TYPE_FIELD] = static_cast<int64_t>(CloudQueryType::FULL_TABLE);
100 }
101 return E_OK;
102 }
103
GetCloudGid(TaskId taskId,const std::string & tableName,QuerySyncObject & obj)104 int CloudSyncer::GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj)
105 {
106 std::vector<std::string> cloudGid;
107 bool isCloudForcePush = cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
108 int errCode = storageProxy_->GetCloudGid(obj, isCloudForcePush, IsCompensatedTask(taskId), cloudGid);
109 if (errCode != E_OK) {
110 LOGE("[CloudSyncer] Failed to get cloud gid, %d.", errCode);
111 } else if (!cloudGid.empty()) {
112 obj.SetCloudGid(cloudGid);
113 }
114 LOGI("[CloudSyncer] get cloud gid size:%zu", cloudGid.size());
115 return errCode;
116 }
117
GetQuerySyncObject(const std::string & tableName)118 QuerySyncObject CloudSyncer::GetQuerySyncObject(const std::string &tableName)
119 {
120 std::lock_guard<std::mutex> autoLock(dataLock_);
121 for (const auto &item : cloudTaskInfos_[currentContext_.currentTaskId].queryList) {
122 if (item.GetTableName() == tableName) {
123 return item;
124 }
125 }
126 LOGW("[CloudSyncer] not found query in cache");
127 QuerySyncObject querySyncObject;
128 querySyncObject.SetTableName(tableName);
129 return querySyncObject;
130 }
131
NotifyUploadFailed(int errCode,InnerProcessInfo & info)132 void CloudSyncer::NotifyUploadFailed(int errCode, InnerProcessInfo &info)
133 {
134 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
135 LOGI("[CloudSyncer] Stop upload due to version conflict, %d", errCode);
136 return;
137 } else {
138 LOGE("[CloudSyncer] Failed to do upload, %d", errCode);
139 }
140 info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
141 info.tableStatus = ProcessStatus::FINISHED;
142 {
143 std::lock_guard<std::mutex> autoLock(dataLock_);
144 currentContext_.notifier->UpdateProcess(info);
145 }
146 }
147
BatchInsert(Info & insertInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)148 int CloudSyncer::BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
149 {
150 int errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
151 uploadData.insData.extend, insertInfo);
152 innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
153 innerProcessInfo.upLoadInfo.insertCount += insertInfo.successCount;
154 innerProcessInfo.upLoadInfo.total -= insertInfo.total - insertInfo.successCount - insertInfo.failCount;
155 if (errCode != E_OK) {
156 LOGE("[CloudSyncer][BatchInsert] BatchInsert with error, ret is %d.", errCode);
157 }
158 if (uploadData.isCloudVersionRecord) {
159 return errCode;
160 }
161 bool isSharedTable = false;
162 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
163 if (ret != E_OK) {
164 LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
165 return ret;
166 }
167 if (!isSharedTable) {
168 ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.insData, errCode, CloudWaterType::INSERT);
169 if (ret != errCode) {
170 LOGW("[CloudSyncer][BatchInsert] FillAssetIdToAssets with error, ret is %d.", ret);
171 }
172 }
173 if (errCode != E_OK) {
174 storageProxy_->FillCloudGidIfSuccess(OpType::INSERT, uploadData);
175 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.insData.extend);
176 if (isSkip) {
177 LOGI("[CloudSyncer][BatchInsert] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
178 return E_OK;
179 } else {
180 LOGE("[CloudSyncer][BatchInsert] errCode: %d, can not skip assets missing record.", errCode);
181 return errCode;
182 }
183 }
184 // we need to fill back gid after insert data to cloud.
185 int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::INSERT, uploadData);
186 if ((errorCode != E_OK) || (ret != E_OK)) {
187 LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", errorCode);
188 return ret == E_OK ? errorCode : ret;
189 }
190 return E_OK;
191 }
192
BatchUpdate(Info & updateInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)193 int CloudSyncer::BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
194 {
195 int errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
196 uploadData.updData.extend, updateInfo);
197 innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
198 innerProcessInfo.upLoadInfo.updateCount += updateInfo.successCount;
199 innerProcessInfo.upLoadInfo.total -= updateInfo.total - updateInfo.successCount - updateInfo.failCount;
200 if (errCode != E_OK) {
201 LOGE("[CloudSyncer][BatchUpdate] BatchUpdate with error, ret is %d.", errCode);
202 }
203 if (uploadData.isCloudVersionRecord) {
204 return errCode;
205 }
206 bool isSharedTable = false;
207 int ret = storageProxy_->IsSharedTable(uploadData.tableName, isSharedTable);
208 if (ret != E_OK) {
209 LOGE("[CloudSyncer] DoBatchUpload cannot judge the table is shared table. %d", ret);
210 return ret;
211 }
212 if (!isSharedTable) {
213 ret = CloudSyncUtils::FillAssetIdToAssets(uploadData.updData, errCode, CloudWaterType::UPDATE);
214 if (ret != E_OK) {
215 LOGW("[CloudSyncer][BatchUpdate] FillAssetIdToAssets with error, ret is %d.", ret);
216 }
217 }
218 if (errCode != E_OK) {
219 storageProxy_->FillCloudGidIfSuccess(OpType::UPDATE, uploadData);
220 bool isSkip = CloudSyncUtils::IsSkipAssetsMissingRecord(uploadData.updData.extend);
221 if (isSkip) {
222 LOGI("[CloudSyncer][BatchUpdate] Try to FillCloudLogAndAsset when assets missing. errCode: %d", errCode);
223 return E_OK;
224 } else {
225 LOGE("[CloudSyncer][BatchUpdate] errCode: %d, can not skip assets missing record.", errCode);
226 return errCode;
227 }
228 }
229 int errorCode = storageProxy_->FillCloudLogAndAsset(OpType::UPDATE, uploadData);
230 if ((errorCode != E_OK) || (ret != E_OK)) {
231 LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errorCode);
232 return ret == E_OK ? errorCode : ret;
233 }
234 return E_OK;
235 }
236
DownloadAssetsOneByOne(const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)237 int CloudSyncer::DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
238 std::map<std::string, Assets> &downloadAssets)
239 {
240 bool isSharedTable = false;
241 int errCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
242 if (errCode != E_OK) {
243 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errCode);
244 return errCode;
245 }
246 int transactionCode = E_OK;
247 // shared table don't download, so just begin transaction once
248 if (isSharedTable) {
249 transactionCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
250 }
251 if (transactionCode != E_OK) {
252 LOGE("[CloudSyncer] begin transaction before download failed %d", transactionCode);
253 return transactionCode;
254 }
255 errCode = DownloadAssetsOneByOneInner(isSharedTable, info, downloadItem, downloadAssets);
256 if (isSharedTable) {
257 transactionCode = storageProxy_->Commit();
258 if (transactionCode != E_OK) {
259 LOGW("[CloudSyncer] commit transaction after download failed %d", transactionCode);
260 }
261 }
262 return (errCode == E_OK) ? transactionCode : errCode;
263 }
264
GetDBAssets(bool isSharedTable,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)265 std::pair<int, uint32_t> CloudSyncer::GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
266 const DownloadItem &downloadItem, VBucket &dbAssets)
267 {
268 std::pair<int, uint32_t> res = { E_OK, static_cast<uint32_t>(LockStatus::UNLOCK) };
269 auto &errCode = res.first;
270 if (!isSharedTable) {
271 errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
272 }
273 if (errCode != E_OK) {
274 LOGE("[CloudSyncer] begin transaction before download failed %d", errCode);
275 return res;
276 }
277 res = storageProxy_->GetAssetsByGidOrHashKey(info.tableName, downloadItem.gid,
278 downloadItem.hashKey, dbAssets);
279 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
280 if (errCode != -E_CLOUD_GID_MISMATCH) {
281 LOGE("[CloudSyncer] get assets from db failed %d", errCode);
282 }
283 if (!isSharedTable) {
284 (void)storageProxy_->Rollback();
285 }
286 return res;
287 }
288 if (!isSharedTable) {
289 errCode = storageProxy_->Commit();
290 }
291 if (errCode != E_OK) {
292 LOGE("[CloudSyncer] commit transaction before download failed %d", errCode);
293 }
294 return res;
295 }
296
BackFillAssetsAfterDownload(int downloadCode,int deleteCode,std::map<std::string,std::vector<uint32_t>> & tmpFlags,std::map<std::string,Assets> & tmpAssetsToDownload,std::map<std::string,Assets> & tmpAssetsToDelete)297 std::map<std::string, Assets>& CloudSyncer::BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
298 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
299 std::map<std::string, Assets> &tmpAssetsToDelete)
300 {
301 std::map<std::string, Assets> &downloadAssets = tmpAssetsToDownload;
302 for (auto &[col, assets] : tmpAssetsToDownload) {
303 int i = 0;
304 for (auto &asset : assets) {
305 asset.flag = tmpFlags[col][i++];
306 if (asset.flag == static_cast<uint32_t>(AssetOpType::NO_CHANGE)) {
307 continue;
308 }
309 if (downloadCode == E_OK) {
310 asset.status = NORMAL;
311 } else {
312 asset.status = (asset.status == NORMAL) ? NORMAL : ABNORMAL;
313 }
314 }
315 }
316 for (auto &[col, assets] : tmpAssetsToDelete) {
317 for (auto &asset : assets) {
318 asset.flag = static_cast<uint32_t>(AssetOpType::DELETE);
319 if (deleteCode == E_OK) {
320 asset.status = NORMAL;
321 } else {
322 asset.status = ABNORMAL;
323 }
324 downloadAssets[col].push_back(asset);
325 }
326 }
327 return downloadAssets;
328 }
329
IsNeedSkipDownload(bool isSharedTable,int & errCode,const InnerProcessInfo & info,const DownloadItem & downloadItem,VBucket & dbAssets)330 int CloudSyncer::IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
331 const DownloadItem &downloadItem, VBucket &dbAssets)
332 {
333 auto [tmpCode, status] = GetDBAssets(isSharedTable, info, downloadItem, dbAssets);
334 if (tmpCode == -E_CLOUD_GID_MISMATCH) {
335 LOGW("[CloudSyncer] skip download asset because gid mismatch");
336 errCode = E_OK;
337 return true;
338 }
339 if (CloudStorageUtils::IsDataLocked(status)) {
340 LOGI("[CloudSyncer] skip download asset because data lock:%u", status);
341 errCode = E_OK;
342 return true;
343 }
344 if (tmpCode != E_OK) {
345 errCode = (errCode != E_OK) ? errCode : tmpCode;
346 return true;
347 }
348 return false;
349 }
350
CheckDownloadOrDeleteCode(int & errCode,int downloadCode,int deleteCode,DownloadItem & downloadItem)351 bool CloudSyncer::CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem)
352 {
353 if (downloadCode == -E_CLOUD_RECORD_EXIST_CONFLICT || deleteCode == -E_CLOUD_RECORD_EXIST_CONFLICT) {
354 downloadItem.recordConflict = true;
355 errCode = E_OK;
356 return false;
357 }
358 errCode = (errCode != E_OK) ? errCode : deleteCode;
359 errCode = (errCode != E_OK) ? errCode : downloadCode;
360 if (downloadCode == -E_NOT_SET || deleteCode == -E_NOT_SET) {
361 return false;
362 }
363 return true;
364 }
365
DownloadAssetsOneByOneInner(bool isSharedTable,const InnerProcessInfo & info,DownloadItem & downloadItem,std::map<std::string,Assets> & downloadAssets)366 int CloudSyncer::DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info,
367 DownloadItem &downloadItem, std::map<std::string, Assets> &downloadAssets)
368 {
369 int errCode = E_OK;
370 std::map<std::string, Assets> tmpAssetsToDownload;
371 std::map<std::string, Assets> tmpAssetsToDelete;
372 std::map<std::string, std::vector<uint32_t>> tmpFlags;
373 for (auto &[col, assets] : downloadAssets) {
374 for (auto &asset : assets) {
375 VBucket dbAssets;
376 if (IsNeedSkipDownload(isSharedTable, errCode, info, downloadItem, dbAssets)) {
377 break;
378 }
379 if (!isSharedTable && asset.flag == static_cast<uint32_t>(AssetOpType::DELETE)) {
380 asset.status = static_cast<uint32_t>(AssetStatus::DELETE);
381 tmpAssetsToDelete[col].push_back(asset);
382 } else if (!isSharedTable && AssetOperationUtils::CalAssetOperation(col, asset, dbAssets,
383 AssetOperationUtils::CloudSyncAction::START_DOWNLOAD) == AssetOperationUtils::AssetOpType::HANDLE) {
384 asset.status = asset.flag == static_cast<uint32_t>(AssetOpType::INSERT) ?
385 static_cast<uint32_t>(AssetStatus::INSERT) : static_cast<uint32_t>(AssetStatus::UPDATE);
386 tmpAssetsToDownload[col].push_back(asset);
387 tmpFlags[col].push_back(asset.flag);
388 } else {
389 LOGD("[CloudSyncer] skip download asset...");
390 }
391 }
392 }
393 auto deleteCode = cloudDB_.RemoveLocalAssets(info.tableName, downloadItem.gid, downloadItem.prefix,
394 tmpAssetsToDelete);
395 auto downloadCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, tmpAssetsToDownload);
396 if (!CheckDownloadOrDeleteCode(errCode, downloadCode, deleteCode, downloadItem)) {
397 return errCode;
398 }
399
400 // copy asset back
401 downloadAssets = BackFillAssetsAfterDownload(downloadCode, deleteCode, tmpFlags, tmpAssetsToDownload,
402 tmpAssetsToDelete);
403 return errCode;
404 }
405
CommitDownloadAssets(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)406 int CloudSyncer::CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName,
407 DownloadCommitList &commitList, uint32_t &successCount)
408 {
409 int errCode = storageProxy_->SetLogTriggerStatus(false);
410 if (errCode != E_OK) {
411 return errCode;
412 }
413 for (auto &item : commitList) {
414 std::string gid = std::get<0>(item); // 0 means gid is the first element in assetsInfo
415 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
416 std::map<std::string, Assets> assetsMap = std::get<1>(item);
417 bool setAllNormal = std::get<2>(item); // 2 means whether the download return is E_OK
418 VBucket normalAssets;
419 VBucket failedAssets;
420 normalAssets[CloudDbConstant::GID_FIELD] = gid;
421 failedAssets[CloudDbConstant::GID_FIELD] = gid;
422 VBucket &assets = setAllNormal ? normalAssets : failedAssets;
423 for (auto &[key, asset] : assetsMap) {
424 assets[key] = std::move(asset);
425 }
426 if (!downloadItem.recordConflict) {
427 errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
428 if (errCode != E_OK) {
429 break;
430 }
431 }
432 LogInfo logInfo;
433 logInfo.cloudGid = gid;
434 // download must contain gid, just set the default value here.
435 logInfo.dataKey = DBConstant::DEFAULT_ROW_ID;
436 logInfo.hashKey = downloadItem.hashKey;
437 logInfo.timestamp = downloadItem.timestamp;
438 // there are failed assets, reset the timestamp to prevent the flag from being marked as consistent.
439 if (failedAssets.size() > 1) {
440 logInfo.timestamp = 0u;
441 }
442
443 errCode = storageProxy_->UpdateRecordFlag(tableName, downloadItem.recordConflict, logInfo);
444 if (errCode != E_OK) {
445 break;
446 }
447 successCount++;
448 }
449 int ret = storageProxy_->SetLogTriggerStatus(true);
450 return errCode == E_OK ? ret : errCode;
451 }
452
GenerateCompensatedSync(CloudTaskInfo & taskInfo)453 void CloudSyncer::GenerateCompensatedSync(CloudTaskInfo &taskInfo)
454 {
455 std::vector<QuerySyncObject> syncQuery;
456 int errCode = storageProxy_->GetCompensatedSyncQuery(syncQuery);
457 if (errCode != E_OK) {
458 LOGW("[CloudSyncer] Generate compensated sync failed by get query! errCode = %d", errCode);
459 return;
460 }
461 if (syncQuery.empty()) {
462 LOGD("[CloudSyncer] Not need generate compensated sync");
463 return;
464 }
465 for (const auto &query : syncQuery) {
466 taskInfo.table.push_back(query.GetRelationTableName());
467 taskInfo.queryList.push_back(query);
468 }
469 Sync(taskInfo);
470 LOGI("[CloudSyncer] Generate compensated sync finished");
471 }
472
ChkIgnoredProcess(InnerProcessInfo & info,const CloudSyncData & uploadData,UploadParam & uploadParam)473 void CloudSyncer::ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam)
474 {
475 if (uploadData.ignoredCount == 0) { // LCOV_EXCL_BR_LINE
476 return;
477 }
478 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
479 if (info.upLoadInfo.successCount + info.upLoadInfo.failCount != info.upLoadInfo.total) { // LCOV_EXCL_BR_LINE
480 return;
481 }
482 if (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) { // LCOV_EXCL_BR_LINE
483 return;
484 }
485 info.tableStatus = ProcessStatus::FINISHED;
486 info.upLoadInfo.batchIndex++;
487 NotifyInBatchUpload(uploadParam, info, true);
488 }
489
SaveCursorIfNeed(const std::string & tableName)490 int CloudSyncer::SaveCursorIfNeed(const std::string &tableName)
491 {
492 std::string cursor = "";
493 int errCode = storageProxy_->GetCloudWaterMark(tableName, cursor);
494 if (errCode != E_OK) {
495 LOGE("[CloudSyncer] get cloud water mark before download failed %d", errCode);
496 return errCode;
497 }
498 if (!cursor.empty()) {
499 return E_OK;
500 }
501 auto res = cloudDB_.GetEmptyCursor(tableName);
502 if (res.first != E_OK) {
503 LOGE("[CloudSyncer] get empty cursor failed %d", res.first);
504 return res.first;
505 }
506 if (res.second.empty()) {
507 LOGE("[CloudSyncer] get cursor is empty %d", -E_CLOUD_ERROR);
508 return -E_CLOUD_ERROR;
509 }
510 errCode = storageProxy_->SetCloudWaterMark(tableName, res.second);
511 if (errCode != E_OK) {
512 LOGE("[CloudSyncer] set cloud water mark before download failed %d", errCode);
513 }
514 return errCode;
515 }
516
PrepareAndDownload(const std::string & table,const CloudTaskInfo & taskInfo,bool isFirstDownload)517 int CloudSyncer::PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload)
518 {
519 int errCode = SaveCursorIfNeed(table);
520 if (errCode != E_OK) {
521 return errCode;
522 }
523 bool isShared = false;
524 errCode = storageProxy_->IsSharedTable(table, isShared);
525 if (errCode != E_OK) {
526 LOGE("[CloudSyncer] check shared table failed %d", errCode);
527 return errCode;
528 }
529 // shared table not allow logic delete
530 storageProxy_->SetCloudTaskConfig({ !isShared });
531 errCode = CheckTaskIdValid(taskInfo.taskId);
532 if (errCode != E_OK) {
533 LOGW("[CloudSyncer] task is invalid, abort sync");
534 return errCode;
535 }
536 errCode = DoDownload(taskInfo.taskId, isFirstDownload);
537 if (errCode != E_OK) {
538 LOGE("[CloudSyncer] download failed %d", errCode);
539 }
540 return errCode;
541 }
542
IsClosed() const543 bool CloudSyncer::IsClosed() const
544 {
545 return closed_ || IsKilled();
546 }
547
UpdateFlagForSavedRecord(const SyncParam & param)548 int CloudSyncer::UpdateFlagForSavedRecord(const SyncParam ¶m)
549 {
550 DownloadList downloadList;
551 {
552 std::lock_guard<std::mutex> autoLock(dataLock_);
553 downloadList = currentContext_.assetDownloadList;
554 }
555 std::set<std::string> gidFilters;
556 for (const auto &tuple: downloadList) {
557 gidFilters.insert(std::get<CloudSyncUtils::GID_INDEX>(tuple));
558 }
559 return storageProxy_->MarkFlagAsConsistent(param.tableName, param.downloadData, gidFilters);
560 }
561
BatchDelete(Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)562 int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo)
563 {
564 int errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
565 uploadData.delData.extend, deleteInfo);
566 innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
567 innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount;
568 if (errCode != E_OK) {
569 LOGE("[CloudSyncer] Failed to batch delete, %d", errCode);
570 storageProxy_->FillCloudGidIfSuccess(OpType::DELETE, uploadData);
571 return errCode;
572 }
573 errCode = storageProxy_->FillCloudLogAndAsset(OpType::DELETE, uploadData);
574 if (errCode != E_OK) {
575 LOGE("[CloudSyncer] Failed to fill back when doing upload delData, %d.", errCode);
576 }
577 return errCode;
578 }
579
IsCompensatedTask(TaskId taskId)580 bool CloudSyncer::IsCompensatedTask(TaskId taskId)
581 {
582 std::lock_guard<std::mutex> autoLock(dataLock_);
583 return cloudTaskInfos_[taskId].compensatedTask;
584 }
585
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)586 int CloudSyncer::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
587 {
588 return cloudDB_.SetCloudDB(cloudDBs);
589 }
590
CleanAllWaterMark()591 void CloudSyncer::CleanAllWaterMark()
592 {
593 storageProxy_->CleanAllWaterMark();
594 }
595
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)596 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
597 {
598 downloadItem.gid = std::get<CloudSyncUtils::GID_INDEX>(downloadList[i]);
599 downloadItem.prefix = std::get<CloudSyncUtils::PREFIX_INDEX>(downloadList[i]);
600 downloadItem.strategy = std::get<CloudSyncUtils::STRATEGY_INDEX>(downloadList[i]);
601 downloadItem.assets = std::get<CloudSyncUtils::ASSETS_INDEX>(downloadList[i]);
602 downloadItem.hashKey = std::get<CloudSyncUtils::HASH_KEY_INDEX>(downloadList[i]);
603 downloadItem.primaryKeyValList = std::get<CloudSyncUtils::PRIMARY_KEY_INDEX>(downloadList[i]);
604 downloadItem.timestamp = std::get<CloudSyncUtils::TIMESTAMP_INDEX>(downloadList[i]);
605 }
606
DoNotifyInNeed(const CloudSyncer::TaskId & taskId,const std::vector<std::string> & needNotifyTables,const bool isFirstDownload)607 void CloudSyncer::DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
608 const bool isFirstDownload)
609 {
610 bool isNeedNotify = false;
611 {
612 std::lock_guard<std::mutex> autoLock(dataLock_);
613 // only when the first download and the task no need upload actually, notify the process, otherwise,
614 // the process will notify in the upload procedure, which can guarantee the notify order of the tables
615 isNeedNotify = isFirstDownload && !currentContext_.isNeedUpload;
616 }
617 if (!isNeedNotify) {
618 return;
619 }
620 for (size_t i = 0; i < needNotifyTables.size(); ++i) {
621 UpdateProcessInfoWithoutUpload(taskId, needNotifyTables[i], i != (needNotifyTables.size() - 1u));
622 }
623 }
624
GetUploadCountByTable(const CloudSyncer::TaskId & taskId,int64_t & count)625 int CloudSyncer::GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count)
626 {
627 std::string tableName;
628 int ret = GetCurrentTableName(tableName);
629 if (ret != E_OK) {
630 LOGE("[CloudSyncer] Invalid table name for get local water mark: %d", ret);
631 return ret;
632 }
633
634 ret = storageProxy_->StartTransaction();
635 if (ret != E_OK) {
636 LOGE("[CloudSyncer] start transaction failed before getting upload count.");
637 return ret;
638 }
639
640 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
641 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
642 if (ret != E_OK) {
643 // GetUploadCount will return E_OK when upload count is zero.
644 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
645 }
646 // No need Rollback when GetUploadCount failed
647 storageProxy_->Commit();
648 return ret;
649 }
650
UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId,const std::string & tableName,bool needNotify)651 void CloudSyncer::UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName,
652 bool needNotify)
653 {
654 LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
655 InnerProcessInfo innerProcessInfo;
656 innerProcessInfo.tableName = tableName;
657 innerProcessInfo.upLoadInfo.total = 0; // count is zero
658 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
659 {
660 std::lock_guard<std::mutex> autoLock(dataLock_);
661 if (!needNotify) {
662 currentContext_.notifier->UpdateProcess(innerProcessInfo);
663 } else {
664 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
665 }
666 }
667 }
668
DoDownloadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload,bool isFirstDownload)669 int CloudSyncer::DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload)
670 {
671 std::vector<std::string> needNotifyTables;
672 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
673 std::string table;
674 {
675 std::lock_guard<std::mutex> autoLock(dataLock_);
676 if (currentContext_.processRecorder->IsDownloadFinish(currentContext_.currentUserIndex,
677 taskInfo.table[i])) {
678 continue;
679 }
680 LOGD("[CloudSyncer] try download table, index: %zu", i);
681 currentContext_.tableName = taskInfo.table[i];
682 table = currentContext_.tableName;
683 }
684 int errCode = PrepareAndDownload(table, taskInfo, isFirstDownload);
685 if (errCode != E_OK) {
686 return errCode;
687 }
688 MarkDownloadFinishIfNeed(table);
689 // needUpload indicate that the syncMode need push
690 if (needUpload) {
691 int64_t count = 0;
692 errCode = GetUploadCountByTable(taskInfo.taskId, count);
693 if (errCode != E_OK) {
694 LOGE("[CloudSyncer] GetUploadCountByTable failed %d", errCode);
695 return errCode;
696 }
697 // count > 0 means current table need upload actually
698 if (count > 0) {
699 {
700 std::lock_guard<std::mutex> autoLock(dataLock_);
701 currentContext_.isNeedUpload = true;
702 }
703 continue;
704 }
705 needNotifyTables.emplace_back(table);
706 }
707 errCode = SaveCloudWaterMark(taskInfo.table[i], taskInfo.taskId);
708 if (errCode != E_OK) {
709 LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
710 return errCode;
711 }
712 }
713 DoNotifyInNeed(taskInfo.taskId, needNotifyTables, isFirstDownload);
714 return E_OK;
715 }
716
IsNeedGetLocalWater(TaskId taskId)717 bool CloudSyncer::IsNeedGetLocalWater(TaskId taskId)
718 {
719 return !IsModeForcePush(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId)) &&
720 !IsCompensatedTask(taskId);
721 }
722
TryToAddSyncTask(CloudTaskInfo && taskInfo)723 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
724 {
725 if (closed_) {
726 LOGW("[CloudSyncer] syncer is closed, should not sync now");
727 return -E_DB_CLOSED;
728 }
729 std::shared_ptr<DataBaseSchema> cloudSchema;
730 int errCode = storageProxy_->GetCloudDbSchema(cloudSchema);
731 if (errCode != E_OK) {
732 LOGE("[CloudSyncer] Get cloud schema failed %d when add task", errCode);
733 return errCode;
734 }
735 std::lock_guard<std::mutex> autoLock(dataLock_);
736 errCode = CheckQueueSizeWithNoLock(taskInfo.priorityTask);
737 if (errCode != E_OK) {
738 return errCode;
739 }
740 errCode = GenerateTaskIdIfNeed(taskInfo);
741 if (errCode != E_OK) {
742 return errCode;
743 }
744 cloudTaskInfos_[lastTaskId_] = std::move(taskInfo);
745 if (cloudTaskInfos_[lastTaskId_].priorityTask) {
746 priorityTaskQueue_.push_back(lastTaskId_);
747 LOGI("[CloudSyncer] Add priority task ok, storeId %.3s, taskId %" PRIu64,
748 cloudTaskInfos_[lastTaskId_].storeId.c_str(), cloudTaskInfos_[lastTaskId_].taskId);
749 return E_OK;
750 }
751 if (!MergeTaskInfo(cloudSchema, lastTaskId_)) {
752 taskQueue_.push_back(lastTaskId_);
753 LOGI("[CloudSyncer] Add task ok, storeId %.3s, taskId %" PRIu64,
754 cloudTaskInfos_[lastTaskId_].storeId.c_str(), cloudTaskInfos_[lastTaskId_].taskId);
755 }
756 return E_OK;
757 }
758
MergeTaskInfo(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId taskId)759 bool CloudSyncer::MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId)
760 {
761 if (!cloudTaskInfos_[taskId].merge) { // LCOV_EXCL_BR_LINE
762 return false;
763 }
764 bool isMerge = false;
765 bool mergeHappen = false;
766 TaskId checkTaskId = taskId;
767 do {
768 std::tie(isMerge, checkTaskId) = TryMergeTask(cloudSchema, checkTaskId);
769 mergeHappen = mergeHappen || isMerge;
770 } while (isMerge);
771 return mergeHappen;
772 }
773
TryMergeTask(const std::shared_ptr<DataBaseSchema> & cloudSchema,TaskId tryTaskId)774 std::pair<bool, TaskId> CloudSyncer::TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId)
775 {
776 std::pair<bool, TaskId> res;
777 auto &[merge, nextTryTask] = res;
778 TaskId beMergeTask = INVALID_TASK_ID;
779 TaskId runningTask = currentContext_.currentTaskId;
780 for (const auto &taskId : taskQueue_) {
781 if (taskId == runningTask || taskId == tryTaskId) { // LCOV_EXCL_BR_LINE
782 continue;
783 }
784 if (!IsTasksCanMerge(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
785 continue;
786 }
787 if (MergeTaskTablesIfConsistent(taskId, tryTaskId)) { // LCOV_EXCL_BR_LINE
788 beMergeTask = taskId;
789 nextTryTask = tryTaskId;
790 merge = true;
791 break;
792 }
793 if (MergeTaskTablesIfConsistent(tryTaskId, taskId)) { // LCOV_EXCL_BR_LINE
794 beMergeTask = tryTaskId;
795 nextTryTask = taskId;
796 merge = true;
797 break;
798 }
799 }
800 if (!merge) { // LCOV_EXCL_BR_LINE
801 return res;
802 }
803 if (beMergeTask < nextTryTask) { // LCOV_EXCL_BR_LINE
804 std::tie(beMergeTask, nextTryTask) = SwapTwoTaskAndCopyTable(beMergeTask, nextTryTask);
805 }
806 AdjustTableBasedOnSchema(cloudSchema, cloudTaskInfos_[nextTryTask]);
807 auto processNotifier = std::make_shared<ProcessNotifier>(this);
808 processNotifier->Init(cloudTaskInfos_[beMergeTask].table, cloudTaskInfos_[beMergeTask].devices,
809 cloudTaskInfos_[beMergeTask].users);
810 cloudTaskInfos_[beMergeTask].errCode = -E_CLOUD_SYNC_TASK_MERGED;
811 cloudTaskInfos_[beMergeTask].status = ProcessStatus::FINISHED;
812 processNotifier->SetAllTableFinish();
813 processNotifier->NotifyProcess(cloudTaskInfos_[beMergeTask], {}, true);
814 cloudTaskInfos_.erase(beMergeTask);
815 taskQueue_.remove(beMergeTask);
816 LOGW("[CloudSyncer] TaskId %" PRIu64 " has been merged", beMergeTask);
817 return res;
818 }
819
IsTaskCanMerge(const CloudTaskInfo & taskInfo)820 bool CloudSyncer::IsTaskCanMerge(const CloudTaskInfo &taskInfo)
821 {
822 return !taskInfo.compensatedTask && !taskInfo.priorityTask &&
823 taskInfo.merge && taskInfo.mode == SYNC_MODE_CLOUD_MERGE;
824 }
825
IsTasksCanMerge(TaskId taskId,TaskId tryMergeTaskId)826 bool CloudSyncer::IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId)
827 {
828 const auto &taskInfo = cloudTaskInfos_[taskId];
829 const auto &tryMergeTaskInfo = cloudTaskInfos_[tryMergeTaskId];
830 return IsTaskCanMerge(taskInfo) && IsTaskCanMerge(tryMergeTaskInfo) &&
831 taskInfo.devices == tryMergeTaskInfo.devices;
832 }
833
MergeTaskTablesIfConsistent(TaskId sourceId,TaskId targetId)834 bool CloudSyncer::MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId)
835 {
836 const auto &source = cloudTaskInfos_[sourceId];
837 const auto &target = cloudTaskInfos_[targetId];
838 bool isMerge = true;
839 for (const auto &table : source.table) {
840 if (std::find(target.table.begin(), target.table.end(), table) == target.table.end()) { // LCOV_EXCL_BR_LINE
841 isMerge = false;
842 break;
843 }
844 }
845 return isMerge;
846 }
847
AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> & cloudSchema,CloudTaskInfo & taskInfo)848 void CloudSyncer::AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema,
849 CloudTaskInfo &taskInfo)
850 {
851 std::vector<std::string> tmpTables = taskInfo.table;
852 taskInfo.table.clear();
853 taskInfo.queryList.clear();
854 for (const auto &table : cloudSchema->tables) {
855 if (std::find(tmpTables.begin(), tmpTables.end(), table.name) != tmpTables.end()) { // LCOV_EXCL_BR_LINE
856 taskInfo.table.push_back(table.name);
857 QuerySyncObject querySyncObject;
858 querySyncObject.SetTableName(table.name);
859 taskInfo.queryList.push_back(querySyncObject);
860 }
861 }
862 }
863
SwapTwoTaskAndCopyTable(TaskId source,TaskId target)864 std::pair<TaskId, TaskId> CloudSyncer::SwapTwoTaskAndCopyTable(TaskId source, TaskId target)
865 {
866 cloudTaskInfos_[source].table = cloudTaskInfos_[target].table;
867 cloudTaskInfos_[source].queryList = cloudTaskInfos_[target].queryList;
868 std::set<std::string> users;
869 users.insert(cloudTaskInfos_[source].users.begin(), cloudTaskInfos_[source].users.end());
870 users.insert(cloudTaskInfos_[target].users.begin(), cloudTaskInfos_[target].users.end());
871 cloudTaskInfos_[source].users = std::vector<std::string>(users.begin(), users.end());
872 return {target, source};
873 }
874
IsQueryListEmpty(TaskId taskId)875 bool CloudSyncer::IsQueryListEmpty(TaskId taskId)
876 {
877 std::lock_guard<std::mutex> autoLock(dataLock_);
878 return !std::any_of(cloudTaskInfos_[taskId].queryList.begin(), cloudTaskInfos_[taskId].queryList.end(),
879 [](const auto &item) {
880 return item.IsContainQueryNodes();
881 });
882 }
883
IsNeedLock(const UploadParam & param)884 bool CloudSyncer::IsNeedLock(const UploadParam ¶m)
885 {
886 return param.lockAction == LockAction::INSERT && param.mode == CloudWaterType::INSERT;
887 }
888
GetLocalWater(const std::string & tableName,UploadParam & uploadParam)889 std::pair<int, Timestamp> CloudSyncer::GetLocalWater(const std::string &tableName, UploadParam &uploadParam)
890 {
891 std::pair<int, Timestamp> res = { E_OK, 0u };
892 if (IsNeedGetLocalWater(uploadParam.taskId)) {
893 res.first = storageProxy_->GetLocalWaterMarkByMode(tableName, uploadParam.mode, res.second);
894 }
895 uploadParam.localMark = res.second;
896 return res;
897 }
898
HandleBatchUpload(UploadParam & uploadParam,InnerProcessInfo & info,CloudSyncData & uploadData,ContinueToken & continueStmtToken)899 int CloudSyncer::HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info,
900 CloudSyncData &uploadData, ContinueToken &continueStmtToken)
901 {
902 int ret = E_OK;
903 uint32_t batchIndex = GetCurrentTableUploadBatchIndex();
904 bool isLocked = false;
905 while (!CloudSyncUtils::CheckCloudSyncDataEmpty(uploadData)) {
906 ret = PreProcessBatchUpload(uploadParam, info, uploadData);
907 if (ret != E_OK) {
908 break;
909 }
910 info.upLoadInfo.batchIndex = ++batchIndex;
911 if (IsNeedLock(uploadParam) && !isLocked) {
912 ret = LockCloudIfNeed(uploadParam.taskId);
913 if (ret != E_OK) {
914 break;
915 }
916 isLocked = true;
917 }
918 ret = DoBatchUpload(uploadData, uploadParam, info);
919 if (ret != E_OK) {
920 break;
921 }
922 uploadData = CloudSyncData(uploadData.tableName, uploadParam.mode);
923 if (continueStmtToken == nullptr) {
924 break;
925 }
926 SetUploadDataFlag(uploadParam.taskId, uploadData);
927 RecordWaterMark(uploadParam.taskId, uploadParam.localMark);
928 ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
929 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
930 LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
931 break;
932 }
933 ChkIgnoredProcess(info, uploadData, uploadParam);
934 }
935 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
936 NotifyUploadFailed(ret, info);
937 }
938 if (isLocked && IsNeedLock(uploadParam)) {
939 UnlockIfNeed();
940 }
941 return ret;
942 }
943
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)944 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
945 {
946 InnerProcessInfo info = GetInnerProcessInfo(tableName, uploadParam);
947 static std::vector<CloudWaterType> waterTypes = DBCommon::GetWaterTypeVec();
948 int errCode = E_OK;
949 for (const auto &waterType: waterTypes) {
950 uploadParam.mode = waterType;
951 errCode = DoUploadByMode(tableName, uploadParam, info);
952 if (errCode != E_OK) {
953 break;
954 }
955 }
956 int ret = E_OK;
957 if (info.upLoadInfo.successCount > 0) {
958 ret = UploadVersionRecordIfNeed(uploadParam);
959 }
960 return errCode != E_OK ? errCode : ret;
961 }
962
UploadVersionRecordIfNeed(const UploadParam & uploadParam)963 int CloudSyncer::UploadVersionRecordIfNeed(const UploadParam &uploadParam)
964 {
965 if (uploadParam.count == 0) {
966 // no record upload
967 return E_OK;
968 }
969 if (!cloudDB_.IsExistCloudVersionCallback()) {
970 return E_OK;
971 }
972 auto [errCode, uploadData] = storageProxy_->GetLocalCloudVersion();
973 if (errCode != E_OK) {
974 return errCode;
975 }
976 bool isInsert = !uploadData.insData.record.empty();
977 CloudSyncBatch &batchData = isInsert ? uploadData.insData : uploadData.updData;
978 if (batchData.record.empty()) {
979 LOGE("[CloudSyncer] Get invalid cloud version record");
980 return -E_INTERNAL_ERROR;
981 }
982 std::string oriVersion;
983 CloudStorageUtils::GetStringFromCloudData(CloudDbConstant::CLOUD_KV_FIELD_VALUE, batchData.record[0], oriVersion);
984 std::string newVersion;
985 std::tie(errCode, newVersion) = cloudDB_.GetCloudVersion(oriVersion);
986 if (errCode != E_OK) {
987 LOGE("[CloudSyncer] Get cloud version error %d", errCode);
988 return errCode;
989 }
990 batchData.record[0][CloudDbConstant::CLOUD_KV_FIELD_VALUE] = newVersion;
991 InnerProcessInfo processInfo;
992 Info info;
993 std::vector<VBucket> copyRecord = batchData.record;
994 WaterMark waterMark;
995 CloudSyncUtils::GetWaterMarkAndUpdateTime(batchData.extend, waterMark);
996 errCode = isInsert ? BatchInsert(info, uploadData, processInfo) : BatchUpdate(info, uploadData, processInfo);
997 batchData.record = copyRecord;
998 CloudSyncUtils::ModifyCloudDataTime(batchData.extend[0]);
999 auto ret = storageProxy_->FillCloudLogAndAsset(isInsert ? OpType::INSERT : OpType::UPDATE, uploadData);
1000 return errCode != E_OK ? errCode : ret;
1001 }
1002
TagUploadAssets(CloudSyncData & uploadData)1003 void CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1004 {
1005 if (!IsDataContainAssets()) {
1006 return;
1007 }
1008 std::vector<Field> assetFields;
1009 {
1010 std::lock_guard<std::mutex> autoLock(dataLock_);
1011 assetFields = currentContext_.assetFields[currentContext_.tableName];
1012 }
1013
1014 for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1015 for (const Field &assetField : assetFields) {
1016 (void)TagAssetsInSingleCol(assetField, true, uploadData.insData.record[i]);
1017 }
1018 }
1019 for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1020 for (const Field &assetField : assetFields) {
1021 (void)TagAssetsInSingleCol(assetField, false, uploadData.updData.record[i]);
1022 }
1023 }
1024 }
1025
IsLockInDownload()1026 bool CloudSyncer::IsLockInDownload()
1027 {
1028 std::lock_guard<std::mutex> autoLock(dataLock_);
1029 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1030 return false;
1031 }
1032 auto currentLockAction = static_cast<uint32_t>(cloudTaskInfos_[currentContext_.currentTaskId].lockAction);
1033 return (currentLockAction & static_cast<uint32_t>(LockAction::DOWNLOAD)) != 0;
1034 }
1035
SetCurrentTaskFailedInMachine(int errCode)1036 CloudSyncEvent CloudSyncer::SetCurrentTaskFailedInMachine(int errCode)
1037 {
1038 std::lock_guard<std::mutex> autoLock(dataLock_);
1039 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1040 return CloudSyncEvent::ERROR_EVENT;
1041 }
1042
InitCloudSyncStateMachine()1043 void CloudSyncer::InitCloudSyncStateMachine()
1044 {
1045 CloudSyncStateMachine::Initialize();
1046 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_DOWNLOAD, [this]() {
1047 return SyncMachineDoDownload();
1048 });
1049 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_UPLOAD, [this]() {
1050 return SyncMachineDoUpload();
1051 });
1052 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_FINISHED, [this]() {
1053 return SyncMachineDoFinished();
1054 });
1055 cloudSyncStateMachine_.RegisterFunc(CloudSyncState::DO_REPEAT_CHECK, [this]() {
1056 return SyncMachineDoRepeatCheck();
1057 });
1058 }
1059
SyncMachineDoRepeatCheck()1060 CloudSyncEvent CloudSyncer::SyncMachineDoRepeatCheck()
1061 {
1062 auto config = storageProxy_->GetCloudSyncConfig();
1063 {
1064 std::lock_guard<std::mutex> autoLock(dataLock_);
1065 if (config.maxRetryConflictTimes < 0) { // unlimited repeat counts
1066 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1067 }
1068 currentContext_.repeatCount++;
1069 if (currentContext_.repeatCount > config.maxRetryConflictTimes) {
1070 LOGD("[CloudSyncer] Repeat too much times current %d limit %" PRId32, currentContext_.repeatCount,
1071 config.maxRetryConflictTimes);
1072 SetCurrentTaskFailedWithoutLock(-E_CLOUD_VERSION_CONFLICT);
1073 return CloudSyncEvent::ERROR_EVENT;
1074 }
1075 LOGD("[CloudSyncer] Repeat taskId %" PRIu64 " download current %d", currentContext_.currentTaskId,
1076 currentContext_.repeatCount);
1077 }
1078 return CloudSyncEvent::REPEAT_DOWNLOAD_EVENT;
1079 }
1080
MarkDownloadFinishIfNeed(const std::string & downloadTable,bool isFinish)1081 void CloudSyncer::MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish)
1082 {
1083 // table exist reference should download every times
1084 if (IsLockInDownload() || storageProxy_->IsTableExistReferenceOrReferenceBy(downloadTable)) {
1085 return;
1086 }
1087 std::lock_guard<std::mutex> autoLock(dataLock_);
1088 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex, downloadTable, isFinish);
1089 }
1090
DoUploadByMode(const std::string & tableName,UploadParam & uploadParam,InnerProcessInfo & info)1091 int CloudSyncer::DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info)
1092 {
1093 CloudSyncData uploadData(tableName, uploadParam.mode);
1094 SetUploadDataFlag(uploadParam.taskId, uploadData);
1095 auto [err, localWater] = GetLocalWater(tableName, uploadParam);
1096 if (err != E_OK) {
1097 return err;
1098 }
1099 ContinueToken continueStmtToken = nullptr;
1100 int ret = storageProxy_->GetCloudData(GetQuerySyncObject(tableName), localWater, continueStmtToken, uploadData);
1101 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1102 LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1103 return ret;
1104 }
1105 uploadParam.count -= uploadData.ignoredCount;
1106 info.upLoadInfo.total -= static_cast<uint32_t>(uploadData.ignoredCount);
1107 ret = HandleBatchUpload(uploadParam, info, uploadData, continueStmtToken);
1108 if (ret != -E_TASK_PAUSED) {
1109 // reset watermark to zero when task no paused
1110 RecordWaterMark(uploadParam.taskId, 0u);
1111 }
1112 if (continueStmtToken != nullptr) {
1113 storageProxy_->ReleaseContinueToken(continueStmtToken);
1114 }
1115 return ret;
1116 }
1117
IsTableFinishInUpload(const std::string & table)1118 bool CloudSyncer::IsTableFinishInUpload(const std::string &table)
1119 {
1120 std::lock_guard<std::mutex> autoLock(dataLock_);
1121 return currentContext_.processRecorder->IsUploadFinish(currentContext_.currentUserIndex, table);
1122 }
1123
MarkUploadFinishIfNeed(const std::string & table)1124 void CloudSyncer::MarkUploadFinishIfNeed(const std::string &table)
1125 {
1126 // table exist reference should upload every times
1127 if (storageProxy_->IsTableExistReference(table)) {
1128 return;
1129 }
1130 std::lock_guard<std::mutex> autoLock(dataLock_);
1131 currentContext_.processRecorder->MarkUploadFinish(currentContext_.currentUserIndex, table, true);
1132 }
1133
IsNeedUpdateAsset(const VBucket & data)1134 bool CloudSyncer::IsNeedUpdateAsset(const VBucket &data)
1135 {
1136 for (const auto &item : data) {
1137 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
1138 if (asset != nullptr) {
1139 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(asset->status);
1140 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1141 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1142 return true;
1143 }
1144 continue;
1145 }
1146 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
1147 if (assets == nullptr) {
1148 continue;
1149 }
1150 for (const auto &oneAsset : *assets) {
1151 uint32_t lowBitStatus = AssetOperationUtils::EraseBitMask(oneAsset.status);
1152 if (lowBitStatus == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
1153 lowBitStatus == static_cast<uint32_t>(AssetStatus::DOWNLOADING)) {
1154 return true;
1155 }
1156 }
1157 }
1158 return false;
1159 }
1160
GetCloudTaskStatus(uint64_t taskId) const1161 SyncProcess CloudSyncer::GetCloudTaskStatus(uint64_t taskId) const
1162 {
1163 std::lock_guard<std::mutex> autoLock(dataLock_);
1164 auto iter = cloudTaskInfos_.find(taskId);
1165 SyncProcess syncProcess;
1166 if (iter == cloudTaskInfos_.end()) {
1167 syncProcess.process = ProcessStatus::FINISHED;
1168 syncProcess.errCode = NOT_FOUND;
1169 LOGE("[CloudSyncer] Not found task %" PRIu64, taskId);
1170 return syncProcess;
1171 }
1172 syncProcess.process = iter->second.status;
1173 syncProcess.errCode = TransferDBErrno(iter->second.errCode);
1174 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1175 if (currentContext_.currentTaskId == taskId) {
1176 notifier = currentContext_.notifier;
1177 }
1178 bool hasNotifier = notifier != nullptr;
1179 if (hasNotifier) {
1180 syncProcess.tableProcess = notifier->GetCurrentTableProcess();
1181 }
1182 LOGI("[CloudSyncer] Found task %" PRIu64 " storeId %.3s status %d has notifier %d", taskId,
1183 iter->second.storeId.c_str(), static_cast<int64_t>(syncProcess.process), static_cast<int>(hasNotifier));
1184 return syncProcess;
1185 }
1186
GenerateTaskIdIfNeed(CloudTaskInfo & taskInfo)1187 int CloudSyncer::GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo)
1188 {
1189 if (taskInfo.taskId != INVALID_TASK_ID) {
1190 if (cloudTaskInfos_.find(taskInfo.taskId) != cloudTaskInfos_.end()) {
1191 LOGE("[CloudSyncer] Sync with exist taskId %" PRIu64 " storeId %.3s", taskInfo.taskId,
1192 taskInfo.storeId.c_str());
1193 return -E_INVALID_ARGS;
1194 }
1195 lastTaskId_ = taskInfo.taskId;
1196 LOGI("[CloudSyncer] Sync with taskId %" PRIu64 " storeId %.3s", taskInfo.taskId, taskInfo.storeId.c_str());
1197 return E_OK;
1198 }
1199 lastTaskId_++;
1200 if (lastTaskId_ == UINT64_MAX) {
1201 lastTaskId_ = 1u;
1202 }
1203 taskInfo.taskId = lastTaskId_;
1204 return E_OK;
1205 }
1206 }