1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,bool isKvScene,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(
38 std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
39 : lastTaskId_(INVALID_TASK_ID),
40 storageProxy_(std::move(storageProxy)),
41 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
42 closed_(false),
43 hasKvRemoveTask(false),
44 timerId_(0u),
45 isKvScene_(isKvScene),
46 policy_(policy),
47 asyncTaskId_(INVALID_TASK_ID),
48 cancelAsyncTask_(false),
49 scheduleTaskCount_(0),
50 waitDownloadListener_(nullptr)
51 {
52 if (storageProxy_ != nullptr) {
53 id_ = storageProxy_->GetIdentify();
54 }
55 InitCloudSyncStateMachine();
56 }
57
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)58 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
59 const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
60 {
61 CloudTaskInfo taskInfo;
62 taskInfo.mode = mode;
63 taskInfo.table = tables;
64 taskInfo.callback = callback;
65 taskInfo.timeout = waitTime;
66 taskInfo.devices = devices;
67 for (const auto &item: tables) {
68 QuerySyncObject syncObject;
69 syncObject.SetTableName(item);
70 taskInfo.queryList.push_back(syncObject);
71 }
72 return Sync(taskInfo);
73 }
74
Sync(const CloudTaskInfo & taskInfo)75 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
76 {
77 int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
78 if (errCode != E_OK) {
79 return errCode;
80 }
81 if (cloudDB_.IsNotExistCloudDB()) {
82 LOGE("[CloudSyncer] Not set cloudDB!");
83 return -E_CLOUD_ERROR;
84 }
85 if (closed_) {
86 LOGE("[CloudSyncer] DB is closed!");
87 return -E_DB_CLOSED;
88 }
89 if (hasKvRemoveTask) {
90 LOGE("[CloudSyncer] has remove task, stop sync task!");
91 return -E_CLOUD_ERROR;
92 }
93 CloudTaskInfo info = taskInfo;
94 info.status = ProcessStatus::PREPARED;
95 errCode = TryToAddSyncTask(std::move(info));
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 return TriggerSync();
100 }
101
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)102 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
103 {
104 cloudDB_.SetCloudDB(cloudDB);
105 LOGI("[CloudSyncer] SetCloudDB finish");
106 }
107
GetCloudDB() const108 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
109 {
110 return cloudDB_.GetCloudDB();
111 }
112
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)113 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
114 {
115 storageProxy_->SetIAssetLoader(loader);
116 cloudDB_.SetIAssetLoader(loader);
117 LOGI("[CloudSyncer] SetIAssetLoader finish");
118 }
119
Close()120 void CloudSyncer::Close()
121 {
122 closed_ = true;
123 CloudSyncer::TaskId currentTask;
124 {
125 std::lock_guard<std::mutex> autoLock(dataLock_);
126 currentTask = currentContext_.currentTaskId;
127 }
128 // mark current task db_closed
129 SetTaskFailed(currentTask, -E_DB_CLOSED);
130 UnlockIfNeed();
131 cloudDB_.Close();
132 WaitCurTaskFinished();
133
134 // copy all task from queue
135 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
136 for (auto &info: infoList) {
137 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
138 }
139 storageProxy_->Close();
140 }
141
TriggerSync()142 int CloudSyncer::TriggerSync()
143 {
144 if (closed_) {
145 return -E_DB_CLOSED;
146 }
147 RefObject::IncObjRef(this);
148 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
149 DoSyncIfNeed();
150 RefObject::DecObjRef(this);
151 });
152 if (errCode != E_OK) {
153 LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
154 RefObject::DecObjRef(this);
155 }
156 return errCode;
157 }
158
SetProxyUser(const std::string & user)159 void CloudSyncer::SetProxyUser(const std::string &user)
160 {
161 std::lock_guard<std::mutex> autoLock(dataLock_);
162 if (storageProxy_ != nullptr) {
163 storageProxy_->SetUser(user);
164 }
165 if (currentContext_.notifier != nullptr) {
166 currentContext_.notifier->SetUser(user);
167 }
168 currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
169 cloudDB_.SwitchCloudDB(user);
170 }
171
DoSyncIfNeed()172 void CloudSyncer::DoSyncIfNeed()
173 {
174 if (closed_) {
175 return;
176 }
177 // do all sync task in this loop
178 do {
179 // get taskId from queue
180 TaskId triggerTaskId = GetNextTaskId();
181 if (triggerTaskId == INVALID_TASK_ID) {
182 LOGD("[CloudSyncer] task queue empty");
183 break;
184 }
185 // pop taskId in queue
186 if (PrepareSync(triggerTaskId) != E_OK) {
187 break;
188 }
189 // if the task is compensated task, we should get the query data and user list.
190 if (IsCompensatedTask(triggerTaskId) && !TryToInitQueryAndUserListForCompensatedSync(triggerTaskId)) {
191 // try to init query fail, we finish the compensated task.
192 continue;
193 }
194 CancelBackgroundDownloadAssetsTaskIfNeed();
195 // do sync logic
196 std::vector<std::string> usersList;
197 {
198 std::lock_guard<std::mutex> autoLock(dataLock_);
199 usersList = cloudTaskInfos_[triggerTaskId].users;
200 currentContext_.currentUserIndex = 0;
201 }
202 int errCode = E_OK;
203 if (usersList.empty()) {
204 SetProxyUser("");
205 errCode = DoSync(triggerTaskId);
206 } else {
207 for (const auto &user : usersList) {
208 SetProxyUser(user);
209 errCode = DoSync(triggerTaskId);
210 }
211 }
212 LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
213 } while (!closed_);
214 LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
215 }
216
DoSync(TaskId taskId)217 int CloudSyncer::DoSync(TaskId taskId)
218 {
219 std::lock_guard<std::mutex> lock(syncMutex_);
220 ResetCurrentTableUploadBatchIndex();
221 CloudTaskInfo taskInfo;
222 {
223 std::lock_guard<std::mutex> autoLock(dataLock_);
224 taskInfo = cloudTaskInfos_[taskId];
225 cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
226 LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
227 static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
228 }
229 bool needUpload = true;
230 bool isNeedFirstDownload = false;
231 bool isLockAction = IsLockInDownload();
232 {
233 std::lock_guard<std::mutex> autoLock(dataLock_);
234 if (currentContext_.strategy != nullptr) {
235 needUpload = currentContext_.strategy->JudgeUpload();
236 }
237 // 1. if the locker is already exist, directly reuse the lock, no need do the first download
238 // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
239 isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload) &&
240 isLockAction;
241 }
242 bool isFirstDownload = true;
243 if (isNeedFirstDownload) {
244 // do first download
245 int errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
246 SetTaskFailed(taskId, errCode);
247 if (errCode != E_OK) {
248 SyncMachineDoFinished();
249 return errCode;
250 }
251 bool isActuallyNeedUpload = false; // whether the task actually has data to upload
252 {
253 std::lock_guard<std::mutex> autoLock(dataLock_);
254 isActuallyNeedUpload = currentContext_.isNeedUpload;
255 }
256 if (!isActuallyNeedUpload) {
257 LOGI("[CloudSyncer] no table need upload!");
258 SyncMachineDoFinished();
259 return E_OK;
260 }
261 isFirstDownload = false;
262 }
263
264 {
265 std::lock_guard<std::mutex> autoLock(dataLock_);
266 currentContext_.isFirstDownload = isFirstDownload;
267 currentContext_.isRealNeedUpload = needUpload;
268 }
269 return DoSyncInner(taskInfo);
270 }
271
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)272 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
273 {
274 {
275 std::lock_guard<std::mutex> autoLock(dataLock_);
276 currentContext_.tableName = taskInfo.table[index];
277 }
278 int errCode = CheckTaskIdValid(taskInfo.taskId);
279 if (errCode != E_OK) {
280 LOGE("[CloudSyncer] task is invalid, abort sync");
281 return errCode;
282 }
283 if (taskInfo.table.empty()) {
284 LOGE("[CloudSyncer] Invalid taskInfo table");
285 return -E_INVALID_ARGS;
286 }
287 errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
288 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
289 {
290 std::lock_guard<std::mutex> autoLock(dataLock_);
291 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
292 taskInfo.table[index], false);
293 LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
294 }
295 return errCode;
296 }
297 if (errCode != E_OK) {
298 LOGE("[CloudSyncer] upload failed %d", errCode);
299 return errCode;
300 }
301 return errCode;
302 }
303
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)304 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
305 {
306 if (!needUpload || taskInfo.isAssetsOnly) {
307 return E_OK;
308 }
309 storageProxy_->BeforeUploadTransaction();
310 int errCode = CloudSyncUtils::StartTransactionIfNeed(taskInfo, storageProxy_);
311 if (errCode != E_OK) {
312 LOGE("[CloudSyncer] start transaction failed before doing upload.");
313 return errCode;
314 }
315 for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
316 LOGD("[CloudSyncer] try upload table, index: %zu, table name: %s, length: %u",
317 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
318 if (IsTableFinishInUpload(taskInfo.table[i])) {
319 continue;
320 }
321 errCode = PrepareAndUpload(taskInfo, i);
322 if (errCode == -E_TASK_PAUSED) { // should re download [paused table, last table]
323 for (size_t j = i; j < taskInfo.table.size(); ++j) {
324 MarkDownloadFinishIfNeed(taskInfo.table[j], false);
325 }
326 }
327 if (errCode != E_OK) {
328 break;
329 }
330 MarkUploadFinishIfNeed(taskInfo.table[i]);
331 }
332 if (errCode == -E_TASK_PAUSED) {
333 std::lock_guard<std::mutex> autoLock(dataLock_);
334 resumeTaskInfos_[taskInfo.taskId].upload = true;
335 }
336 CloudSyncUtils::EndTransactionIfNeed(errCode, taskInfo, storageProxy_);
337 return errCode;
338 }
339
SyncMachineDoDownload()340 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
341 {
342 CloudTaskInfo taskInfo;
343 bool needUpload;
344 bool isFirstDownload;
345 {
346 std::lock_guard<std::mutex> autoLock(dataLock_);
347 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
348 needUpload = currentContext_.isRealNeedUpload;
349 isFirstDownload = currentContext_.isFirstDownload;
350 }
351 int errCode = E_OK;
352 if (IsLockInDownload()) {
353 errCode = LockCloudIfNeed(taskInfo.taskId);
354 }
355 if (errCode != E_OK) {
356 return SetCurrentTaskFailedInMachine(errCode);
357 }
358 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
359 if (errCode != E_OK) {
360 if (errCode == -E_TASK_PAUSED) {
361 DBDfxAdapter::ReportBehavior(
362 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
363 } else {
364 DBDfxAdapter::ReportBehavior(
365 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
366 }
367 return SetCurrentTaskFailedInMachine(errCode);
368 }
369 DBDfxAdapter::ReportBehavior(
370 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
371 return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
372 }
373
SyncMachineDoUpload()374 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
375 {
376 CloudTaskInfo taskInfo;
377 bool needUpload;
378 {
379 std::lock_guard<std::mutex> autoLock(dataLock_);
380 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
381 needUpload = currentContext_.isRealNeedUpload;
382 }
383 DBDfxAdapter::ReportBehavior(
384 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
385 int errCode = DoUploadInNeed(taskInfo, needUpload);
386 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
387 DBDfxAdapter::ReportBehavior(
388 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
389 return CloudSyncEvent::REPEAT_CHECK_EVENT;
390 }
391 if (errCode != E_OK) {
392 {
393 std::lock_guard<std::mutex> autoLock(dataLock_);
394 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
395 }
396 if (errCode == -E_TASK_PAUSED) {
397 DBDfxAdapter::ReportBehavior(
398 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
399 } else {
400 DBDfxAdapter::ReportBehavior(
401 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
402 }
403 return CloudSyncEvent::ERROR_EVENT;
404 }
405 DBDfxAdapter::ReportBehavior(
406 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
407 return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
408 }
409
SyncMachineDoFinished()410 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
411 {
412 UnlockIfNeed();
413 TaskId taskId;
414 int errCode;
415 int currentUserIndex;
416 int userListSize;
417 {
418 std::lock_guard<std::mutex> autoLock(dataLock_);
419 taskId = currentContext_.currentTaskId;
420 errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
421 currentUserIndex = currentContext_.currentUserIndex;
422 userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
423 }
424 if (currentUserIndex >= userListSize) {
425 {
426 std::lock_guard<std::mutex> autoLock(dataLock_);
427 cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
428 }
429 DoFinished(taskId, errCode);
430 } else {
431 CloudTaskInfo taskInfo;
432 {
433 std::lock_guard<std::mutex> autoLock(dataLock_);
434 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
435 }
436 taskInfo.status = ProcessStatus::FINISHED;
437 if (currentContext_.notifier != nullptr) {
438 currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
439 }
440 {
441 std::lock_guard<std::mutex> autoLock(dataLock_);
442 cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
443 }
444 }
445 return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
446 }
447
DoSyncInner(const CloudTaskInfo & taskInfo)448 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
449 {
450 cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
451 DBDfxAdapter::ReportBehavior(
452 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
453 return E_OK;
454 }
455
DoFinished(TaskId taskId,int errCode)456 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
457 {
458 storageProxy_->OnSyncFinish();
459 if (errCode == -E_TASK_PAUSED) {
460 DBDfxAdapter::ReportBehavior(
461 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
462 LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
463 {
464 std::lock_guard<std::mutex> autoLock(dataLock_);
465 resumeTaskInfos_[taskId].context = std::move(currentContext_);
466 currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
467 resumeTaskInfos_[taskId].context.locker = nullptr;
468 ClearCurrentContextWithoutLock();
469 }
470 contextCv_.notify_all();
471 return;
472 }
473 {
474 std::lock_guard<std::mutex> autoLock(dataLock_);
475 RemoveTaskFromQueue(cloudTaskInfos_[taskId].priorityLevel, taskId);
476 }
477 ClearContextAndNotify(taskId, errCode);
478 }
479
480 /**
481 * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
482 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)483 int CloudSyncer::UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList)
484 {
485 if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
486 return E_OK;
487 }
488 int ret = E_OK;
489 for (size_t j : param.withoutRowIdData.insertData) {
490 VBucket &datum = param.downloadData.data[j];
491 std::vector<Type> primaryValues;
492 ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
493 std::get<int64_t>(datum[DBConstant::ROWID]), primaryValues);
494 if (ret != E_OK) {
495 LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
496 return ret;
497 }
498 param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
499 }
500 for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
501 size_t downloadIndex = std::get<0>(tuple);
502 VBucket &datum = param.downloadData.data[downloadIndex];
503 size_t insertIdx = std::get<1>(tuple);
504 std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
505 pkVal[0] = datum[DBConstant::ROWID];
506 }
507 for (const auto &tuple : param.withoutRowIdData.updateData) {
508 size_t downloadIndex = std::get<0>(tuple);
509 size_t updateIndex = std::get<1>(tuple);
510 VBucket &datum = param.downloadData.data[downloadIndex];
511 size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
512 if (updateIndex >= size) {
513 LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
514 return -E_INTERNAL_ERROR;
515 }
516 if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
517 LOGE("[CloudSyncer] primary key value list should not be empty.");
518 return -E_INTERNAL_ERROR;
519 }
520 // no primary key or composite primary key, the first element is rowid
521 param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
522 datum[DBConstant::ROWID];
523 }
524 return ret;
525 }
526
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)527 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
528 {
529 for (const auto &assetField : assetFields) {
530 if (data.find(assetField.colName) == data.end()) {
531 return false;
532 }
533 if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
534 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
535 return true;
536 }
537 }
538 }
539 return false;
540 }
541
IsDataContainAssets()542 bool CloudSyncer::IsDataContainAssets()
543 {
544 std::lock_guard<std::mutex> autoLock(dataLock_);
545 bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
546 if (!hasTable) {
547 LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
548 -E_INTERNAL_ERROR);
549 return false;
550 }
551 if (currentContext_.assetFields[currentContext_.tableName].empty()) {
552 return false;
553 }
554 return true;
555 }
556
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,bool isForcePullAseets,int & errCode)557 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
558 bool setNormalStatus, bool isForcePullAseets, int &errCode)
559 {
560 // Define a map to store the result
561 std::map<std::string, Assets> res = {};
562 std::vector<Field> assetFields;
563 {
564 std::lock_guard<std::mutex> autoLock(dataLock_);
565 assetFields = currentContext_.assetFields[currentContext_.tableName];
566 }
567 // For every column contain asset or assets, assetFields are in context
568 TagAssetsInfo tagAssetsInfo = {coveredData, beCoveredData, setNormalStatus, isForcePullAseets};
569 for (const Field &assetField : assetFields) {
570 Assets assets = TagAssetsInSingleCol(tagAssetsInfo, assetField, errCode);
571 if (!assets.empty()) {
572 res[assetField.colName] = assets;
573 }
574 if (errCode != E_OK) {
575 break;
576 }
577 }
578 return res;
579 }
580
FillCloudAssets(InnerProcessInfo & info,VBucket & normalAssets,VBucket & failedAssets)581 int CloudSyncer::FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets)
582 {
583 int ret = E_OK;
584 if (normalAssets.size() > 1) {
585 if (info.isAsyncDownload) {
586 ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, normalAssets, true);
587 } else {
588 ret = storageProxy_->FillCloudAssetForDownload(info.tableName, normalAssets, true);
589 }
590 if (ret != E_OK) {
591 LOGE("[CloudSyncer]%s download: Can not fill normal assets for download",
592 info.isAsyncDownload ? "Async" : "Sync");
593 return ret;
594 }
595 }
596 if (failedAssets.size() > 1) {
597 if (info.isAsyncDownload) {
598 ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, failedAssets, false);
599 } else {
600 ret = storageProxy_->FillCloudAssetForDownload(info.tableName, failedAssets, false);
601 }
602 if (ret != E_OK) {
603 LOGE("[CloudSyncer]%s download: Can not fill abnormal assets for download",
604 info.isAsyncDownload ? "Async" : "Sync");
605 return ret;
606 }
607 }
608 return E_OK;
609 }
610
HandleDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)611 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
612 DownloadCommitList &commitList, uint32_t &successCount)
613 {
614 successCount = 0;
615 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
616 if (errCode != E_OK) {
617 LOGE("[CloudSyncer] start transaction Failed before handle download.");
618 return errCode;
619 }
620 errCode = CommitDownloadAssets(downloadItem, info, commitList, successCount);
621 if (errCode != E_OK) {
622 successCount = 0;
623 int ret = E_OK;
624 if (errCode == -E_REMOVE_ASSETS_FAILED) {
625 // remove assets failed no effect to asset status, just commit
626 ret = storageProxy_->Commit();
627 } else {
628 ret = storageProxy_->Rollback();
629 }
630 LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
631 return errCode;
632 }
633 errCode = storageProxy_->Commit();
634 if (errCode != E_OK) {
635 successCount = 0;
636 LOGE("[CloudSyncer] commit failed %d", errCode);
637 }
638 return errCode;
639 }
640
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)641 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
642 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
643 {
644 int downloadStatus = E_OK;
645 {
646 std::lock_guard<std::mutex> autoLock(dataLock_);
647 downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
648 resumeTaskInfos_[taskId].downloadStatus = E_OK;
649 }
650 int errorCode = E_OK;
651 DownloadCommitList commitList;
652 for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
653 errorCode = CheckTaskIdValid(taskId);
654 if (errorCode != E_OK) {
655 std::lock_guard<std::mutex> autoLock(dataLock_);
656 resumeTaskInfos_[taskId].lastDownloadIndex = i;
657 resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
658 break;
659 }
660 DownloadItem downloadItem;
661 GetDownloadItem(downloadList, i, downloadItem);
662 errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
663 if (errorCode == -E_NOT_SET) {
664 info.downLoadInfo.failCount += (downloadList.size() - i);
665 info.downLoadInfo.successCount -= (downloadList.size() - i);
666 return errorCode;
667 }
668 if (downloadItem.strategy == OpType::DELETE) {
669 downloadItem.assets = {};
670 downloadItem.gid = "";
671 }
672 // Process result of each asset
673 commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
674 downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
675 int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
676 if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
677 return ret;
678 }
679 downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
680 }
681 LOGD("Download status is %d", downloadStatus);
682 return errorCode == E_OK ? downloadStatus : errorCode;
683 }
684
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)685 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
686 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
687 {
688 if (!IsDataContainAssets()) {
689 return E_OK;
690 }
691 // update changed data info
692 if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
693 // changedData.primaryData should have no member inside
694 return -E_INVALID_ARGS;
695 }
696 changedAssets.tableName = info.tableName;
697 changedAssets.type = ChangedDataType::ASSET;
698 changedAssets.field = pKColNames;
699
700 // Get AssetDownloadList
701 DownloadList changeList;
702 TaskId taskId;
703 {
704 std::lock_guard<std::mutex> autoLock(dataLock_);
705 changeList = currentContext_.assetDownloadList;
706 taskId = currentContext_.currentTaskId;
707 }
708 // Download data (include deleting) will handle return Code in this situation
709 int ret = E_OK;
710 if (RuntimeContext::GetInstance()->IsBatchDownloadAssets()) {
711 ret = CloudDbBatchDownloadAssets(taskId, changeList, dupHashKeySet, info, changedAssets);
712 } else {
713 ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
714 }
715 if (ret != E_OK) {
716 LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
717 }
718 return ret;
719 }
720
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)721 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo,
722 VBucket &localAssetInfo)
723 {
724 int ret = E_OK;
725 OpType strategy = param.downloadData.opType[idx];
726 switch (strategy) {
727 case OpType::INSERT:
728 case OpType::UPDATE:
729 case OpType::DELETE:
730 ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
731 break;
732 case OpType::NOT_HANDLE:
733 case OpType::ONLY_UPDATE_GID:
734 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
735 (void)TagAssetsInSingleRecord(
736 localAssetInfo, param.downloadData.data[idx], true, param.isForcePullAseets, ret);
737 for (const auto &[col, value]: localAssetInfo) {
738 param.downloadData.data[idx].insert_or_assign(col, value);
739 }
740 break;
741 }
742 default:
743 break;
744 }
745 return ret;
746 }
747
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)748 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m,
749 VBucket &localAssetInfo)
750 {
751 Type prefix;
752 std::vector<Type> pkVals;
753 OpType strategy = param.downloadData.opType[idx];
754 bool isDelStrategy = (strategy == OpType::DELETE);
755 int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
756 param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
757 if (ret != E_OK) {
758 LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
759 return ret;
760 }
761 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
762 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
763 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
764 return -E_INTERNAL_ERROR;
765 }
766 AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
767 std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
768 false, param.isForcePullAseets, ret);
769 if (ret != E_OK) {
770 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
771 return ret;
772 }
773 strategy = CloudSyncUtils::CalOpType(param, idx);
774 if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
775 param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
776 }
777 param.assetsDownloadList.push_back(
778 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
779 pkVals, dataInfo.cloudLogInfo.timestamp));
780 return ret;
781 }
782
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache,std::vector<VBucket> & localInfo)783 int CloudSyncer::SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
784 std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo)
785 {
786 int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
787 if (ret != E_OK) {
788 LOGE("[CloudSyncer] Invalid download data:%d", ret);
789 return ret;
790 }
791 CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
792 DataInfo dataInfo;
793 VBucket localAssetInfo;
794 bool isExist = true;
795 ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
796 if (ret == -E_NOT_FOUND) {
797 isExist = false;
798 } else if (ret != E_OK) {
799 LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
800 return ret;
801 }
802 // Get cloudLogInfo from cloud data
803 dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
804 // Tag datum to get opType
805 ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
806 if (ret != E_OK) {
807 LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
808 return ret;
809 }
810 CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
811 localLogInfoCache);
812
813 if (param.isAssetsOnly && param.downloadData.opType[idx] != OpType::LOCKED_NOT_HANDLE) {
814 // if data is lock, not need to save the local info.
815 auto findGid = param.downloadData.data[idx].find(CloudDbConstant::GID_FIELD);
816 if (findGid == param.downloadData.data[idx].end()) {
817 LOGE("[CloudSyncer] data doe's not have gid field when download only asset: %d.", -E_NOT_FOUND);
818 return -E_NOT_FOUND;
819 }
820 localAssetInfo[CloudDbConstant::GID_FIELD] = findGid->second;
821 localAssetInfo[CloudDbConstant::HASH_KEY_FIELD] = dataInfo.localInfo.logInfo.hashKey;
822 localInfo.push_back(localAssetInfo);
823 return ret; // assets only not need to save changed data without assets.
824 }
825
826 ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
827 if (ret != E_OK) {
828 LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
829 }
830 return ret;
831 }
832
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)833 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m)
834 {
835 if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
836 LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
837 return -E_INVALID_ARGS;
838 }
839 // Update download batch Info
840 param.info.downLoadInfo.batchIndex += 1;
841 param.info.downLoadInfo.total += param.downloadData.data.size();
842 param.info.retryInfo.downloadBatchOpCount = 0;
843 int ret = E_OK;
844 param.deletePrimaryKeySet.clear();
845 param.dupHashKeySet.clear();
846 CloudSyncUtils::ClearWithoutData(param);
847 std::vector<std::pair<Key, size_t>> deletedList;
848 // use for record local delete status
849 std::map<std::string, LogInfo> localLogInfoCache;
850 std::vector<VBucket> localInfo;
851 for (size_t i = 0; i < param.downloadData.data.size(); i++) {
852 ret = SaveDatum(param, i, deletedList, localLogInfoCache, localInfo);
853 if (ret != E_OK) {
854 param.info.downLoadInfo.failCount += param.downloadData.data.size();
855 LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
856 return ret;
857 }
858 }
859 // Save assetsMap into current context
860 {
861 std::lock_guard<std::mutex> autoLock(dataLock_);
862 currentContext_.assetDownloadList = param.assetsDownloadList;
863 }
864
865 ret = PutCloudSyncDataOrUpdateStatusForAssetOnly(param, localInfo);
866 if (ret != E_OK) {
867 return ret;
868 }
869
870 ret = UpdateChangedData(param, currentContext_.assetDownloadList);
871 if (ret != E_OK) {
872 param.info.downLoadInfo.failCount += param.downloadData.data.size();
873 LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
874 return ret;
875 }
876 // Update downloadInfo
877 param.info.downLoadInfo.successCount += param.downloadData.data.size();
878 // Get latest cloudWaterMark
879 VBucket &lastData = param.downloadData.data.back();
880 if (!IsNeedProcessCloudCursor(taskId) && param.isLastBatch) {
881 // the last batch of cursor in the conditional query is useless
882 param.cloudWaterMark = {};
883 } else {
884 param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
885 }
886 return param.isAssetsOnly ? E_OK : UpdateFlagForSavedRecord(param);
887 }
888
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)889 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
890 {
891 // Check Input and Context Validity
892 int ret = CheckTaskIdValid(taskId);
893 if (ret != E_OK) {
894 return ret;
895 }
896 {
897 std::lock_guard<std::mutex> autoLock(dataLock_);
898 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
899 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
900 return -E_INVALID_ARGS;
901 }
902 }
903 if (currentContext_.strategy == nullptr) {
904 LOGE("[CloudSyncer] Strategy has not been initialized");
905 return -E_INVALID_ARGS;
906 }
907 ret = storageProxy_->CheckSchema(tableName);
908 if (ret != E_OK) {
909 LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
910 return ret;
911 }
912 return E_OK;
913 }
914
NeedNotifyChangedData(const ChangedData & changedData)915 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
916 {
917 TaskId taskId;
918 {
919 std::lock_guard<std::mutex> autoLock(dataLock_);
920 taskId = currentContext_.currentTaskId;
921 }
922 if (IsModeForcePush(taskId)) {
923 return false;
924 }
925 // when there have no data been changed, it don't need fill back
926 if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
927 changedData.primaryData[OP_DELETE].empty()) {
928 return false;
929 }
930 return true;
931 }
932
NotifyChangedDataInCurrentTask(ChangedData && changedData)933 int CloudSyncer::NotifyChangedDataInCurrentTask(ChangedData &&changedData)
934 {
935 if (!NeedNotifyChangedData(changedData)) {
936 return E_OK;
937 }
938 std::string deviceName;
939 {
940 std::lock_guard<std::mutex> autoLock(dataLock_);
941 std::vector<std::string> devices = currentContext_.notifier->GetDevices();
942 if (devices.empty()) {
943 DBDfxAdapter::ReportBehavior(
944 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
945 LOGE("[CloudSyncer] CurrentContext do not contain device info");
946 return -E_CLOUD_ERROR;
947 }
948 // We use first device name as the target of NotifyChangedData
949 deviceName = devices[0];
950 }
951 return CloudSyncUtils::NotifyChangeData(deviceName, storageProxy_, std::move(changedData));
952 }
953
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)954 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
955 {
956 if (!isFirstDownload && param.downloadData.data.empty()) {
957 // if the second download and there is no download data, do not notify
958 return;
959 }
960 std::lock_guard<std::mutex> autoLock(dataLock_);
961 if (currentContext_.strategy->JudgeUpload()) {
962 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
963 } else {
964 if (param.isLastBatch) {
965 param.info.tableStatus = ProcessStatus::FINISHED;
966 }
967 if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
968 currentContext_.notifier->UpdateProcess(param.info);
969 } else {
970 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
971 }
972 }
973 }
974
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)975 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m)
976 {
977 int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
978 if (ret != E_OK) {
979 LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
980 return ret;
981 }
982 uint64_t cursor = DBConstant::INVALID_CURSOR;
983 std::string maskStoreId = DBCommon::StringMiddleMasking(GetStoreIdByTask(taskId));
984 std::string maskTableName = DBCommon::StringMiddleMasking(param.info.tableName);
985 if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
986 LOGI("[CloudSyncer] cursor before save data is %llu, db: %s, table: %s, task id: %llu", cursor,
987 maskStoreId.c_str(), maskTableName.c_str(), taskId);
988 }
989 (void)storageProxy_->SetCursorIncFlag(true);
990 if (!IsModeForcePush(taskId)) {
991 param.changedData.tableName = param.info.tableName;
992 param.changedData.field = param.pkColNames;
993 param.changedData.type = ChangedDataType::DATA;
994 }
995 ret = SaveData(taskId, param);
996 (void)storageProxy_->SetCursorIncFlag(false);
997 if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
998 LOGI("[CloudSyncer] cursor after save data is %llu, db: %s, table: %s, task id: %llu", cursor,
999 maskStoreId.c_str(), maskTableName.c_str(), taskId);
1000 }
1001 param.insertPk.clear();
1002 if (ret != E_OK) {
1003 LOGE("[CloudSyncer] cannot save data: %d.", ret);
1004 int rollBackErrorCode = storageProxy_->Rollback();
1005 if (rollBackErrorCode != E_OK) {
1006 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1007 } else {
1008 LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1009 }
1010 return ret;
1011 }
1012 ret = storageProxy_->Commit();
1013 if (ret != E_OK) {
1014 LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1015 }
1016 return ret;
1017 }
1018
DoDownloadAssets(SyncParam & param)1019 int CloudSyncer::DoDownloadAssets(SyncParam ¶m)
1020 {
1021 // Begin downloading assets
1022 ChangedData changedAssets;
1023 int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1024 bool isSharedTable = false;
1025 int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1026 if (errCode != E_OK) {
1027 LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1028 return errCode;
1029 }
1030 if (!isSharedTable) {
1031 (void)NotifyChangedDataInCurrentTask(std::move(changedAssets));
1032 }
1033 if (ret != E_OK) {
1034 LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1035 } else {
1036 param.assetsDownloadList.clear();
1037 }
1038 return ret;
1039 }
1040
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param,bool downloadAssetOnly)1041 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m, bool downloadAssetOnly)
1042 {
1043 InnerProcessInfo tmpInfo = param.info;
1044 int ret = E_OK;
1045 if (!downloadAssetOnly) {
1046 ChangedData changedData;
1047 param.changedData = changedData;
1048 param.downloadData.opType.resize(param.downloadData.data.size());
1049 param.downloadData.existDataKey.resize(param.downloadData.data.size());
1050 param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1051 ret = SaveDataInTransaction(taskId, param);
1052 if (ret != E_OK) {
1053 return ret;
1054 }
1055 // call OnChange to notify changedData object first time (without Assets)
1056 ret = NotifyChangedDataInCurrentTask(std::move(param.changedData));
1057 if (ret != E_OK) {
1058 LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1059 return ret;
1060 }
1061 }
1062 ret = DoDownloadAssets(param);
1063 if (ret == -E_TASK_PAUSED) {
1064 param.info = tmpInfo;
1065 if (IsAsyncDownloadAssets(taskId)) {
1066 UpdateCloudWaterMark(taskId, param);
1067 (void)SaveCloudWaterMark(param.tableName, taskId);
1068 }
1069 }
1070 if (ret != E_OK) {
1071 return ret;
1072 }
1073 UpdateCloudWaterMark(taskId, param);
1074 return E_OK;
1075 }
1076
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1077 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1078 bool lastBatch)
1079 {
1080 CloudTaskInfo taskInfo;
1081 {
1082 std::lock_guard<std::mutex> autoLock(dataLock_);
1083 taskInfo = cloudTaskInfos_[uploadParam.taskId];
1084 }
1085 std::lock_guard<std::mutex> autoLock(dataLock_);
1086 if (uploadParam.lastTable && lastBatch) {
1087 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1088 } else {
1089 currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1090 }
1091 }
1092
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1093 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1094 {
1095 SyncParam param;
1096 int errCode = GetSyncParamForDownload(taskId, param);
1097 if (errCode != E_OK) {
1098 LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1099 return errCode;
1100 }
1101 for (auto it = param.assetsDownloadList.begin(); it != param.assetsDownloadList.end();) {
1102 if (std::get<CloudSyncUtils::STRATEGY_INDEX>(*it) != OpType::DELETE) {
1103 it = param.assetsDownloadList.erase(it);
1104 } else {
1105 it++;
1106 }
1107 }
1108 (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1109 errCode = DoDownloadInner(taskId, param, isFirstDownload);
1110 (void)storageProxy_->ClearAllTempSyncTrigger();
1111 if (errCode == -E_TASK_PAUSED) {
1112 // No need to handle ret.
1113 int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1114 if (ret != E_OK) {
1115 LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1116 }
1117 std::lock_guard<std::mutex> autoLock(dataLock_);
1118 resumeTaskInfos_[taskId].syncParam = std::move(param);
1119 }
1120 return errCode;
1121 }
1122
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1123 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1124 {
1125 // Query data by batch until reaching end and not more data need to be download
1126 int ret = PreCheck(taskId, param.info.tableName);
1127 if (ret != E_OK) {
1128 return ret;
1129 }
1130 do {
1131 ret = DownloadOneBatch(taskId, param, isFirstDownload);
1132 if (ret != E_OK) {
1133 return ret;
1134 }
1135 } while (!param.isLastBatch);
1136 return E_OK;
1137 }
1138
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1139 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1140 {
1141 std::lock_guard<std::mutex> autoLock(dataLock_);
1142 if (currentContext_.strategy->JudgeUpload()) {
1143 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1144 } else {
1145 info.tableStatus = FINISHED;
1146 if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1147 currentContext_.notifier->UpdateProcess(info);
1148 } else {
1149 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1150 }
1151 }
1152 }
1153
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1154 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1155 {
1156 int ret = PreCheck(taskId, tableName);
1157 if (ret != E_OK) {
1158 return ret;
1159 }
1160 {
1161 std::lock_guard<std::mutex> autoLock(dataLock_);
1162 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1163 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1164 return -E_INVALID_ARGS;
1165 }
1166 if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1167 (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1168 LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1169 static_cast<int>(cloudTaskInfos_[taskId].mode));
1170 return -E_INVALID_ARGS;
1171 }
1172 }
1173
1174 return ret;
1175 }
1176
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1177 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1178 InnerProcessInfo &innerProcessInfo)
1179 {
1180 int errCode = E_OK;
1181 if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1182 errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1183 if (errCode != E_OK) {
1184 return errCode;
1185 }
1186 }
1187
1188 if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1189 errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1190 if (errCode != E_OK) {
1191 return errCode;
1192 }
1193 }
1194
1195 if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1196 errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1197 if (errCode != E_OK) {
1198 return errCode;
1199 }
1200 }
1201
1202 if (!uploadData.lockData.rowid.empty()) {
1203 errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1204 if (errCode != E_OK) {
1205 LOGE("[CloudSyncer] Failed to fill cloud log and asset after save upload data: %d", errCode);
1206 }
1207 }
1208 return errCode;
1209 }
1210
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1211 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1212 {
1213 int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1214 if (errCode != E_OK) {
1215 LOGE("[CloudSyncer] Failed to fill cloud log and asset before save upload data: %d", errCode);
1216 return errCode;
1217 }
1218 Info insertInfo;
1219 Info updateInfo;
1220 Info deleteInfo;
1221 errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1222 if (errCode != E_OK) {
1223 return errCode;
1224 }
1225 bool lastBatch = (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount) >=
1226 innerProcessInfo.upLoadInfo.total;
1227 if (lastBatch) {
1228 innerProcessInfo.upLoadInfo.total =
1229 (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount);
1230 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1231 }
1232 // After each batch upload successed, call NotifyProcess
1233 NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1234
1235 // if batch upload successed, update local water mark
1236 // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1237 errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1238 if (errCode != E_OK) {
1239 LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1240 }
1241 return errCode;
1242 }
1243
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1244 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1245 {
1246 int errCode = E_OK;
1247 storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1248 // if we use local cover cloud strategy, it won't update local water mark also.
1249 if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1250 !IsNeedProcessCloudCursor(uploadParam.taskId))) {
1251 return E_OK;
1252 }
1253 errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1254 if (errCode != E_OK) {
1255 LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1256 }
1257 return errCode;
1258 }
1259
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1260 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1261 {
1262 std::string tableName;
1263 int ret = GetCurrentTableName(tableName);
1264 if (ret != E_OK) {
1265 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1266 return ret;
1267 }
1268
1269 Timestamp localMark = 0u;
1270 ret = PreCheckUpload(taskId, tableName, localMark);
1271 if (ret != E_OK) {
1272 LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1273 return ret;
1274 }
1275 ReloadWaterMarkIfNeed(taskId, localMark);
1276 storageProxy_->OnUploadStart();
1277
1278 int64_t count = 0;
1279 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1280 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1281 LOGI("get upload count:%zu", count);
1282 if (ret != E_OK) {
1283 // GetUploadCount will return E_OK when upload count is zero.
1284 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1285 return ret;
1286 }
1287 if (count == 0) {
1288 UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1289 return E_OK;
1290 }
1291 UploadParam param;
1292 param.count = count;
1293 param.lastTable = lastTable;
1294 param.taskId = taskId;
1295 param.lockAction = lockAction;
1296 return DoUploadInner(tableName, param);
1297 }
1298
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1299 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1300 CloudSyncData &uploadData)
1301 {
1302 // Precheck and calculate local water mark which would be updated if batch upload successed.
1303 int ret = CheckTaskIdValid(uploadParam.taskId);
1304 if (ret != E_OK) {
1305 return ret;
1306 }
1307 ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1308 innerProcessInfo.upLoadInfo.total);
1309 if (ret != E_OK) {
1310 LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1311 return ret;
1312 }
1313 TagUploadAssets(uploadData);
1314 CloudSyncUtils::UpdateAssetsFlag(uploadData);
1315 // get local water mark to be updated in future.
1316 ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1317 uploadParam.taskId, uploadParam.localMark);
1318 if (ret != E_OK) {
1319 LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1320 }
1321 return ret;
1322 }
1323
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1324 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1325 {
1326 {
1327 std::lock_guard<std::mutex> autoLock(dataLock_);
1328 if (cloudTaskInfos_[taskId].isAssetsOnly) {
1329 // assset only task does not need to save water mark.
1330 return E_OK;
1331 }
1332 }
1333 std::string cloudWaterMark;
1334 bool isUpdateCloudCursor = true;
1335 {
1336 std::lock_guard<std::mutex> autoLock(dataLock_);
1337 if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1338 currentContext_.cloudWaterMarks.end() ||
1339 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1340 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1341 LOGD("[CloudSyncer] Not found water mark just return");
1342 return E_OK;
1343 }
1344 cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1345 isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1346 }
1347 isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsNeedProcessCloudCursor(taskId));
1348 if (isUpdateCloudCursor) {
1349 int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1350 if (errCode != E_OK) {
1351 LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1352 }
1353 return errCode;
1354 }
1355 return E_OK;
1356 }
1357
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1358 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1359 {
1360 std::lock_guard<std::mutex> autoLock(dataLock_);
1361 uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1362 uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1363 }
1364
IsModeForcePush(TaskId taskId)1365 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1366 {
1367 std::lock_guard<std::mutex> autoLock(dataLock_);
1368 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1369 }
1370
IsModeForcePull(const TaskId taskId)1371 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1372 {
1373 std::lock_guard<std::mutex> autoLock(dataLock_);
1374 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1375 }
1376
IsPriorityTask(TaskId taskId)1377 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1378 {
1379 std::lock_guard<std::mutex> autoLock(dataLock_);
1380 return cloudTaskInfos_[taskId].priorityTask;
1381 }
1382
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1383 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1384 {
1385 // type index of field in fields, true means mandatory filed
1386 static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1387 std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1388 std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1389 std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1390 std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1391 std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1392 std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1393 };
1394
1395 for (const auto &fieldIndex : fieldAndIndex) {
1396 if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1397 if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1398 continue;
1399 }
1400 LOGE("[CloudSyncer] Cloud data do not contain expected field.");
1401 return -E_CLOUD_ERROR;
1402 }
1403 if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1404 LOGE("[CloudSyncer] Cloud data's field, doesn't has expected type.");
1405 return -E_CLOUD_ERROR;
1406 }
1407 }
1408
1409 if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1410 CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1411 }
1412 std::lock_guard<std::mutex> autoLock(dataLock_);
1413 if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1414 LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1415 return -E_CLOUD_ERROR;
1416 }
1417 return E_OK;
1418 }
1419
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1420 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1421 DownloadData &downloadData)
1422 {
1423 VBucket extend;
1424 int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1425 if (ret != E_OK) {
1426 return ret;
1427 }
1428 ret = cloudDB_.Query(tableName, extend, downloadData.data);
1429 if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1430 if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1431 LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1432 return -E_CLOUD_ERROR;
1433 }
1434 cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1435 LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1436 return ret;
1437 }
1438 if (ret == -E_QUERY_END) {
1439 LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1440 return -E_QUERY_END;
1441 }
1442 if (ret != E_OK) {
1443 LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1444 }
1445 return ret;
1446 }
1447
CheckTaskIdValid(TaskId taskId)1448 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1449 {
1450 if (closed_) {
1451 LOGE("[CloudSyncer] DB is closed.");
1452 return -E_DB_CLOSED;
1453 }
1454 std::lock_guard<std::mutex> autoLock(dataLock_);
1455 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1456 LOGE("[CloudSyncer] not found task.");
1457 return -E_INVALID_ARGS;
1458 }
1459 if (cloudTaskInfos_[taskId].pause) {
1460 LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1461 return -E_TASK_PAUSED;
1462 }
1463 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1464 LOGE("[CloudSyncer] task %" PRIu64 " has error: %d", taskId, cloudTaskInfos_[taskId].errCode);
1465 return cloudTaskInfos_[taskId].errCode;
1466 }
1467 return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1468 }
1469
GetCurrentTableName(std::string & tableName)1470 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1471 {
1472 std::lock_guard<std::mutex> autoLock(dataLock_);
1473 if (currentContext_.tableName.empty()) {
1474 return -E_BUSY;
1475 }
1476 tableName = currentContext_.tableName;
1477 return E_OK;
1478 }
1479
GetCurrentCommonTaskNum()1480 size_t CloudSyncer::GetCurrentCommonTaskNum()
1481 {
1482 size_t queuedTaskNum = taskQueue_.count(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1483 auto rang = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1484 size_t compensatedTaskNum = 0;
1485 for (auto it = rang.first; it != rang.second; ++it) {
1486 compensatedTaskNum += cloudTaskInfos_[it->second].compensatedTask ? 1 : 0;
1487 }
1488 return queuedTaskNum - compensatedTaskNum;
1489 }
1490
CheckQueueSizeWithNoLock(bool priorityTask)1491 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1492 {
1493 int32_t limit = queuedManualSyncLimit_;
1494 size_t totalTaskNum = taskQueue_.size();
1495 size_t commonTaskNum = GetCurrentCommonTaskNum();
1496 size_t queuedTaskNum = priorityTask ? totalTaskNum - commonTaskNum : commonTaskNum;
1497 if (queuedTaskNum >= static_cast<size_t>(limit)) {
1498 std::string priorityName = priorityTask ? CLOUD_PRIORITY_TASK_STRING : CLOUD_COMMON_TASK_STRING;
1499 LOGW("[CloudSyncer] too much %s sync task", priorityName.c_str());
1500 return -E_BUSY;
1501 }
1502 return E_OK;
1503 }
1504
PrepareSync(TaskId taskId)1505 int CloudSyncer::PrepareSync(TaskId taskId)
1506 {
1507 std::lock_guard<std::mutex> autoLock(dataLock_);
1508 if (hasKvRemoveTask) {
1509 return -E_CLOUD_ERROR;
1510 }
1511 if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1512 LOGW("[CloudSyncer] Abort sync because syncer is closed");
1513 return -E_DB_CLOSED;
1514 }
1515 if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1516 LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1517 return -E_DB_CLOSED;
1518 }
1519 currentContext_.currentTaskId = taskId;
1520 cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1521 cloudTaskInfos_[taskId].pause = false;
1522 cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1523 if (cloudTaskInfos_[taskId].resume) {
1524 auto tempLocker = currentContext_.locker;
1525 currentContext_ = resumeTaskInfos_[taskId].context;
1526 currentContext_.locker = tempLocker;
1527 } else {
1528 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1529 currentContext_.strategy =
1530 StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
1531 currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1532 cloudTaskInfos_[taskId].users);
1533 currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1534 }
1535 LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64 " priority[%d] compensated[%d] logicDelete[%d]",
1536 cloudTaskInfos_[taskId].storeId.c_str(), taskId, static_cast<int>(cloudTaskInfos_[taskId].priorityTask),
1537 static_cast<int>(cloudTaskInfos_[taskId].compensatedTask),
1538 static_cast<int>(storageProxy_->IsCurrentLogicDelete()));
1539 return E_OK;
1540 }
1541
LockCloud(TaskId taskId)1542 int CloudSyncer::LockCloud(TaskId taskId)
1543 {
1544 int period;
1545 {
1546 auto res = cloudDB_.Lock();
1547 if (res.first != E_OK) {
1548 return res.first;
1549 }
1550 period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1551 }
1552 int errCode = StartHeartBeatTimer(period, taskId);
1553 if (errCode != E_OK) {
1554 UnlockCloud();
1555 }
1556 return errCode;
1557 }
1558
UnlockCloud()1559 int CloudSyncer::UnlockCloud()
1560 {
1561 FinishHeartBeatTimer();
1562 return cloudDB_.UnLock();
1563 }
1564
StartHeartBeatTimer(int period,TaskId taskId)1565 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1566 {
1567 if (timerId_ != 0u) {
1568 LOGW("[CloudSyncer] HeartBeat timer has been start!");
1569 return E_OK;
1570 }
1571 TimerId timerId = 0;
1572 int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1573 HeartBeat(timerId, taskId);
1574 return E_OK;
1575 }, nullptr, timerId);
1576 if (errCode != E_OK) {
1577 LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1578 return errCode;
1579 }
1580 timerId_ = timerId;
1581 return E_OK;
1582 }
1583
FinishHeartBeatTimer()1584 void CloudSyncer::FinishHeartBeatTimer()
1585 {
1586 if (timerId_ == 0u) {
1587 return;
1588 }
1589 RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1590 timerId_ = 0u;
1591 LOGD("[CloudSyncer] Finish heartbeat timer ok");
1592 }
1593
HeartBeat(TimerId timerId,TaskId taskId)1594 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1595 {
1596 if (timerId_ != timerId) {
1597 return;
1598 }
1599 IncObjRef(this);
1600 {
1601 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1602 heartbeatCount_[taskId]++;
1603 }
1604 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1605 {
1606 std::lock_guard<std::mutex> guard(dataLock_);
1607 if (currentContext_.currentTaskId != taskId) {
1608 heartbeatCount_.erase(taskId);
1609 failedHeartbeatCount_.erase(taskId);
1610 DecObjRef(this);
1611 return;
1612 }
1613 }
1614 if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
1615 // heartbeat block twice should finish task now
1616 SetTaskFailed(taskId, -E_CLOUD_ERROR);
1617 } else {
1618 int ret = cloudDB_.HeartBeat();
1619 if (ret != E_OK) {
1620 HeartBeatFailed(taskId, ret);
1621 } else {
1622 failedHeartbeatCount_[taskId] = 0;
1623 }
1624 }
1625 {
1626 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1627 heartbeatCount_[taskId]--;
1628 if (currentContext_.currentTaskId != taskId) {
1629 heartbeatCount_.erase(taskId);
1630 failedHeartbeatCount_.erase(taskId);
1631 }
1632 }
1633 DecObjRef(this);
1634 });
1635 if (errCode != E_OK) {
1636 LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1637 DecObjRef(this);
1638 }
1639 }
1640
HeartBeatFailed(TaskId taskId,int errCode)1641 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1642 {
1643 failedHeartbeatCount_[taskId]++;
1644 if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1645 return;
1646 }
1647 LOGW("[CloudSyncer] heartbeat failed too much times!");
1648 FinishHeartBeatTimer();
1649 SetTaskFailed(taskId, errCode);
1650 }
1651
SetTaskFailed(TaskId taskId,int errCode)1652 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1653 {
1654 std::lock_guard<std::mutex> autoLock(dataLock_);
1655 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1656 return;
1657 }
1658 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1659 return;
1660 }
1661 cloudTaskInfos_[taskId].errCode = errCode;
1662 }
1663
GetCloudSyncTaskCount()1664 int32_t CloudSyncer::GetCloudSyncTaskCount()
1665 {
1666 std::lock_guard<std::mutex> autoLock(dataLock_);
1667 return static_cast<int32_t>(taskQueue_.size());
1668 }
1669
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1670 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1671 const RelationalSchemaObject &localSchema)
1672 {
1673 std::lock_guard<std::mutex> lock(syncMutex_);
1674 int index = 1;
1675 for (const auto &tableName: tableNameList) {
1676 LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1677 int ret = storageProxy_->CleanWaterMark(tableName);
1678 if (ret != E_OK) {
1679 LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1680 return ret;
1681 }
1682 index++;
1683 }
1684 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1685 if (errCode != E_OK) {
1686 LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1687 return errCode;
1688 }
1689
1690 std::vector<Asset> assets;
1691 errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1692 if (errCode != E_OK) {
1693 LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1694 storageProxy_->Rollback();
1695 return errCode;
1696 }
1697 LOGI("[CloudSyncer] Clean cloud data success, mode=%d", mode);
1698 if (!assets.empty() && mode == FLAG_AND_DATA) {
1699 errCode = cloudDB_.RemoveLocalAssets(assets);
1700 if (errCode != E_OK) {
1701 LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1702 storageProxy_->Rollback();
1703 return errCode;
1704 }
1705 LOGI("[CloudSyncer] Remove local asset success, size=%zu", assets.size());
1706 }
1707
1708 storageProxy_->Commit();
1709 return errCode;
1710 }
1711
ClearCloudWatermark(const std::vector<std::string> & tableNameList)1712 int CloudSyncer::ClearCloudWatermark(const std::vector<std::string> &tableNameList)
1713 {
1714 std::lock_guard<std::mutex> lock(syncMutex_);
1715 return CloudSyncUtils::ClearCloudWatermark(tableNameList, storageProxy_);
1716 }
1717
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1718 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1719 {
1720 for (const auto &tableName: tableNameList) {
1721 int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1722 if (ret != E_OK) {
1723 LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1724 return ret;
1725 }
1726 }
1727 return E_OK;
1728 }
1729
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1730 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m)
1731 {
1732 LOGI("[CloudSyncer] save cloud water [%s] of table [%s length[%u]]", param.cloudWaterMark.c_str(),
1733 DBCommon::StringMiddleMasking(param.tableName).c_str(), param.tableName.length());
1734 {
1735 std::lock_guard<std::mutex> autoLock(dataLock_);
1736 if (cloudTaskInfos_[taskId].isAssetsOnly) {
1737 LOGI("[CloudSyncer] assets only task not save water mark.");
1738 return;
1739 }
1740 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1741 }
1742 }
1743
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1744 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1745 DownloadCommitList &commitList, int errCode)
1746 {
1747 if (commitList.empty()) {
1748 return E_OK;
1749 }
1750 uint32_t successCount = 0u;
1751 int ret = E_OK;
1752 if (info.isAsyncDownload) {
1753 ret = HandleDownloadResultForAsyncDownload(downloadItem, info, commitList, successCount);
1754 } else {
1755 ret = HandleDownloadResult(downloadItem, info, commitList, successCount);
1756 }
1757 if (errCode == E_OK) {
1758 info.downLoadInfo.failCount += (commitList.size() - successCount);
1759 info.downLoadInfo.successCount -= (commitList.size() - successCount);
1760 }
1761 if (ret != E_OK) {
1762 LOGE("Commit download result failed.%d", ret);
1763 }
1764 commitList.clear();
1765 return ret;
1766 }
1767
GetIdentify() const1768 std::string CloudSyncer::GetIdentify() const
1769 {
1770 return id_;
1771 }
1772
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1773 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult)
1774 {
1775 strategyOpResult = OpType::NOT_HANDLE;
1776 // ignore same record with local generate data
1777 if (dataInfo.localInfo.logInfo.device.empty() &&
1778 !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1779 // not handle same data
1780 return E_OK;
1781 }
1782 {
1783 std::lock_guard<std::mutex> autoLock(dataLock_);
1784 if (!currentContext_.strategy) {
1785 LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1786 return -E_INTERNAL_ERROR;
1787 }
1788 bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1789 dataInfo.cloudLogInfo, policy_);
1790 strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1791 dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1792 }
1793 if (strategyOpResult == OpType::DELETE) {
1794 param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1795 }
1796 return E_OK;
1797 }
1798
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1799 int CloudSyncer::GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo,
1800 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1801 {
1802 int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index], true,
1803 logInfo, localAssetInfo);
1804 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1805 return errCode;
1806 }
1807 std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1808 if (hashKey.empty()) {
1809 return errCode;
1810 }
1811
1812 int ret = CheckLocalQueryAssetsOnlyIfNeed(localAssetInfo, param, logInfo);
1813 if (ret != E_OK) {
1814 LOGE("[CloudSyncer] check local assets failed, error code: %d", ret);
1815 return ret;
1816 }
1817
1818 param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1819 param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1820 if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1821 LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1822 DBCommon::TransferStringToHex(hashKey).c_str());
1823 logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1824 logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1825 logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1826 logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1827 logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1828 logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1829 logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1830 // delete record should remove local asset info
1831 if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1832 localAssetInfo.clear();
1833 }
1834 errCode = E_OK;
1835 }
1836 logInfo.logInfo.isNeedUpdateAsset = CloudSyncUtils::IsNeedUpdateAsset(localAssetInfo);
1837 return errCode;
1838 }
1839
GetNextTaskId()1840 TaskId CloudSyncer::GetNextTaskId()
1841 {
1842 std::lock_guard<std::mutex> autoLock(dataLock_);
1843 if (!taskQueue_.empty()) {
1844 return taskQueue_.begin()->second;
1845 }
1846 return INVALID_TASK_ID;
1847 }
1848
MarkCurrentTaskPausedIfNeed(const CloudTaskInfo & taskInfo)1849 void CloudSyncer::MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo)
1850 {
1851 // must sure have dataLock_ before call this function.
1852 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1853 return;
1854 }
1855 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1856 return;
1857 }
1858 if (cloudTaskInfos_[currentContext_.currentTaskId].priorityLevel < taskInfo.priorityLevel) {
1859 cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1860 LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1861 }
1862 }
1863
SetCurrentTaskFailedWithoutLock(int errCode)1864 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1865 {
1866 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1867 return;
1868 }
1869 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1870 }
1871
LockCloudIfNeed(TaskId taskId)1872 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1873 {
1874 {
1875 std::lock_guard<std::mutex> autoLock(dataLock_);
1876 if (currentContext_.locker != nullptr) {
1877 LOGD("[CloudSyncer] lock exist");
1878 return E_OK;
1879 }
1880 }
1881 std::shared_ptr<CloudLocker> locker = nullptr;
1882 int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1883 return LockCloud(taskId);
1884 }, [this]() {
1885 int unlockCode = UnlockCloud();
1886 if (unlockCode != E_OK) {
1887 SetCurrentTaskFailedWithoutLock(unlockCode);
1888 }
1889 }, locker);
1890 {
1891 std::lock_guard<std::mutex> autoLock(dataLock_);
1892 currentContext_.locker = locker;
1893 }
1894 return errCode;
1895 }
1896
UnlockIfNeed()1897 void CloudSyncer::UnlockIfNeed()
1898 {
1899 std::shared_ptr<CloudLocker> cacheLocker;
1900 {
1901 std::lock_guard<std::mutex> autoLock(dataLock_);
1902 cacheLocker = currentContext_.locker;
1903 currentContext_.locker = nullptr;
1904 }
1905 // unlock without mutex
1906 cacheLocker = nullptr;
1907 }
1908
ClearCurrentContextWithoutLock()1909 void CloudSyncer::ClearCurrentContextWithoutLock()
1910 {
1911 heartbeatCount_.erase(currentContext_.currentTaskId);
1912 failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1913 currentContext_.currentTaskId = INVALID_TASK_ID;
1914 currentContext_.notifier = nullptr;
1915 currentContext_.strategy = nullptr;
1916 currentContext_.processRecorder = nullptr;
1917 currentContext_.tableName.clear();
1918 currentContext_.assetDownloadList.clear();
1919 currentContext_.assetFields.clear();
1920 currentContext_.assetsInfo.clear();
1921 currentContext_.cloudWaterMarks.clear();
1922 currentContext_.isNeedUpload = false;
1923 currentContext_.currentUserIndex = 0;
1924 currentContext_.repeatCount = 0;
1925 }
1926
IsAlreadyHaveCompensatedSyncTask()1927 bool CloudSyncer::IsAlreadyHaveCompensatedSyncTask()
1928 {
1929 std::lock_guard<std::mutex> autoLock(dataLock_);
1930 for (const auto &item : cloudTaskInfos_) {
1931 const auto &taskInfo = item.second;
1932 if (taskInfo.compensatedTask) {
1933 return true;
1934 }
1935 }
1936 return false;
1937 }
1938
ClearContextAndNotify(TaskId taskId,int errCode)1939 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1940 {
1941 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1942 CloudTaskInfo info;
1943 {
1944 // clear current context
1945 std::lock_guard<std::mutex> autoLock(dataLock_);
1946 notifier = currentContext_.notifier;
1947 ClearCurrentContextWithoutLock();
1948 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1949 LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1950 contextCv_.notify_all();
1951 return;
1952 }
1953 info = std::move(cloudTaskInfos_[taskId]);
1954 cloudTaskInfos_.erase(taskId);
1955 resumeTaskInfos_.erase(taskId);
1956 }
1957 int err = storageProxy_->ClearUnLockingNoNeedCompensated();
1958 if (err != E_OK) {
1959 // if clear unlocking failed, no return to avoid affecting the entire process
1960 LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
1961 }
1962 contextCv_.notify_all();
1963 if (info.errCode == E_OK) {
1964 info.errCode = errCode;
1965 }
1966 LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1967 info.errCode);
1968 info.status = ProcessStatus::FINISHED;
1969 if (notifier != nullptr) {
1970 notifier->UpdateAllTablesFinally();
1971 notifier->NotifyProcess(info, {}, true);
1972 }
1973 // generate compensated sync
1974 // if already have compensated sync task in queue, no need to generate new compensated sync task
1975 if (!info.compensatedTask && !IsAlreadyHaveCompensatedSyncTask()) {
1976 CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1977 GenerateCompensatedSync(taskInfo);
1978 }
1979 }
1980
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1981 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1982 {
1983 int ret = CheckTaskIdValid(taskId);
1984 if (ret != E_OK) {
1985 return ret;
1986 }
1987 ret = DownloadDataFromCloud(taskId, param, isFirstDownload);
1988 if (ret != E_OK) {
1989 return ret;
1990 }
1991 bool downloadAssetOnly = false;
1992 if (param.downloadData.data.empty()) {
1993 if (param.assetsDownloadList.empty()) {
1994 return E_OK;
1995 }
1996 LOGI("[CloudSyncer] Resume task need download assets");
1997 downloadAssetOnly = true;
1998 }
1999 // Save data in transaction, update cloud water mark, notify process and changed data
2000 ret = SaveDataNotifyProcess(taskId, param, downloadAssetOnly);
2001 if (ret == -E_TASK_PAUSED) {
2002 return ret;
2003 }
2004 if (ret != E_OK) {
2005 std::lock_guard<std::mutex> autoLock(dataLock_);
2006 param.info.tableStatus = ProcessStatus::FINISHED;
2007 currentContext_.notifier->UpdateProcess(param.info);
2008 return ret;
2009 }
2010 (void)NotifyInDownload(taskId, param, isFirstDownload);
2011 return SaveCloudWaterMark(param.tableName, taskId);
2012 }
2013
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)2014 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
2015 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
2016 {
2017 CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
2018 if (downloadItem.assets.empty()) { // Download data (include deleting)
2019 return E_OK;
2020 }
2021 bool isSharedTable = false;
2022 int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
2023 if (errorCode != E_OK) {
2024 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
2025 return errorCode;
2026 }
2027 if (!isSharedTable) {
2028 errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
2029 if (errorCode == -E_NOT_SET) {
2030 return -E_NOT_SET;
2031 }
2032 } else {
2033 // share table will not download asset, need to reset the status
2034 for (auto &entry: downloadItem.assets) {
2035 for (auto &asset: entry.second) {
2036 asset.status = AssetStatus::NORMAL;
2037 }
2038 }
2039 }
2040 ModifyDownLoadInfoCount(errorCode, info);
2041 if (!downloadItem.assets.empty()) {
2042 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
2043 if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
2044 LOGW("[CloudSyncer] [DownloadOneAssetRecord] strategy is invalid.");
2045 } else {
2046 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
2047 downloadItem.primaryKeyValList);
2048 }
2049 } else if (downloadItem.strategy == OpType::INSERT) {
2050 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2051 }
2052 }
2053 return errorCode;
2054 }
2055
GetSyncParamForDownload(TaskId taskId,SyncParam & param)2056 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam ¶m)
2057 {
2058 if (IsCurrentTableResume(taskId, false)) {
2059 std::lock_guard<std::mutex> autoLock(dataLock_);
2060 if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2061 param = resumeTaskInfos_[taskId].syncParam;
2062 resumeTaskInfos_[taskId].syncParam = {};
2063 int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2064 if (ret != E_OK) {
2065 LOGW("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2066 }
2067 return E_OK;
2068 }
2069 }
2070 int ret = GetCurrentTableName(param.tableName);
2071 if (ret != E_OK) {
2072 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2073 return ret;
2074 }
2075 param.info.tableName = param.tableName;
2076 std::vector<Field> assetFields;
2077 // only no primary key and composite primary key contains rowid.
2078 ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2079 if (ret != E_OK) {
2080 LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2081 return ret;
2082 }
2083 std::shared_ptr<ProcessNotifier> notifier = nullptr;
2084 {
2085 std::lock_guard<std::mutex> autoLock(dataLock_);
2086 currentContext_.assetFields[currentContext_.tableName] = assetFields;
2087 notifier = currentContext_.notifier;
2088 }
2089 param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2090 if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsNeedProcessCloudCursor(taskId))) {
2091 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2092 if (ret != E_OK) {
2093 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2094 }
2095 if (!IsCurrentTaskResume(taskId)) {
2096 ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2097 }
2098 }
2099 notifier->GetDownloadInfoByTableName(param.info);
2100 auto queryObject = GetQuerySyncObject(param.tableName);
2101 param.isAssetsOnly = queryObject.IsAssetsOnly();
2102 param.groupNum = queryObject.GetGroupNum();
2103 param.assetsGroupMap = queryObject.GetAssetsOnlyGroupMap();
2104 param.isVaildForAssetsOnly = queryObject.AssetsOnlyErrFlag() == E_OK;
2105 param.isForcePullAseets = IsModeForcePull(taskId) && queryObject.IsContainQueryNodes();
2106 return ret;
2107 }
2108
IsCurrentTaskResume(TaskId taskId)2109 bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
2110 {
2111 std::lock_guard<std::mutex> autoLock(dataLock_);
2112 return cloudTaskInfos_[taskId].resume;
2113 }
2114
IsCurrentTableResume(TaskId taskId,bool upload)2115 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2116 {
2117 std::lock_guard<std::mutex> autoLock(dataLock_);
2118 if (!cloudTaskInfos_[taskId].resume) {
2119 return false;
2120 }
2121 if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2122 return false;
2123 }
2124 return upload == resumeTaskInfos_[taskId].upload;
2125 }
2126
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool isFirstDownload)2127 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isFirstDownload)
2128 {
2129 // Get cloud data after cloud water mark
2130 param.info.tableStatus = ProcessStatus::PROCESSING;
2131 param.downloadData = {};
2132 if (param.isAssetsOnly) {
2133 param.cloudWaterMarkForAssetsOnly = param.cloudWaterMark;
2134 }
2135 int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2136 {
2137 std::lock_guard<std::mutex> autoLock(dataLock_);
2138 CloudSyncUtils::CheckQueryCloudData(
2139 cloudTaskInfos_[taskId].prepareTraceId, param.downloadData, param.pkColNames);
2140 }
2141 if (ret == -E_QUERY_END) {
2142 // Won't break here since downloadData may not be null
2143 param.isLastBatch = true;
2144 } else if (ret != E_OK) {
2145 std::lock_guard<std::mutex> autoLock(dataLock_);
2146 param.info.tableStatus = ProcessStatus::FINISHED;
2147 currentContext_.notifier->UpdateProcess(param.info);
2148 return ret;
2149 }
2150
2151 ret = CheckCloudQueryAssetsOnlyIfNeed(taskId, param);
2152 if (ret != E_OK) {
2153 LOGE("[CloudSyncer] query assets failed, error code: %d", ret);
2154 std::lock_guard<std::mutex> autoLock(dataLock_);
2155 param.info.tableStatus = ProcessStatus::FINISHED;
2156 currentContext_.notifier->UpdateProcess(param.info);
2157 return ret;
2158 }
2159
2160 if (param.downloadData.data.empty()) {
2161 if (ret == E_OK || isFirstDownload) {
2162 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2163 UpdateCloudWaterMark(taskId, param);
2164 // Cloud water may change on the cloud, it needs to be saved here
2165 SaveCloudWaterMark(param.tableName, taskId);
2166 }
2167 if (isFirstDownload) {
2168 NotifyInEmptyDownload(taskId, param.info);
2169 }
2170 }
2171 return E_OK;
2172 }
2173
ModifyDownLoadInfoCount(const int errorCode,InnerProcessInfo & info)2174 void CloudSyncer::ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info)
2175 {
2176 if (errorCode == E_OK) {
2177 return;
2178 }
2179 info.downLoadInfo.failCount += 1;
2180 if (info.downLoadInfo.successCount == 0) {
2181 LOGW("[CloudSyncer] Invalid successCount");
2182 } else {
2183 info.downLoadInfo.successCount -= 1;
2184 }
2185 }
2186 } // namespace DistributedDB
2187