1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35
36 namespace DistributedDB {
CloudSyncer::CloudSyncer(
    std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
39 : lastTaskId_(INVALID_TASK_ID),
40 storageProxy_(std::move(storageProxy)),
41 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
42 closed_(false),
43 hasKvRemoveTask(false),
44 timerId_(0u),
45 isKvScene_(isKvScene),
46 policy_(policy),
47 asyncTaskId_(INVALID_TASK_ID),
48 cancelAsyncTask_(false),
49 waitDownloadListener_(nullptr)
50 {
51 if (storageProxy_ != nullptr) {
52 id_ = storageProxy_->GetIdentify();
53 }
54 InitCloudSyncStateMachine();
55 }
56
int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
    const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
59 {
60 CloudTaskInfo taskInfo;
61 taskInfo.mode = mode;
62 taskInfo.table = tables;
63 taskInfo.callback = callback;
64 taskInfo.timeout = waitTime;
65 taskInfo.devices = devices;
66 for (const auto &item: tables) {
67 QuerySyncObject syncObject;
68 syncObject.SetTableName(item);
69 taskInfo.queryList.push_back(syncObject);
70 }
71 return Sync(taskInfo);
72 }
73
int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
75 {
76 int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
77 if (errCode != E_OK) {
78 return errCode;
79 }
80 if (cloudDB_.IsNotExistCloudDB()) {
81 LOGE("[CloudSyncer] Not set cloudDB!");
82 return -E_CLOUD_ERROR;
83 }
84 if (closed_) {
85 LOGE("[CloudSyncer] DB is closed!");
86 return -E_DB_CLOSED;
87 }
88 if (hasKvRemoveTask) {
89 LOGE("[CloudSyncer] has remove task, stop sync task!");
90 return -E_CLOUD_ERROR;
91 }
92 CloudTaskInfo info = taskInfo;
93 info.status = ProcessStatus::PREPARED;
94 errCode = TryToAddSyncTask(std::move(info));
95 if (errCode != E_OK) {
96 return errCode;
97 }
98 return TriggerSync();
99 }
100
void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
102 {
103 cloudDB_.SetCloudDB(cloudDB);
104 LOGI("[CloudSyncer] SetCloudDB finish");
105 }
106
const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
108 {
109 return cloudDB_.GetCloudDB();
110 }
111
void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
113 {
114 storageProxy_->SetIAssetLoader(loader);
115 cloudDB_.SetIAssetLoader(loader);
116 LOGI("[CloudSyncer] SetIAssetLoader finish");
117 }
118
void CloudSyncer::Close()
120 {
121 closed_ = true;
122 CloudSyncer::TaskId currentTask;
123 {
124 std::lock_guard<std::mutex> autoLock(dataLock_);
125 currentTask = currentContext_.currentTaskId;
126 }
127 // mark current task db_closed
128 SetTaskFailed(currentTask, -E_DB_CLOSED);
129 UnlockIfNeed();
130 cloudDB_.Close();
131 WaitCurTaskFinished();
132
    // copy all tasks from the queue
134 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
135 for (auto &info: infoList) {
136 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
137 }
138 storageProxy_->Close();
139 }
140
int CloudSyncer::TriggerSync()
142 {
143 if (closed_) {
144 return -E_DB_CLOSED;
145 }
146 RefObject::IncObjRef(this);
147 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
148 DoSyncIfNeed();
149 RefObject::DecObjRef(this);
150 });
151 if (errCode != E_OK) {
152 LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
153 RefObject::DecObjRef(this);
154 }
155 return errCode;
156 }
157
void CloudSyncer::SetProxyUser(const std::string &user)
159 {
160 std::lock_guard<std::mutex> autoLock(dataLock_);
161 if (storageProxy_ != nullptr) {
162 storageProxy_->SetUser(user);
163 }
164 if (currentContext_.notifier != nullptr) {
165 currentContext_.notifier->SetUser(user);
166 }
167 currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
168 cloudDB_.SwitchCloudDB(user);
169 }
170
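// Drain the task queue: prepare each queued task and run DoSync for it (once per configured user when a user
// list is present), until the queue is empty or the syncer is closed.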
void CloudSyncer::DoSyncIfNeed()
172 {
173 if (closed_) {
174 return;
175 }
176 // do all sync task in this loop
177 do {
178 // get taskId from queue
179 TaskId triggerTaskId = GetNextTaskId();
180 if (triggerTaskId == INVALID_TASK_ID) {
181 LOGD("[CloudSyncer] task queue empty");
182 break;
183 }
184 // pop taskId in queue
185 if (PrepareSync(triggerTaskId) != E_OK) {
186 break;
187 }
        // if the task is a compensated task, we should get the query data and the user list.
189 if (IsCompensatedTask(triggerTaskId) && !TryToInitQueryAndUserListForCompensatedSync(triggerTaskId)) {
            // init of the query failed, so finish the compensated task.
191 continue;
192 }
193 CancelBackgroundDownloadAssetsTaskIfNeed();
194 // do sync logic
195 std::vector<std::string> usersList;
196 {
197 std::lock_guard<std::mutex> autoLock(dataLock_);
198 usersList = cloudTaskInfos_[triggerTaskId].users;
199 currentContext_.currentUserIndex = 0;
200 }
201 int errCode = E_OK;
202 if (usersList.empty()) {
203 SetProxyUser("");
204 errCode = DoSync(triggerTaskId);
205 } else {
206 for (const auto &user : usersList) {
207 SetProxyUser(user);
208 errCode = DoSync(triggerTaskId);
209 }
210 }
211 LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
212 } while (!closed_);
213 LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
214 }
215
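// Execute one sync task: optionally perform a first download (only when no cloud lock is held and the task has
// not already been tagged as needing upload), then drive the remaining download/upload flow via DoSyncInner.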
int CloudSyncer::DoSync(TaskId taskId)
217 {
218 std::lock_guard<std::mutex> lock(syncMutex_);
219 ResetCurrentTableUploadBatchIndex();
220 CloudTaskInfo taskInfo;
221 {
222 std::lock_guard<std::mutex> autoLock(dataLock_);
223 taskInfo = cloudTaskInfos_[taskId];
224 cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
225 LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
226 static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
227 }
228 bool needUpload = true;
229 bool isNeedFirstDownload = false;
230 {
231 std::lock_guard<std::mutex> autoLock(dataLock_);
232 if (currentContext_.strategy != nullptr) {
233 needUpload = currentContext_.strategy->JudgeUpload();
234 }
        // 1. if the locker already exists, reuse the lock directly; there is no need to do the first download
        // 2. if the (resumed) task has already been tagged as needing to upload data, no need to do the first download
237 isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
238 }
239 int errCode = E_OK;
240 bool isFirstDownload = true;
241 if (isNeedFirstDownload) {
242 // do first download
243 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
244 SetTaskFailed(taskId, errCode);
245 if (errCode != E_OK) {
246 SyncMachineDoFinished();
247 return errCode;
248 }
249 bool isActuallyNeedUpload = false; // whether the task actually has data to upload
250 {
251 std::lock_guard<std::mutex> autoLock(dataLock_);
252 isActuallyNeedUpload = currentContext_.isNeedUpload;
253 }
254 if (!isActuallyNeedUpload) {
255 LOGI("[CloudSyncer] no table need upload!");
256 SyncMachineDoFinished();
257 return E_OK;
258 }
259 isFirstDownload = false;
260 }
261
262 {
263 std::lock_guard<std::mutex> autoLock(dataLock_);
264 currentContext_.isFirstDownload = isFirstDownload;
265 currentContext_.isRealNeedUpload = needUpload;
266 }
267 errCode = DoSyncInner(taskInfo);
268 return errCode;
269 }
270
int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
272 {
273 {
274 std::lock_guard<std::mutex> autoLock(dataLock_);
275 currentContext_.tableName = taskInfo.table[index];
276 }
277 int errCode = CheckTaskIdValid(taskInfo.taskId);
278 if (errCode != E_OK) {
279 LOGE("[CloudSyncer] task is invalid, abort sync");
280 return errCode;
281 }
282 if (taskInfo.table.empty()) {
283 LOGE("[CloudSyncer] Invalid taskInfo table");
284 return -E_INVALID_ARGS;
285 }
286 errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
287 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
288 {
289 std::lock_guard<std::mutex> autoLock(dataLock_);
290 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
291 taskInfo.table[index], false);
292 LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
293 }
294 return errCode;
295 }
296 if (errCode != E_OK) {
297 LOGE("[CloudSyncer] upload failed %d", errCode);
298 return errCode;
299 }
300 return errCode;
301 }
302
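// Upload every table that has not yet finished uploading inside a single transaction; the transaction is
// committed on success or pause and rolled back on any other error. Skipped for assets-only tasks.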
int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
304 {
305 if (!needUpload || taskInfo.isAssetsOnly) {
306 return E_OK;
307 }
308 storageProxy_->BeforeUploadTransaction();
309 int errCode = storageProxy_->StartTransaction();
310 if (errCode != E_OK) {
311 LOGE("[CloudSyncer] start transaction failed before doing upload.");
312 return errCode;
313 }
314 for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
315 LOGD("[CloudSyncer] try upload table, index: %zu, table name: %s, length: %u",
316 i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
317 if (IsTableFinishInUpload(taskInfo.table[i])) {
318 continue;
319 }
320 errCode = PrepareAndUpload(taskInfo, i);
321 if (errCode == -E_TASK_PAUSED) { // should re download [paused table, last table]
322 for (size_t j = i; j < taskInfo.table.size(); ++j) {
323 MarkDownloadFinishIfNeed(taskInfo.table[j], false);
324 }
325 }
326 if (errCode != E_OK) {
327 break;
328 }
329 MarkUploadFinishIfNeed(taskInfo.table[i]);
330 }
331 if (errCode == -E_TASK_PAUSED) {
332 std::lock_guard<std::mutex> autoLock(dataLock_);
333 resumeTaskInfos_[taskInfo.taskId].upload = true;
334 }
335 if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
336 int commitErrorCode = storageProxy_->Commit();
337 if (commitErrorCode != E_OK) {
338 LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
339 }
340 } else {
341 int rollBackErrorCode = storageProxy_->Rollback();
342 if (rollBackErrorCode != E_OK) {
343 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
344 }
345 }
346 return errCode;
347 }
348
CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
350 {
351 CloudTaskInfo taskInfo;
352 bool needUpload;
353 bool isFirstDownload;
354 {
355 std::lock_guard<std::mutex> autoLock(dataLock_);
356 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
357 needUpload = currentContext_.isRealNeedUpload;
358 isFirstDownload = currentContext_.isFirstDownload;
359 }
360 int errCode = E_OK;
361 if (IsLockInDownload()) {
362 errCode = LockCloudIfNeed(taskInfo.taskId);
363 }
364 if (errCode != E_OK) {
365 return SetCurrentTaskFailedInMachine(errCode);
366 }
367 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
368 if (errCode != E_OK) {
369 if (errCode == -E_TASK_PAUSED) {
370 DBDfxAdapter::ReportBehavior(
371 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
372 } else {
373 DBDfxAdapter::ReportBehavior(
374 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
375 }
376 return SetCurrentTaskFailedInMachine(errCode);
377 }
378 DBDfxAdapter::ReportBehavior(
379 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
380 return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
381 }
382
CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
384 {
385 CloudTaskInfo taskInfo;
386 bool needUpload;
387 {
388 std::lock_guard<std::mutex> autoLock(dataLock_);
389 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
390 needUpload = currentContext_.isRealNeedUpload;
391 }
392 DBDfxAdapter::ReportBehavior(
393 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
394 int errCode = DoUploadInNeed(taskInfo, needUpload);
395 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
396 DBDfxAdapter::ReportBehavior(
397 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
398 return CloudSyncEvent::REPEAT_CHECK_EVENT;
399 }
400 if (errCode != E_OK) {
401 {
402 std::lock_guard<std::mutex> autoLock(dataLock_);
403 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
404 }
405 if (errCode == -E_TASK_PAUSED) {
406 DBDfxAdapter::ReportBehavior(
407 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
408 } else {
409 DBDfxAdapter::ReportBehavior(
410 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
411 }
412 return CloudSyncEvent::ERROR_EVENT;
413 }
414 DBDfxAdapter::ReportBehavior(
415 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
416 return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
417 }
418
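// Finish step of the sync state machine: release the cloud lock, then either finish the whole task (all users
// processed) or notify FINISHED for the current user and reset the error code before the next user is synced.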
CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
420 {
421 UnlockIfNeed();
422 TaskId taskId;
423 int errCode;
424 int currentUserIndex;
425 int userListSize;
426 {
427 std::lock_guard<std::mutex> autoLock(dataLock_);
428 taskId = currentContext_.currentTaskId;
429 errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
430 currentUserIndex = currentContext_.currentUserIndex;
431 userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
432 }
433 if (currentUserIndex >= userListSize) {
434 {
435 std::lock_guard<std::mutex> autoLock(dataLock_);
436 cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
437 }
438 DoFinished(taskId, errCode);
439 } else {
440 CloudTaskInfo taskInfo;
441 {
442 std::lock_guard<std::mutex> autoLock(dataLock_);
443 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
444 }
445 taskInfo.status = ProcessStatus::FINISHED;
446 currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
447 {
448 std::lock_guard<std::mutex> autoLock(dataLock_);
449 cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
450 }
451 }
452 return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
453 }
454
int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
456 {
457 cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
458 DBDfxAdapter::ReportBehavior(
459 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
460 return E_OK;
461 }
462
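// Finish a task: a paused task only has its context stashed for later resume (the cloud lock is kept); otherwise
// the task is removed from the queue and the context is cleared with a final notification.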
void CloudSyncer::DoFinished(TaskId taskId, int errCode)
464 {
465 storageProxy_->OnSyncFinish();
466 if (errCode == -E_TASK_PAUSED) {
467 DBDfxAdapter::ReportBehavior(
468 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
469 LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
470 {
471 std::lock_guard<std::mutex> autoLock(dataLock_);
472 resumeTaskInfos_[taskId].context = std::move(currentContext_);
473 currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
474 resumeTaskInfos_[taskId].context.locker = nullptr;
475 ClearCurrentContextWithoutLock();
476 }
477 contextCv_.notify_all();
478 return;
479 }
480 {
481 std::lock_guard<std::mutex> autoLock(dataLock_);
482 RemoveTaskFromQueue(cloudTaskInfos_[taskId].priorityLevel, taskId);
483 }
484 ClearContextAndNotify(taskId, errCode);
485 }
486
/**
 * UpdateChangedData is used for the insert case, where the rowid can only be obtained after the data is saved in the db.
 */
int CloudSyncer::UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList)
491 {
492 if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
493 return E_OK;
494 }
495 int ret = E_OK;
496 for (size_t j : param.withoutRowIdData.insertData) {
497 VBucket &datum = param.downloadData.data[j];
498 std::vector<Type> primaryValues;
499 ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
500 std::get<int64_t>(datum[DBConstant::ROWID]), primaryValues);
501 if (ret != E_OK) {
502 LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
503 return ret;
504 }
505 param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
506 }
507 for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
508 size_t downloadIndex = std::get<0>(tuple);
509 VBucket &datum = param.downloadData.data[downloadIndex];
510 size_t insertIdx = std::get<1>(tuple);
511 std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
512 pkVal[0] = datum[DBConstant::ROWID];
513 }
514 for (const auto &tuple : param.withoutRowIdData.updateData) {
515 size_t downloadIndex = std::get<0>(tuple);
516 size_t updateIndex = std::get<1>(tuple);
517 VBucket &datum = param.downloadData.data[downloadIndex];
518 size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
519 if (updateIndex >= size) {
520 LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
521 return -E_INTERNAL_ERROR;
522 }
523 if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
524 LOGE("[CloudSyncer] primary key value list should not be empty.");
525 return -E_INTERNAL_ERROR;
526 }
527 // no primary key or composite primary key, the first element is rowid
528 param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
529 datum[DBConstant::ROWID];
530 }
531 return ret;
532 }
533
bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
535 {
536 for (const auto &assetField : assetFields) {
537 if (data.find(assetField.colName) == data.end()) {
538 return false;
539 }
540 if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
541 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
542 return true;
543 }
544 }
545 }
546 return false;
547 }
548
bool CloudSyncer::IsDataContainAssets()
550 {
551 std::lock_guard<std::mutex> autoLock(dataLock_);
552 bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
553 if (!hasTable) {
554 LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
555 -E_INTERNAL_ERROR);
556 return false;
557 }
558 if (currentContext_.assetFields[currentContext_.tableName].empty()) {
559 return false;
560 }
561 return true;
562 }
563
std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
    bool setNormalStatus, bool isForcePullAseets, int &errCode)
566 {
567 // Define a map to store the result
568 std::map<std::string, Assets> res = {};
569 std::vector<Field> assetFields;
570 {
571 std::lock_guard<std::mutex> autoLock(dataLock_);
572 assetFields = currentContext_.assetFields[currentContext_.tableName];
573 }
    // For every column that contains an asset or assets; the assetFields are in the context
575 TagAssetsInfo tagAssetsInfo = {coveredData, beCoveredData, setNormalStatus, isForcePullAseets};
576 for (const Field &assetField : assetFields) {
577 Assets assets = TagAssetsInSingleCol(tagAssetsInfo, assetField, errCode);
578 if (!assets.empty()) {
579 res[assetField.colName] = assets;
580 }
581 if (errCode != E_OK) {
582 break;
583 }
584 }
585 return res;
586 }
587
int CloudSyncer::FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets)
589 {
590 int ret = E_OK;
591 if (normalAssets.size() > 1) {
592 if (info.isAsyncDownload) {
593 ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, normalAssets, true);
594 } else {
595 ret = storageProxy_->FillCloudAssetForDownload(info.tableName, normalAssets, true);
596 }
597 if (ret != E_OK) {
598 LOGE("[CloudSyncer]%s download: Can not fill normal assets for download",
599 info.isAsyncDownload ? "Async" : "Sync");
600 return ret;
601 }
602 }
603 if (failedAssets.size() > 1) {
604 if (info.isAsyncDownload) {
605 ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, failedAssets, false);
606 } else {
607 ret = storageProxy_->FillCloudAssetForDownload(info.tableName, failedAssets, false);
608 }
609 if (ret != E_OK) {
610 LOGE("[CloudSyncer]%s download: Can not fill abnormal assets for download",
611 info.isAsyncDownload ? "Async" : "Sync");
612 return ret;
613 }
614 }
615 return E_OK;
616 }
617
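// Persist the download result of one asset batch inside an IMMEDIATE transaction. If committing the assets
// fails, successCount is cleared; -E_REMOVE_ASSETS_FAILED still commits the transaction, other errors roll it back.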
int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
    DownloadCommitList &commitList, uint32_t &successCount)
620 {
621 successCount = 0;
622 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
623 if (errCode != E_OK) {
624 LOGE("[CloudSyncer] start transaction Failed before handle download.");
625 return errCode;
626 }
627 errCode = CommitDownloadAssets(downloadItem, info, commitList, successCount);
628 if (errCode != E_OK) {
629 successCount = 0;
630 int ret = E_OK;
631 if (errCode == -E_REMOVE_ASSETS_FAILED) {
            // removing assets failed has no effect on the asset status, just commit
633 ret = storageProxy_->Commit();
634 } else {
635 ret = storageProxy_->Rollback();
636 }
637 LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
638 return errCode;
639 }
640 errCode = storageProxy_->Commit();
641 if (errCode != E_OK) {
642 successCount = 0;
643 LOGE("[CloudSyncer] commit failed %d", errCode);
644 }
645 return errCode;
646 }
647
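// Non-batch asset download path: download the records one by one starting from the resume index, commit each
// result, and remember the index and status when the task is paused or becomes invalid.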
int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
    const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
650 {
651 int downloadStatus = E_OK;
652 {
653 std::lock_guard<std::mutex> autoLock(dataLock_);
654 downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
655 resumeTaskInfos_[taskId].downloadStatus = E_OK;
656 }
657 int errorCode = E_OK;
658 DownloadCommitList commitList;
659 for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
660 errorCode = CheckTaskIdValid(taskId);
661 if (errorCode != E_OK) {
662 std::lock_guard<std::mutex> autoLock(dataLock_);
663 resumeTaskInfos_[taskId].lastDownloadIndex = i;
664 resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
665 break;
666 }
667 DownloadItem downloadItem;
668 GetDownloadItem(downloadList, i, downloadItem);
669 errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
670 if (errorCode == -E_NOT_SET) {
671 info.downLoadInfo.failCount += (downloadList.size() - i);
672 info.downLoadInfo.successCount -= (downloadList.size() - i);
673 return errorCode;
674 }
675 if (downloadItem.strategy == OpType::DELETE) {
676 downloadItem.assets = {};
677 downloadItem.gid = "";
678 }
679 // Process result of each asset
680 commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
681 downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
682 int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
683 if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
684 return ret;
685 }
686 downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
687 }
688 LOGD("Download status is %d", downloadStatus);
689 return errorCode == E_OK ? downloadStatus : errorCode;
690 }
691
int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
    const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
694 {
695 if (!IsDataContainAssets()) {
696 return E_OK;
697 }
698 // update changed data info
699 if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
700 // changedData.primaryData should have no member inside
701 return -E_INVALID_ARGS;
702 }
703 changedAssets.tableName = info.tableName;
704 changedAssets.type = ChangedDataType::ASSET;
705 changedAssets.field = pKColNames;
706
707 // Get AssetDownloadList
708 DownloadList changeList;
709 TaskId taskId;
710 {
711 std::lock_guard<std::mutex> autoLock(dataLock_);
712 changeList = currentContext_.assetDownloadList;
713 taskId = currentContext_.currentTaskId;
714 }
    // Download the data (deletions included); the return code is handled in this situation
716 int ret = E_OK;
717 if (RuntimeContext::GetInstance()->IsBatchDownloadAssets()) {
718 ret = CloudDbBatchDownloadAssets(taskId, changeList, dupHashKeySet, info, changedAssets);
719 } else {
720 ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
721 }
722 if (ret != E_OK) {
723 LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
724 }
725 return ret;
726 }
727
int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
    VBucket &localAssetInfo)
730 {
731 int ret = E_OK;
732 OpType strategy = param.downloadData.opType[idx];
733 switch (strategy) {
734 case OpType::INSERT:
735 case OpType::UPDATE:
736 case OpType::DELETE:
737 ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
738 break;
739 case OpType::NOT_HANDLE:
740 case OpType::ONLY_UPDATE_GID:
        case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload needs this data
742 (void)TagAssetsInSingleRecord(
743 localAssetInfo, param.downloadData.data[idx], true, param.isForcePullAseets, ret);
744 for (const auto &[col, value]: localAssetInfo) {
745 param.downloadData.data[idx].insert_or_assign(col, value);
746 }
747 break;
748 }
749 default:
750 break;
751 }
752 return ret;
753 }
754
int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
    VBucket &localAssetInfo)
757 {
758 Type prefix;
759 std::vector<Type> pkVals;
760 OpType strategy = param.downloadData.opType[idx];
761 bool isDelStrategy = (strategy == OpType::DELETE);
762 int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
763 param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
764 if (ret != E_OK) {
765 LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
766 return ret;
767 }
768 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
769 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
770 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
771 return -E_INTERNAL_ERROR;
772 }
773 AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
774 std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
775 false, param.isForcePullAseets, ret);
776 if (ret != E_OK) {
777 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
778 return ret;
779 }
780 strategy = CloudSyncUtils::CalOpType(param, idx);
781 if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
782 param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
783 }
784 param.assetsDownloadList.push_back(
785 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
786 pkVals, dataInfo.cloudLogInfo.timestamp));
787 return ret;
788 }
789
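// Handle one downloaded row: validate it, look up the matching local row, tag the operation type, and either
// collect asset-only info or record the row in the changed data set.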
int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
    std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo)
792 {
793 int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
794 if (ret != E_OK) {
795 LOGE("[CloudSyncer] Invalid download data:%d", ret);
796 return ret;
797 }
798 CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
799 DataInfo dataInfo;
800 VBucket localAssetInfo;
801 bool isExist = true;
802 ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
803 if (ret == -E_NOT_FOUND) {
804 isExist = false;
805 } else if (ret != E_OK) {
806 LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
807 return ret;
808 }
809 // Get cloudLogInfo from cloud data
810 dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
811 // Tag datum to get opType
812 ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
813 if (ret != E_OK) {
814 LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
815 return ret;
816 }
817 CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
818 localLogInfoCache);
819
820 if (param.isAssetsOnly && param.downloadData.opType[idx] != OpType::LOCKED_NOT_HANDLE) {
        // if the data is locked, there is no need to save the local info.
822 auto findGid = param.downloadData.data[idx].find(CloudDbConstant::GID_FIELD);
823 if (findGid == param.downloadData.data[idx].end()) {
            LOGE("[CloudSyncer] data does not have gid field when downloading only assets: %d.", -E_NOT_FOUND);
825 return -E_NOT_FOUND;
826 }
827 localAssetInfo[CloudDbConstant::GID_FIELD] = findGid->second;
828 localAssetInfo[CloudDbConstant::HASH_KEY_FIELD] = dataInfo.localInfo.logInfo.hashKey;
829 localInfo.push_back(localAssetInfo);
        return ret; // assets-only sync does not need to save changed data without assets.
831 }
832
833 ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
834 if (ret != E_OK) {
835 LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
836 }
837 return ret;
838 }
839
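// Save one downloaded batch: run SaveDatum for every row, hand the asset download list to the current context,
// write the batch into storage, and refresh the cloud water mark from the cursor of the last row.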
int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam &param)
841 {
842 if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
843 LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
844 return -E_INVALID_ARGS;
845 }
846 // Update download batch Info
847 param.info.downLoadInfo.batchIndex += 1;
848 param.info.downLoadInfo.total += param.downloadData.data.size();
849 param.info.retryInfo.downloadBatchOpCount = 0;
850 int ret = E_OK;
851 param.deletePrimaryKeySet.clear();
852 param.dupHashKeySet.clear();
853 CloudSyncUtils::ClearWithoutData(param);
854 std::vector<std::pair<Key, size_t>> deletedList;
    // used to record the local delete status
856 std::map<std::string, LogInfo> localLogInfoCache;
857 std::vector<VBucket> localInfo;
858 for (size_t i = 0; i < param.downloadData.data.size(); i++) {
859 ret = SaveDatum(param, i, deletedList, localLogInfoCache, localInfo);
860 if (ret != E_OK) {
861 param.info.downLoadInfo.failCount += param.downloadData.data.size();
862 LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
863 return ret;
864 }
865 }
866 // Save assetsMap into current context
867 {
868 std::lock_guard<std::mutex> autoLock(dataLock_);
869 currentContext_.assetDownloadList = param.assetsDownloadList;
870 }
871
872 ret = PutCloudSyncDataOrUpdateStatusForAssetOnly(param, localInfo);
873 if (ret != E_OK) {
874 return ret;
875 }
876
877 ret = UpdateChangedData(param, currentContext_.assetDownloadList);
878 if (ret != E_OK) {
879 param.info.downLoadInfo.failCount += param.downloadData.data.size();
880 LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
881 return ret;
882 }
883 // Update downloadInfo
884 param.info.downLoadInfo.successCount += param.downloadData.data.size();
885 // Get latest cloudWaterMark
886 VBucket &lastData = param.downloadData.data.back();
887 if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
888 // the last batch of cursor in the conditional query is useless
889 param.cloudWaterMark = {};
890 } else {
891 param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
892 }
893 return param.isAssetsOnly ? E_OK : UpdateFlagForSavedRecord(param);
894 }
895
int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
897 {
898 // Check Input and Context Validity
899 int ret = CheckTaskIdValid(taskId);
900 if (ret != E_OK) {
901 return ret;
902 }
903 {
904 std::lock_guard<std::mutex> autoLock(dataLock_);
905 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
            LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
907 return -E_INVALID_ARGS;
908 }
909 }
910 if (currentContext_.strategy == nullptr) {
911 LOGE("[CloudSyncer] Strategy has not been initialized");
912 return -E_INVALID_ARGS;
913 }
914 ret = storageProxy_->CheckSchema(tableName);
915 if (ret != E_OK) {
916 LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
917 return ret;
918 }
919 return E_OK;
920 }
921
bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
923 {
924 TaskId taskId;
925 {
926 std::lock_guard<std::mutex> autoLock(dataLock_);
927 taskId = currentContext_.currentTaskId;
928 }
929 if (IsModeForcePush(taskId)) {
930 return false;
931 }
    // when no data has been changed, there is no need to fill back
933 if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
934 changedData.primaryData[OP_DELETE].empty()) {
935 return false;
936 }
937 return true;
938 }
939
int CloudSyncer::NotifyChangedDataInCurrentTask(ChangedData &&changedData)
941 {
942 if (!NeedNotifyChangedData(changedData)) {
943 return E_OK;
944 }
945 std::string deviceName;
946 {
947 std::lock_guard<std::mutex> autoLock(dataLock_);
948 std::vector<std::string> devices = currentContext_.notifier->GetDevices();
949 if (devices.empty()) {
950 DBDfxAdapter::ReportBehavior(
951 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
952 LOGE("[CloudSyncer] CurrentContext do not contain device info");
953 return -E_CLOUD_ERROR;
954 }
955 // We use first device name as the target of NotifyChangedData
956 deviceName = devices[0];
957 }
958 return CloudSyncUtils::NotifyChangeData(deviceName, storageProxy_, std::move(changedData));
959 }
960
void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
962 {
963 if (!isFirstDownload && param.downloadData.data.empty()) {
        // if this is not the first download and there is no download data, do not notify
965 return;
966 }
967 std::lock_guard<std::mutex> autoLock(dataLock_);
968 if (currentContext_.strategy->JudgeUpload()) {
969 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
970 } else {
971 if (param.isLastBatch) {
972 param.info.tableStatus = ProcessStatus::FINISHED;
973 }
974 if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
975 currentContext_.notifier->UpdateProcess(param.info);
976 } else {
977 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
978 }
979 }
980 }
981
int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
983 {
984 int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
985 if (ret != E_OK) {
986 LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
987 return ret;
988 }
989 uint64_t cursor = DBConstant::INVALID_CURSOR;
990 std::string maskStoreId = DBCommon::StringMiddleMasking(GetStoreIdByTask(taskId));
991 std::string maskTableName = DBCommon::StringMiddleMasking(param.info.tableName);
992 if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
993 LOGI("[CloudSyncer] cursor before save data is %llu, db: %s, table: %s, task id: %llu", cursor,
994 maskStoreId.c_str(), maskTableName.c_str(), taskId);
995 }
996 (void)storageProxy_->SetCursorIncFlag(true);
997 if (!IsModeForcePush(taskId)) {
998 param.changedData.tableName = param.info.tableName;
999 param.changedData.field = param.pkColNames;
1000 param.changedData.type = ChangedDataType::DATA;
1001 }
1002 ret = SaveData(taskId, param);
1003 (void)storageProxy_->SetCursorIncFlag(false);
1004 if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
1005 LOGI("[CloudSyncer] cursor after save data is %llu, db: %s, table: %s, task id: %llu", cursor,
1006 maskStoreId.c_str(), maskTableName.c_str(), taskId);
1007 }
1008 param.insertPk.clear();
1009 if (ret != E_OK) {
1010 LOGE("[CloudSyncer] cannot save data: %d.", ret);
1011 int rollBackErrorCode = storageProxy_->Rollback();
1012 if (rollBackErrorCode != E_OK) {
1013 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1014 } else {
1015 LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1016 }
1017 return ret;
1018 }
1019 ret = storageProxy_->Commit();
1020 if (ret != E_OK) {
1021 LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1022 }
1023 return ret;
1024 }
1025
int CloudSyncer::DoDownloadAssets(SyncParam &param)
1027 {
1028 // Begin downloading assets
1029 ChangedData changedAssets;
1030 int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1031 bool isSharedTable = false;
1032 int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1033 if (errCode != E_OK) {
1034 LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1035 return errCode;
1036 }
1037 if (!isSharedTable) {
1038 (void)NotifyChangedDataInCurrentTask(std::move(changedAssets));
1039 }
1040 if (ret != E_OK) {
1041 LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1042 } else {
1043 param.assetsDownloadList.clear();
1044 }
1045 return ret;
1046 }
1047
int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param, bool downloadAssetOnly)
1049 {
1050 InnerProcessInfo tmpInfo = param.info;
1051 int ret = E_OK;
1052 if (!downloadAssetOnly) {
1053 ChangedData changedData;
1054 param.changedData = changedData;
1055 param.downloadData.opType.resize(param.downloadData.data.size());
1056 param.downloadData.existDataKey.resize(param.downloadData.data.size());
1057 param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1058 ret = SaveDataInTransaction(taskId, param);
1059 if (ret != E_OK) {
1060 return ret;
1061 }
1062 // call OnChange to notify changedData object first time (without Assets)
1063 ret = NotifyChangedDataInCurrentTask(std::move(param.changedData));
1064 if (ret != E_OK) {
1065 LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1066 return ret;
1067 }
1068 }
1069 ret = DoDownloadAssets(param);
1070 if (ret == -E_TASK_PAUSED) {
1071 param.info = tmpInfo;
1072 if (IsAsyncDownloadAssets(taskId)) {
1073 UpdateCloudWaterMark(taskId, param);
1074 (void)SaveCloudWaterMark(param.tableName, taskId);
1075 }
1076 }
1077 if (ret != E_OK) {
1078 return ret;
1079 }
1080 UpdateCloudWaterMark(taskId, param);
1081 return E_OK;
1082 }
1083
void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
    bool lastBatch)
1086 {
1087 CloudTaskInfo taskInfo;
1088 {
1089 std::lock_guard<std::mutex> autoLock(dataLock_);
1090 taskInfo = cloudTaskInfos_[uploadParam.taskId];
1091 }
1092 std::lock_guard<std::mutex> autoLock(dataLock_);
1093 if (uploadParam.lastTable && lastBatch) {
1094 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1095 } else {
1096 currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1097 }
1098 }
1099
int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1101 {
1102 SyncParam param;
1103 int errCode = GetSyncParamForDownload(taskId, param);
1104 if (errCode != E_OK) {
1105 LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1106 return errCode;
1107 }
1108 for (auto it = param.assetsDownloadList.begin(); it != param.assetsDownloadList.end();) {
1109 if (std::get<CloudSyncUtils::STRATEGY_INDEX>(*it) != OpType::DELETE) {
1110 it = param.assetsDownloadList.erase(it);
1111 } else {
1112 it++;
1113 }
1114 }
1115 (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1116 errCode = DoDownloadInner(taskId, param, isFirstDownload);
1117 (void)storageProxy_->ClearAllTempSyncTrigger();
1118 if (errCode == -E_TASK_PAUSED) {
1119 // No need to handle ret.
1120 int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1121 if (ret != E_OK) {
1122 LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1123 }
1124 std::lock_guard<std::mutex> autoLock(dataLock_);
1125 resumeTaskInfos_[taskId].syncParam = std::move(param);
1126 }
1127 return errCode;
1128 }
1129
int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
1131 {
    // Query data batch by batch until the end is reached and no more data needs to be downloaded
1133 int ret = PreCheck(taskId, param.info.tableName);
1134 if (ret != E_OK) {
1135 return ret;
1136 }
1137 do {
1138 ret = DownloadOneBatch(taskId, param, isFirstDownload);
1139 if (ret != E_OK) {
1140 return ret;
1141 }
1142 } while (!param.isLastBatch);
1143 return E_OK;
1144 }
1145
void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1147 {
1148 std::lock_guard<std::mutex> autoLock(dataLock_);
1149 if (currentContext_.strategy->JudgeUpload()) {
1150 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1151 } else {
1152 info.tableStatus = FINISHED;
1153 if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1154 currentContext_.notifier->UpdateProcess(info);
1155 } else {
1156 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1157 }
1158 }
1159 }
1160
int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1162 {
1163 int ret = PreCheck(taskId, tableName);
1164 if (ret != E_OK) {
1165 return ret;
1166 }
1167 {
1168 std::lock_guard<std::mutex> autoLock(dataLock_);
1169 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1170 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1171 return -E_INVALID_ARGS;
1172 }
1173 if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1174 (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1175 LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1176 static_cast<int>(cloudTaskInfos_[taskId].mode));
1177 return -E_INVALID_ARGS;
1178 }
1179 }
1180
1181 return ret;
1182 }
1183
int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
    InnerProcessInfo &innerProcessInfo)
1186 {
1187 int errCode = E_OK;
1188 if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1189 errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1190 if (errCode != E_OK) {
1191 return errCode;
1192 }
1193 }
1194
1195 if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1196 errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1197 if (errCode != E_OK) {
1198 return errCode;
1199 }
1200 }
1201
1202 if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1203 errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1204 if (errCode != E_OK) {
1205 return errCode;
1206 }
1207 }
1208
1209 if (!uploadData.lockData.rowid.empty()) {
1210 errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1211 if (errCode != E_OK) {
1212 LOGE("[CloudSyncer] Failed to fill cloud log and asset after save upload data: %d", errCode);
1213 }
1214 }
1215 return errCode;
1216 }
1217
int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1219 {
1220 int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1221 if (errCode != E_OK) {
1222 LOGE("[CloudSyncer] Failed to fill cloud log and asset before save upload data: %d", errCode);
1223 return errCode;
1224 }
1225 Info insertInfo;
1226 Info updateInfo;
1227 Info deleteInfo;
1228 errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1229 if (errCode != E_OK) {
1230 return errCode;
1231 }
1232 bool lastBatch = (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount) ==
1233 innerProcessInfo.upLoadInfo.total;
1234 if (lastBatch) {
1235 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1236 }
    // After each batch upload succeeds, call NotifyProcess
1238 NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1239
    // if the batch upload succeeded, update the local water mark
1241 // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1242 errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1243 if (errCode != E_OK) {
1244 LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1245 }
1246 return errCode;
1247 }
1248
int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1250 {
1251 int errCode = E_OK;
1252 storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
    // if we use the local-covers-cloud strategy, the local water mark won't be updated either.
1254 if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1255 !IsQueryListEmpty(uploadParam.taskId))) {
1256 return E_OK;
1257 }
1258 errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1259 if (errCode != E_OK) {
1260 LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1261 }
1262 return errCode;
1263 }
1264
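// Upload the current table: after the pre-check, query the pending upload count and either report an empty
// upload or run DoUploadInner with the assembled UploadParam.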
int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1266 {
1267 std::string tableName;
1268 int ret = GetCurrentTableName(tableName);
1269 if (ret != E_OK) {
1270 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1271 return ret;
1272 }
1273
1274 Timestamp localMark = 0u;
1275 ret = PreCheckUpload(taskId, tableName, localMark);
1276 if (ret != E_OK) {
1277 LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1278 return ret;
1279 }
1280 ReloadWaterMarkIfNeed(taskId, localMark);
1281 storageProxy_->OnUploadStart();
1282
1283 int64_t count = 0;
1284 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1285 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
    LOGI("get upload count:%" PRId64, count);
1287 if (ret != E_OK) {
1288 // GetUploadCount will return E_OK when upload count is zero.
1289 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1290 return ret;
1291 }
1292 if (count == 0) {
1293 UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1294 return E_OK;
1295 }
1296 UploadParam param;
1297 param.count = count;
1298 param.lastTable = lastTable;
1299 param.taskId = taskId;
1300 param.lockAction = lockAction;
1301 return DoUploadInner(tableName, param);
1302 }
1303
int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
    CloudSyncData &uploadData)
1306 {
    // Precheck and calculate the local water mark, which will be updated if the batch upload succeeds.
1308 int ret = CheckTaskIdValid(uploadParam.taskId);
1309 if (ret != E_OK) {
1310 return ret;
1311 }
1312 ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1313 innerProcessInfo.upLoadInfo.total);
1314 if (ret != E_OK) {
1315 LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1316 return ret;
1317 }
1318 TagUploadAssets(uploadData);
1319 CloudSyncUtils::UpdateAssetsFlag(uploadData);
1320 // get local water mark to be updated in future.
1321 ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1322 uploadParam.taskId, uploadParam.localMark);
1323 if (ret != E_OK) {
1324 LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1325 }
1326 return ret;
1327 }
1328
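// Persist the cloud water mark recorded for the current user and table, unless the task is assets-only, no mark
// was recorded, or the strategy forbids updating the cursor for this task.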
int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1330 {
1331 {
1332 std::lock_guard<std::mutex> autoLock(dataLock_);
1333 if (cloudTaskInfos_[taskId].isAssetsOnly) {
            // an assets-only task does not need to save the water mark.
1335 return E_OK;
1336 }
1337 }
1338 std::string cloudWaterMark;
1339 bool isUpdateCloudCursor = true;
1340 {
1341 std::lock_guard<std::mutex> autoLock(dataLock_);
1342 if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1343 currentContext_.cloudWaterMarks.end() ||
1344 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1345 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1346 LOGD("[CloudSyncer] Not found water mark just return");
1347 return E_OK;
1348 }
1349 cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1350 isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1351 }
1352 isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1353 if (isUpdateCloudCursor) {
1354 int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1355 if (errCode != E_OK) {
1356 LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1357 }
1358 return errCode;
1359 }
1360 return E_OK;
1361 }
1362
void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1364 {
1365 std::lock_guard<std::mutex> autoLock(dataLock_);
1366 uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1367 uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1368 }
1369
bool CloudSyncer::IsModeForcePush(TaskId taskId)
1371 {
1372 std::lock_guard<std::mutex> autoLock(dataLock_);
1373 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1374 }
1375
bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1377 {
1378 std::lock_guard<std::mutex> autoLock(dataLock_);
1379 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1380 }
1381
bool CloudSyncer::IsPriorityTask(TaskId taskId)
1383 {
1384 std::lock_guard<std::mutex> autoLock(dataLock_);
1385 return cloudTaskInfos_[taskId].priorityTask;
1386 }
1387
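// Validate the system fields (gid/create/modify/delete/cursor and optional sharing resource) of one cloud row,
// trim deleted rows down to their extend info, and reject rows containing duplicate assets.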
int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1389 {
    // type index of the field in fields; true means a mandatory field
1391 static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1392 std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1393 std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1394 std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1395 std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1396 std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1397 std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1398 };
1399
1400 for (const auto &fieldIndex : fieldAndIndex) {
1401 if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1402 if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1403 continue;
1404 }
1405 LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1406 return -E_CLOUD_ERROR;
1407 }
1408 if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
            LOGE("[CloudSyncer] Cloud data's field: %s, doesn't have expected type.", std::get<0>(fieldIndex).c_str());
1410 return -E_CLOUD_ERROR;
1411 }
1412 }
1413
1414 if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1415 CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1416 }
1417 std::lock_guard<std::mutex> autoLock(dataLock_);
1418 if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1419 LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1420 return -E_CLOUD_ERROR;
1421 }
1422 return E_OK;
1423 }
1424
int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
    DownloadData &downloadData)
1427 {
1428 VBucket extend;
1429 int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1430 if (ret != E_OK) {
1431 return ret;
1432 }
1433 ret = cloudDB_.Query(tableName, extend, downloadData.data);
1434 if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1435 if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1436 LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1437 return -E_CLOUD_ERROR;
1438 }
1439 cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1440 LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1441 return ret;
1442 }
1443 if (ret == -E_QUERY_END) {
1444 LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1445 return -E_QUERY_END;
1446 }
1447 if (ret != E_OK) {
1448 LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1449 }
1450 return ret;
1451 }
1452
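// A task id is valid only when the syncer is open, the task exists, it is neither paused nor failed,
// and it is the task currently being executed.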
1453 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1454 {
1455 if (closed_) {
1456 LOGE("[CloudSyncer] DB is closed.");
1457 return -E_DB_CLOSED;
1458 }
1459 std::lock_guard<std::mutex> autoLock(dataLock_);
1460 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1461 LOGE("[CloudSyncer] not found task.");
1462 return -E_INVALID_ARGS;
1463 }
1464 if (cloudTaskInfos_[taskId].pause) {
1465 LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1466 return -E_TASK_PAUSED;
1467 }
1468 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1469 LOGE("[CloudSyncer] task %" PRIu64 " has error: %d", taskId, cloudTaskInfos_[taskId].errCode);
1470 return cloudTaskInfos_[taskId].errCode;
1471 }
1472 return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1473 }
1474
1475 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1476 {
1477 std::lock_guard<std::mutex> autoLock(dataLock_);
1478 if (currentContext_.tableName.empty()) {
1479 return -E_BUSY;
1480 }
1481 tableName = currentContext_.tableName;
1482 return E_OK;
1483 }
1484
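// Count queued common-priority tasks, excluding compensated tasks. The caller is expected to hold
// dataLock_; this helper does not lock internally.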
1485 size_t CloudSyncer::GetCurrentCommonTaskNum()
1486 {
1487 size_t queuedTaskNum = taskQueue_.count(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1488 auto range = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1489 size_t compensatedTaskNum = 0;
1490 for (auto it = range.first; it != range.second; ++it) {
1491 compensatedTaskNum += cloudTaskInfos_[it->second].compensatedTask ? 1 : 0;
1492 }
1493 return queuedTaskNum - compensatedTaskNum;
1494 }
1495
1496 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1497 {
1498 int32_t limit = queuedManualSyncLimit_;
1499 size_t totalTaskNum = taskQueue_.size();
1500 size_t commonTaskNum = GetCurrentCommonTaskNum();
1501 size_t queuedTaskNum = priorityTask ? totalTaskNum - commonTaskNum : commonTaskNum;
1502 if (queuedTaskNum >= static_cast<size_t>(limit)) {
1503 std::string priorityName = priorityTask ? CLOUD_PRIORITY_TASK_STRING : CLOUD_COMMON_TASK_STRING;
1504 LOGW("[CloudSyncer] too much %s sync task", priorityName.c_str());
1505 return -E_BUSY;
1506 }
1507 return E_OK;
1508 }
1509
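// Bind the task to the current context and mark it PROCESSING. A paused task is turned into a resume
// task and restores its saved context; otherwise a fresh notifier, strategy and process recorder are built.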
1510 int CloudSyncer::PrepareSync(TaskId taskId)
1511 {
1512 std::lock_guard<std::mutex> autoLock(dataLock_);
1513 if (hasKvRemoveTask) {
1514 return -E_CLOUD_ERROR;
1515 }
1516 if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1517 LOGW("[CloudSyncer] Abort sync because syncer is closed");
1518 return -E_DB_CLOSED;
1519 }
1520 if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1521 LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1522 return -E_DB_CLOSED;
1523 }
1524 currentContext_.currentTaskId = taskId;
1525 cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1526 cloudTaskInfos_[taskId].pause = false;
1527 cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1528 if (cloudTaskInfos_[taskId].resume) {
1529 auto tempLocker = currentContext_.locker;
1530 currentContext_ = resumeTaskInfos_[taskId].context;
1531 currentContext_.locker = tempLocker;
1532 } else {
1533 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1534 currentContext_.strategy =
1535 StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
1536 currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1537 cloudTaskInfos_[taskId].users);
1538 currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1539 }
1540 LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64 " priority[%d] compensated[%d] logicDelete[%d]",
1541 cloudTaskInfos_[taskId].storeId.c_str(), taskId, static_cast<int>(cloudTaskInfos_[taskId].priorityTask),
1542 static_cast<int>(cloudTaskInfos_[taskId].compensatedTask),
1543 static_cast<int>(storageProxy_->IsCurrentLogicDelete()));
1544 return E_OK;
1545 }
1546
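// Lock the cloud database and start the heartbeat timer; the timer period is derived by dividing the
// lock duration returned by the cloud by HEARTBEAT_PERIOD.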
1547 int CloudSyncer::LockCloud(TaskId taskId)
1548 {
1549 int period;
1550 {
1551 auto res = cloudDB_.Lock();
1552 if (res.first != E_OK) {
1553 return res.first;
1554 }
1555 period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1556 }
1557 int errCode = StartHeartBeatTimer(period, taskId);
1558 if (errCode != E_OK) {
1559 UnlockCloud();
1560 }
1561 return errCode;
1562 }
1563
1564 int CloudSyncer::UnlockCloud()
1565 {
1566 FinishHeartBeatTimer();
1567 return cloudDB_.UnLock();
1568 }
1569
1570 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1571 {
1572 if (timerId_ != 0u) {
1573 LOGW("[CloudSyncer] HeartBeat timer has been start!");
1574 return E_OK;
1575 }
1576 TimerId timerId = 0;
1577 int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1578 HeartBeat(timerId, taskId);
1579 return E_OK;
1580 }, nullptr, timerId);
1581 if (errCode != E_OK) {
1582 LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1583 return errCode;
1584 }
1585 timerId_ = timerId;
1586 return E_OK;
1587 }
1588
1589 void CloudSyncer::FinishHeartBeatTimer()
1590 {
1591 if (timerId_ == 0u) {
1592 return;
1593 }
1594 RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1595 timerId_ = 0u;
1596 LOGD("[CloudSyncer] Finish heartbeat timer ok");
1597 }
1598
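// Timer callback: schedule an asynchronous heartbeat to the cloud. If earlier heartbeats are still
// pending (the per-task count reaches HEARTBEAT_PERIOD) the task is marked failed instead of sending
// another heartbeat.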
1599 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1600 {
1601 if (timerId_ != timerId) {
1602 return;
1603 }
1604 IncObjRef(this);
1605 {
1606 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1607 heartbeatCount_[taskId]++;
1608 }
1609 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1610 {
1611 std::lock_guard<std::mutex> guard(dataLock_);
1612 if (currentContext_.currentTaskId != taskId) {
1613 heartbeatCount_.erase(taskId);
1614 failedHeartbeatCount_.erase(taskId);
1615 DecObjRef(this);
1616 return;
1617 }
1618 }
1619 if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
1620 // heartbeat has been blocked twice; finish the task now
1621 SetTaskFailed(taskId, -E_CLOUD_ERROR);
1622 } else {
1623 int ret = cloudDB_.HeartBeat();
1624 if (ret != E_OK) {
1625 HeartBeatFailed(taskId, ret);
1626 } else {
1627 failedHeartbeatCount_[taskId] = 0;
1628 }
1629 }
1630 {
1631 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1632 heartbeatCount_[taskId]--;
1633 if (currentContext_.currentTaskId != taskId) {
1634 heartbeatCount_.erase(taskId);
1635 failedHeartbeatCount_.erase(taskId);
1636 }
1637 }
1638 DecObjRef(this);
1639 });
1640 if (errCode != E_OK) {
1641 LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1642 DecObjRef(this);
1643 }
1644 }
1645
1646 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1647 {
1648 failedHeartbeatCount_[taskId]++;
1649 if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1650 return;
1651 }
1652 LOGW("[CloudSyncer] heartbeat failed too much times!");
1653 FinishHeartBeatTimer();
1654 SetTaskFailed(taskId, errCode);
1655 }
1656
1657 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1658 {
1659 std::lock_guard<std::mutex> autoLock(dataLock_);
1660 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1661 return;
1662 }
1663 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1664 return;
1665 }
1666 cloudTaskInfos_[taskId].errCode = errCode;
1667 }
1668
1669 int32_t CloudSyncer::GetCloudSyncTaskCount()
1670 {
1671 std::lock_guard<std::mutex> autoLock(dataLock_);
1672 return static_cast<int32_t>(taskQueue_.size());
1673 }
1674
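// Clear the cloud water mark of every listed table, then clean the cloud-synced data inside one
// transaction. In FLAG_AND_DATA mode the assets collected by the storage layer are removed as well.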
1675 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1676 const RelationalSchemaObject &localSchema)
1677 {
1678 std::lock_guard<std::mutex> lock(syncMutex_);
1679 int index = 1;
1680 for (const auto &tableName: tableNameList) {
1681 LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1682 int ret = storageProxy_->CleanWaterMark(tableName);
1683 if (ret != E_OK) {
1684 LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1685 return ret;
1686 }
1687 index++;
1688 }
1689 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1690 if (errCode != E_OK) {
1691 LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1692 return errCode;
1693 }
1694
1695 std::vector<Asset> assets;
1696 errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1697 if (errCode != E_OK) {
1698 LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1699 storageProxy_->Rollback();
1700 return errCode;
1701 }
1702 LOGI("[CloudSyncer] Clean cloud data success, mode=%d", mode);
1703 if (!assets.empty() && mode == FLAG_AND_DATA) {
1704 errCode = cloudDB_.RemoveLocalAssets(assets);
1705 if (errCode != E_OK) {
1706 LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1707 storageProxy_->Rollback();
1708 return errCode;
1709 }
1710 LOGI("[CloudSyncer] Remove local asset success, size=%zu", assets.size());
1711 }
1712
1713 storageProxy_->Commit();
1714 return errCode;
1715 }
1716
1717 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1718 {
1719 std::lock_guard<std::mutex> lock(syncMutex_);
1720 for (const auto &tableName: tableNameList) {
1721 int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1722 if (ret != E_OK) {
1723 LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1724 return ret;
1725 }
1726 }
1727 return E_OK;
1728 }
1729
1730 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam &param)
1731 {
1732 LOGI("[CloudSyncer] save cloud water [%s] of table [%s length[%u]]", param.cloudWaterMark.c_str(),
1733 DBCommon::StringMiddleMasking(param.tableName).c_str(), param.tableName.length());
1734 {
1735 std::lock_guard<std::mutex> autoLock(dataLock_);
1736 if (cloudTaskInfos_[taskId].isAssetsOnly) {
1737 LOGI("[CloudSyncer] assets only task not save water mark.");
1738 return;
1739 }
1740 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1741 }
1742 }
1743
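// Commit a batch of asset download results. Async download tasks and normal tasks take different commit
// paths; entries that failed to commit are moved from the success count to the fail count.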
1744 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1745 DownloadCommitList &commitList, int errCode)
1746 {
1747 if (commitList.empty()) {
1748 return E_OK;
1749 }
1750 uint32_t successCount = 0u;
1751 int ret = E_OK;
1752 if (info.isAsyncDownload) {
1753 ret = CommitDownloadAssetsForAsyncDownload(downloadItem, info, commitList, successCount);
1754 } else {
1755 ret = HandleDownloadResult(downloadItem, info, commitList, successCount);
1756 }
1757 if (errCode == E_OK) {
1758 info.downLoadInfo.failCount += (commitList.size() - successCount);
1759 info.downLoadInfo.successCount -= (commitList.size() - successCount);
1760 }
1761 if (ret != E_OK) {
1762 LOGE("Commit download result failed.%d", ret);
1763 }
1764 commitList.clear();
1765 return ret;
1766 }
1767
1768 std::string CloudSyncer::GetIdentify() const
1769 {
1770 return id_;
1771 }
1772
1773 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
1774 {
1775 strategyOpResult = OpType::NOT_HANDLE;
1776 // ignore records identical to locally generated data
1777 if (dataInfo.localInfo.logInfo.device.empty() &&
1778 !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1779 // do not handle identical data
1780 return E_OK;
1781 }
1782 {
1783 std::lock_guard<std::mutex> autoLock(dataLock_);
1784 if (!currentContext_.strategy) {
1785 LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1786 return -E_INTERNAL_ERROR;
1787 }
1788 bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1789 dataInfo.cloudLogInfo, policy_);
1790 strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1791 dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1792 }
1793 if (strategyOpResult == OpType::DELETE) {
1794 param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1795 }
1796 return E_OK;
1797 }
1798
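// Look up the local log info (and local asset info) matching one downloaded record by primary key or gid.
// If the same hash key already appeared in this batch, the cached log info overrides the stored one.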
1799 int CloudSyncer::GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
1800 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1801 {
1802 int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index], true,
1803 logInfo, localAssetInfo);
1804 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1805 return errCode;
1806 }
1807 std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1808 if (hashKey.empty()) {
1809 return errCode;
1810 }
1811
1812 int ret = CheckLocalQueryAssetsOnlyIfNeed(localAssetInfo, param, logInfo);
1813 if (ret != E_OK) {
1814 LOGE("[CloudSyncer] check local assets failed, error code: %d", ret);
1815 return ret;
1816 }
1817
1818 param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1819 param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1820 if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1821 LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1822 DBCommon::TransferStringToHex(hashKey).c_str());
1823 logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1824 logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1825 logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1826 logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1827 logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1828 logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1829 logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1830 // a deleted record should have its local asset info removed
1831 if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1832 localAssetInfo.clear();
1833 }
1834 errCode = E_OK;
1835 }
1836 logInfo.logInfo.isNeedUpdateAsset = CloudSyncUtils::IsNeedUpdateAsset(localAssetInfo);
1837 return errCode;
1838 }
1839
1840 TaskId CloudSyncer::GetNextTaskId()
1841 {
1842 std::lock_guard<std::mutex> autoLock(dataLock_);
1843 if (!taskQueue_.empty()) {
1844 return taskQueue_.begin()->second;
1845 }
1846 return INVALID_TASK_ID;
1847 }
1848
1849 void CloudSyncer::MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo)
1850 {
1851 // the caller must hold dataLock_ before calling this function.
1852 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1853 return;
1854 }
1855 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1856 return;
1857 }
1858 if (cloudTaskInfos_[currentContext_.currentTaskId].priorityLevel < taskInfo.priorityLevel) {
1859 cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1860 LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1861 }
1862 }
1863
1864 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1865 {
1866 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1867 return;
1868 }
1869 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1870 }
1871
1872 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1873 {
1874 {
1875 std::lock_guard<std::mutex> autoLock(dataLock_);
1876 if (currentContext_.locker != nullptr) {
1877 LOGD("[CloudSyncer] lock exist");
1878 return E_OK;
1879 }
1880 }
1881 std::shared_ptr<CloudLocker> locker = nullptr;
1882 int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1883 return LockCloud(taskId);
1884 }, [this]() {
1885 int unlockCode = UnlockCloud();
1886 if (unlockCode != E_OK) {
1887 SetCurrentTaskFailedWithoutLock(unlockCode);
1888 }
1889 }, locker);
1890 {
1891 std::lock_guard<std::mutex> autoLock(dataLock_);
1892 currentContext_.locker = locker;
1893 }
1894 return errCode;
1895 }
1896
1897 void CloudSyncer::UnlockIfNeed()
1898 {
1899 std::shared_ptr<CloudLocker> cacheLocker;
1900 {
1901 std::lock_guard<std::mutex> autoLock(dataLock_);
1902 cacheLocker = currentContext_.locker;
1903 currentContext_.locker = nullptr;
1904 }
1905 // unlock without mutex
1906 cacheLocker = nullptr;
1907 }
1908
1909 void CloudSyncer::ClearCurrentContextWithoutLock()
1910 {
1911 heartbeatCount_.erase(currentContext_.currentTaskId);
1912 failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1913 currentContext_.currentTaskId = INVALID_TASK_ID;
1914 currentContext_.notifier = nullptr;
1915 currentContext_.strategy = nullptr;
1916 currentContext_.processRecorder = nullptr;
1917 currentContext_.tableName.clear();
1918 currentContext_.assetDownloadList.clear();
1919 currentContext_.assetFields.clear();
1920 currentContext_.assetsInfo.clear();
1921 currentContext_.cloudWaterMarks.clear();
1922 currentContext_.isNeedUpload = false;
1923 currentContext_.currentUserIndex = 0;
1924 currentContext_.repeatCount = 0;
1925 }
1926
1927 bool CloudSyncer::IsAlreadyHaveCompensatedSyncTask()
1928 {
1929 std::lock_guard<std::mutex> autoLock(dataLock_);
1930 for (const auto &item : cloudTaskInfos_) {
1931 const auto &taskInfo = item.second;
1932 if (taskInfo.compensatedTask) {
1933 return true;
1934 }
1935 }
1936 return false;
1937 }
1938
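// Release the finished task: clear the current context, notify the final process status, and generate a
// compensated sync task when the finished task is not itself compensated and none is already queued.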
1939 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1940 {
1941 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1942 CloudTaskInfo info;
1943 {
1944 // clear current context
1945 std::lock_guard<std::mutex> autoLock(dataLock_);
1946 notifier = currentContext_.notifier;
1947 ClearCurrentContextWithoutLock();
1948 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1949 LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1950 contextCv_.notify_all();
1951 return;
1952 }
1953 info = std::move(cloudTaskInfos_[taskId]);
1954 cloudTaskInfos_.erase(taskId);
1955 resumeTaskInfos_.erase(taskId);
1956 }
1957 int err = storageProxy_->ClearUnLockingNoNeedCompensated();
1958 if (err != E_OK) {
1959 // if clear unlocking failed, no return to avoid affecting the entire process
1960 LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
1961 }
1962 contextCv_.notify_all();
1963 if (info.errCode == E_OK) {
1964 info.errCode = errCode;
1965 }
1966 LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1967 info.errCode);
1968 info.status = ProcessStatus::FINISHED;
1969 if (notifier != nullptr) {
1970 notifier->UpdateAllTablesFinally();
1971 notifier->NotifyProcess(info, {}, true);
1972 }
1973 // generate compensated sync
1974 // if already have compensated sync task in queue, no need to generate new compensated sync task
1975 if (!info.compensatedTask && !IsAlreadyHaveCompensatedSyncTask()) {
1976 CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1977 GenerateCompensatedSync(taskInfo);
1978 }
1979 }
1980
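// Download and save one batch: query cloud data, save it in a transaction, notify the process, and
// persist the cloud water mark. A resumed task with pending assets only downloads the assets.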
1981 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload)
1982 {
1983 int ret = CheckTaskIdValid(taskId);
1984 if (ret != E_OK) {
1985 return ret;
1986 }
1987 ret = DownloadDataFromCloud(taskId, param, isFirstDownload);
1988 if (ret != E_OK) {
1989 return ret;
1990 }
1991 bool downloadAssetOnly = false;
1992 if (param.downloadData.data.empty()) {
1993 if (param.assetsDownloadList.empty()) {
1994 return E_OK;
1995 }
1996 LOGI("[CloudSyncer] Resume task need download assets");
1997 downloadAssetOnly = true;
1998 }
1999 // Save data in transaction, update cloud water mark, notify process and changed data
2000 ret = SaveDataNotifyProcess(taskId, param, downloadAssetOnly);
2001 if (ret == -E_TASK_PAUSED) {
2002 return ret;
2003 }
2004 if (ret != E_OK) {
2005 std::lock_guard<std::mutex> autoLock(dataLock_);
2006 param.info.tableStatus = ProcessStatus::FINISHED;
2007 currentContext_.notifier->UpdateProcess(param.info);
2008 return ret;
2009 }
2010 (void)NotifyInDownload(taskId, param, isFirstDownload);
2011 return SaveCloudWaterMark(param.tableName, taskId);
2012 }
2013
2014 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
2015 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
2016 {
2017 CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
2018 if (downloadItem.assets.empty()) { // Download data (include deleting)
2019 return E_OK;
2020 }
2021 bool isSharedTable = false;
2022 int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
2023 if (errorCode != E_OK) {
2024 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
2025 return errorCode;
2026 }
2027 if (!isSharedTable) {
2028 errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
2029 if (errorCode == -E_NOT_SET) {
2030 return -E_NOT_SET;
2031 }
2032 } else {
2033 // shared tables do not download assets, so reset the asset status
2034 for (auto &entry: downloadItem.assets) {
2035 for (auto &asset: entry.second) {
2036 asset.status = AssetStatus::NORMAL;
2037 }
2038 }
2039 }
2040 ModifyDownLoadInfoCount(errorCode, info);
2041 if (!downloadItem.assets.empty()) {
2042 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
2043 if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
2044 LOGW("[CloudSyncer] [DownloadOneAssetRecord] strategy is invalid.");
2045 } else {
2046 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
2047 downloadItem.primaryKeyValList);
2048 }
2049 } else if (downloadItem.strategy == OpType::INSERT) {
2050 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2051 }
2052 }
2053 return errorCode;
2054 }
2055
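// Build the sync parameters for downloading the current table: restore cached parameters when the table
// is resumed, otherwise fetch primary key and asset columns, the cloud water mark and the query options.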
2056 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam &param)
2057 {
2058 int ret = E_OK;
2059 if (IsCurrentTableResume(taskId, false)) {
2060 std::lock_guard<std::mutex> autoLock(dataLock_);
2061 if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2062 param = resumeTaskInfos_[taskId].syncParam;
2063 resumeTaskInfos_[taskId].syncParam = {};
2064 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2065 if (ret != E_OK) {
2066 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2067 }
2068 LOGD("[CloudSyncer] Get sync param from cache");
2069 return E_OK;
2070 }
2071 }
2072 ret = GetCurrentTableName(param.tableName);
2073 if (ret != E_OK) {
2074 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2075 return ret;
2076 }
2077 param.info.tableName = param.tableName;
2078 std::vector<Field> assetFields;
2079 // only tables with no primary key or a composite primary key contain rowid.
2080 ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2081 if (ret != E_OK) {
2082 LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2083 return ret;
2084 }
2085 {
2086 std::lock_guard<std::mutex> autoLock(dataLock_);
2087 currentContext_.assetFields[currentContext_.tableName] = assetFields;
2088 }
2089 param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2090 if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId))) {
2091 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2092 if (ret != E_OK) {
2093 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2094 }
2095 if (!IsCurrentTaskResume(taskId)) {
2096 ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2097 }
2098 }
2099 currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2100 auto queryObject = GetQuerySyncObject(param.tableName);
2101 param.isAssetsOnly = queryObject.IsAssetsOnly();
2102 param.groupNum = queryObject.GetGroupNum();
2103 param.assetsGroupMap = queryObject.GetAssetsOnlyGroupMap();
2104 param.isVaildForAssetsOnly = queryObject.AssetsOnlyErrFlag() == E_OK;
2105 param.isForcePullAseets = IsModeForcePull(taskId) && queryObject.IsContainQueryNodes();
2106 return ret;
2107 }
2108
2109 bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
2110 {
2111 std::lock_guard<std::mutex> autoLock(dataLock_);
2112 return cloudTaskInfos_[taskId].resume;
2113 }
2114
2115 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2116 {
2117 std::lock_guard<std::mutex> autoLock(dataLock_);
2118 if (!cloudTaskInfos_[taskId].resume) {
2119 return false;
2120 }
2121 if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2122 return false;
2123 }
2124 return upload == resumeTaskInfos_[taskId].upload;
2125 }
2126
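// Query one batch of cloud data after the water mark and handle the empty-result and query-end cases;
// on the first download an empty result also triggers the empty-download notification.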
2127 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool isFirstDownload)
2128 {
2129 // Get cloud data after cloud water mark
2130 param.info.tableStatus = ProcessStatus::PROCESSING;
2131 param.downloadData = {};
2132 if (param.isAssetsOnly) {
2133 param.cloudWaterMarkForAssetsOnly = param.cloudWaterMark;
2134 }
2135 int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2136 CloudSyncUtils::CheckQueryCloudData(cloudTaskInfos_[taskId].prepareTraceId, param.downloadData, param.pkColNames);
2137 if (ret == -E_QUERY_END) {
2138 // Do not return here since downloadData may not be empty
2139 param.isLastBatch = true;
2140 } else if (ret != E_OK) {
2141 std::lock_guard<std::mutex> autoLock(dataLock_);
2142 param.info.tableStatus = ProcessStatus::FINISHED;
2143 currentContext_.notifier->UpdateProcess(param.info);
2144 return ret;
2145 }
2146
2147 ret = CheckCloudQueryAssetsOnlyIfNeed(taskId, param);
2148 if (ret != E_OK) {
2149 LOGE("[CloudSyncer] query assets failed, error code: %d", ret);
2150 std::lock_guard<std::mutex> autoLock(dataLock_);
2151 param.info.tableStatus = ProcessStatus::FINISHED;
2152 currentContext_.notifier->UpdateProcess(param.info);
2153 return ret;
2154 }
2155
2156 if (param.downloadData.data.empty()) {
2157 if (ret == E_OK || isFirstDownload) {
2158 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2159 UpdateCloudWaterMark(taskId, param);
2160 // The cloud water mark may change on the cloud side, so it needs to be saved here
2161 SaveCloudWaterMark(param.tableName, taskId);
2162 }
2163 if (isFirstDownload) {
2164 NotifyInEmptyDownload(taskId, param.info);
2165 }
2166 }
2167 return E_OK;
2168 }
2169
2170 void CloudSyncer::ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info)
2171 {
2172 if (errorCode == E_OK) {
2173 return;
2174 }
2175 info.downLoadInfo.failCount += 1;
2176 if (info.downLoadInfo.successCount == 0) {
2177 LOGW("[CloudSyncer] Invalid successCount");
2178 } else {
2179 info.downLoadInfo.successCount -= 1;
2180 }
2181 }
2182
2183 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2184 {
2185 std::lock_guard<std::mutex> autoLock(dataLock_);
2186 return resumeTaskInfos_[taskId].lastLocalWatermark;
2187 }
2188 } // namespace DistributedDB
2189