1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbGeneralStore"
16
17 #include "rdb_general_store.h"
18
19 #include <chrono>
20 #include <cinttypes>
21
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "eventcenter/event_center.h"
35 #include "log_print.h"
36 #include "metadata/meta_data_manager.h"
37 #include "metadata/secret_key_meta_data.h"
38 #include "rdb_cursor.h"
39 #include "rdb_helper.h"
40 #include "rdb_query.h"
41 #include "rdb_result_set_impl.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 namespace OHOS::DistributedRdb {
47 using namespace DistributedData;
48 using namespace DistributedDB;
49 using namespace NativeRdb;
50 using namespace CloudData;
51 using namespace std::chrono;
52 using DBField = DistributedDB::Field;
53 using DBTable = DistributedDB::TableSchema;
54 using DBSchema = DistributedDB::DataBaseSchema;
55 using ClearMode = DistributedDB::ClearMode;
56 using DBStatus = DistributedDB::DBStatus;
57 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
58
// SQL fragments used when assembling INSERT / REPLACE statements in this file.
constexpr const char *INSERT = "INSERT INTO ";
constexpr const char *REPLACE = "REPLACE INTO ";
constexpr const char *VALUES = " VALUES ";
// NOTE(review): not referenced in the visible part of this file; presumably a marker for logout clean-up — confirm.
constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
// Bitwise OR of the INSERT, UPDATE, DELETE and DOWNLOAD lock actions; passed to the cloud sync option in Sync().
constexpr const LockAction LOCK_ACTION = static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) |
    static_cast<uint32_t>(LockAction::UPDATE) | static_cast<uint32_t>(LockAction::DELETE) |
    static_cast<uint32_t>(LockAction::DOWNLOAD));
// Bits OR-ed into syncNotifyFlag_: set after a successful Bind() and for searchable stores respectively.
constexpr uint32_t CLOUD_SYNC_FLAG = 1;
constexpr uint32_t SEARCHABLE_FLAG = 2;
// Maximum time, in seconds, a forced Close() waits for the store lock.
constexpr uint32_t LOCK_TIMEOUT = 3600; // seconds
69
GetDBSchema(const Database & database)70 static DBSchema GetDBSchema(const Database &database)
71 {
72 DBSchema schema;
73 schema.tables.resize(database.tables.size());
74 for (size_t i = 0; i < database.tables.size(); i++) {
75 const Table &table = database.tables[i];
76 DBTable &dbTable = schema.tables[i];
77 dbTable.name = table.name;
78 dbTable.sharedTableName = table.sharedTableName;
79 for (auto &field : table.fields) {
80 DBField dbField;
81 dbField.colName = field.colName;
82 dbField.type = field.type;
83 dbField.primary = field.primary;
84 dbField.nullable = field.nullable;
85 dbTable.fields.push_back(std::move(dbField));
86 }
87 }
88 return schema;
89 }
90
RdbGeneralStore(const StoreMetaData & meta)91 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
92 : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
93 {
94 observer_.storeId_ = meta.storeId;
95 observer_.meta_ = meta;
96 RelationalStoreDelegate::Option option;
97 option.observer = &observer_;
98 if (meta.isEncrypt) {
99 std::string key = meta.GetSecretKey();
100 SecretKeyMetaData secretKeyMeta;
101 MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
102 std::vector<uint8_t> decryptKey;
103 CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
104 option.passwd.SetValue(decryptKey.data(), decryptKey.size());
105 std::fill(decryptKey.begin(), decryptKey.end(), 0);
106 option.isEncryptedDb = meta.isEncrypt;
107 option.cipher = CipherType::AES_256_GCM;
108 for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
109 option.iterateTimes = ITERS[i];
110 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
111 if (ret == DBStatus::OK && delegate_ != nullptr) {
112 break;
113 }
114 manager_.CloseStore(delegate_);
115 delegate_ = nullptr;
116 }
117 } else {
118 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
119 if (ret != DBStatus::OK || delegate_ == nullptr) {
120 manager_.CloseStore(delegate_);
121 delegate_ = nullptr;
122 }
123 }
124
125 storeInfo_.tokenId = meta.tokenId;
126 storeInfo_.bundleName = meta.bundleName;
127 storeInfo_.storeName = meta.storeId;
128 storeInfo_.instanceId = meta.instanceId;
129 storeInfo_.user = std::stoi(meta.user);
130 storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
131 if (meta.isSearchable) {
132 syncNotifyFlag_ |= SEARCHABLE_FLAG;
133 }
134
135 if (delegate_ != nullptr && meta.isManualClean) {
136 PragmaData data =
137 static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
138 delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
139 }
140 }
141
~RdbGeneralStore()142 RdbGeneralStore::~RdbGeneralStore()
143 {
144 manager_.CloseStore(delegate_);
145 delegate_ = nullptr;
146 bindInfo_.loader_ = nullptr;
147 if (bindInfo_.db_ != nullptr) {
148 bindInfo_.db_->Close();
149 }
150 bindInfo_.db_ = nullptr;
151 rdbCloud_ = nullptr;
152 rdbLoader_ = nullptr;
153 RemoveTasks();
154 tasks_ = nullptr;
155 executor_ = nullptr;
156 }
157
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)158 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
159 {
160 if (snapshots_.bindAssets == nullptr) {
161 snapshots_.bindAssets = bindAssets;
162 }
163 return GenErr::E_OK;
164 }
165
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)166 int32_t RdbGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
167 const CloudConfig &config)
168 {
169 if (bindInfos.empty()) {
170 return GeneralError::E_OK;
171 }
172 auto bindInfo = bindInfos.begin()->second;
173 if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
174 return GeneralError::E_INVALID_ARGS;
175 }
176
177 if (isBound_.exchange(true)) {
178 return GeneralError::E_OK;
179 }
180
181 BindEvent::BindEventInfo eventInfo;
182 eventInfo.tokenId = storeInfo_.tokenId;
183 eventInfo.bundleName = storeInfo_.bundleName;
184 eventInfo.storeName = storeInfo_.storeName;
185 eventInfo.user = storeInfo_.user;
186 eventInfo.instanceId = storeInfo_.instanceId;
187
188 auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
189 EventCenter::GetInstance().PostEvent(std::move(evt));
190 bindInfo_ = std::move(bindInfo);
191 {
192 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
193 rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
194 rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
195 }
196
197 DistributedDB::CloudSyncConfig dbConfig;
198 dbConfig.maxUploadCount = config.maxNumber;
199 dbConfig.maxUploadSize = config.maxSize;
200 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
201 DBSchema schema = GetDBSchema(database);
202 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
203 if (delegate_ == nullptr) {
204 ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
205 return GeneralError::E_ALREADY_CLOSED;
206 }
207 delegate_->SetCloudDB(rdbCloud_);
208 delegate_->SetIAssetLoader(rdbLoader_);
209 delegate_->SetCloudDbSchema(std::move(schema));
210 delegate_->SetCloudSyncConfig(dbConfig);
211
212 syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
213 return GeneralError::E_OK;
214 }
215
IsBound()216 bool RdbGeneralStore::IsBound()
217 {
218 return isBound_;
219 }
220
Close(bool isForce)221 int32_t RdbGeneralStore::Close(bool isForce)
222 {
223 {
224 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
225 if (!lock) {
226 return GeneralError::E_BUSY;
227 }
228
229 if (delegate_ == nullptr) {
230 return GeneralError::E_OK;
231 }
232 if (!isForce && delegate_->GetCloudSyncTaskCount() > 0) {
233 return GeneralError::E_BUSY;
234 }
235 if (isForce && bindInfo_.loader_ != nullptr) {
236 bindInfo_.loader_->Cancel();
237 }
238 auto status = manager_.CloseStore(delegate_);
239 if (status != DBStatus::OK) {
240 return status;
241 }
242 delegate_ = nullptr;
243 }
244 RemoveTasks();
245 bindInfo_.loader_ = nullptr;
246 if (bindInfo_.db_ != nullptr) {
247 bindInfo_.db_->Close();
248 }
249 bindInfo_.db_ = nullptr;
250 {
251 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
252 rdbCloud_ = nullptr;
253 rdbLoader_ = nullptr;
254 }
255 return GeneralError::E_OK;
256 }
257
Execute(const std::string & table,const std::string & sql)258 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
259 {
260 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
261 if (delegate_ == nullptr) {
262 ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
263 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
264 Anonymous::Change(sql).c_str());
265 return GeneralError::E_ERROR;
266 }
267 std::vector<DistributedDB::VBucket> changedData;
268 auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
269 if (status != DBStatus::OK) {
270 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
271 changedData.size());
272 return GeneralError::E_ERROR;
273 }
274 return GeneralError::E_OK;
275 }
276
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)277 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
278 {
279 strColumnSql += " (";
280 strRowValueSql += " (";
281 auto columnSize = value.size();
282 for (auto column = value.begin(); column != value.end(); ++column) {
283 strRowValueSql += " ?,";
284 strColumnSql += " " + column->first + ",";
285 }
286 if (columnSize != 0) {
287 strColumnSql.pop_back();
288 strColumnSql += ")";
289 strRowValueSql.pop_back();
290 strRowValueSql += ")";
291 }
292 return columnSize;
293 }
294
Insert(const std::string & table,VBuckets && values)295 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
296 {
297 if (table.empty() || values.size() == 0) {
298 ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
299 return GeneralError::E_INVALID_ARGS;
300 }
301
302 std::string strColumnSql;
303 std::string strRowValueSql;
304 auto value = values.front();
305 size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
306 if (columnSize == 0) {
307 ZLOGE("Insert: columnSize error, can't be 0!");
308 return GeneralError::E_INVALID_ARGS;
309 }
310
311 Values args;
312 std::string strValueSql;
313 for (auto &rowData : values) {
314 if (rowData.size() != columnSize) {
315 ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
316 return GeneralError::E_INVALID_ARGS;
317 }
318 for (auto column = rowData.begin(); column != rowData.end(); ++column) {
319 args.push_back(std::move(column->second));
320 }
321 strValueSql += strRowValueSql + ",";
322 }
323 strValueSql.pop_back();
324 std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
325
326 std::vector<DistributedDB::VBucket> changedData;
327 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
328 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
329 if (delegate_ == nullptr) {
330 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
331 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
332 return GeneralError::E_ERROR;
333 }
334 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
335 if (status != DBStatus::OK) {
336 if (IsPrintLog(status)) {
337 auto time =
338 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
339 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
340 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
341 }
342 return GeneralError::E_ERROR;
343 }
344 return GeneralError::E_OK;
345 }
346
IsPrintLog(DBStatus status)347 bool RdbGeneralStore::IsPrintLog(DBStatus status)
348 {
349 bool isPrint = false;
350 if (status == lastError_) {
351 if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
352 isPrint = true;
353 }
354 } else {
355 isPrint = true;
356 lastErrCnt_ = 0;
357 lastError_ = status;
358 }
359 return isPrint;
360 }
361
362 /**
363 * This function does not support batch updates in rdb, it only supports single data updates.
364 */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)365 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
366 const std::string &whereSql, Values &&conditions)
367 {
368 if (table.empty()) {
369 ZLOGE("Update: table can't be empty!");
370 return GeneralError::E_INVALID_ARGS;
371 }
372 if (setSql.empty() || values.size() == 0) {
373 ZLOGE("Update: setSql and values can't be empty!");
374 return GeneralError::E_INVALID_ARGS;
375 }
376 if (whereSql.empty() || conditions.size() == 0) {
377 ZLOGE("Update: whereSql and conditions can't be empty!");
378 return GeneralError::E_INVALID_ARGS;
379 }
380
381 std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
382 Values args;
383 for (auto &value : values) {
384 args.push_back(std::move(value));
385 }
386 for (auto &condition : conditions) {
387 args.push_back(std::move(condition));
388 }
389
390 std::vector<DistributedDB::VBucket> changedData;
391 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
392 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
393 if (delegate_ == nullptr) {
394 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
395 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
396 return GeneralError::E_ERROR;
397 }
398 auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
399 if (status != DBStatus::OK) {
400 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
401 changedData.size());
402 return GeneralError::E_ERROR;
403 }
404 return GeneralError::E_OK;
405 }
406
Replace(const std::string & table,VBucket && value)407 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
408 {
409 if (table.empty() || value.size() == 0) {
410 return GeneralError::E_INVALID_ARGS;
411 }
412 std::string columnSql;
413 std::string valueSql;
414 SqlConcatenate(value, columnSql, valueSql);
415 std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
416
417 Values args;
418 for (auto &[k, v] : value) {
419 args.emplace_back(std::move(v));
420 }
421 std::vector<DistributedDB::VBucket> changedData;
422 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
423 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
424 if (delegate_ == nullptr) {
425 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
426 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
427 return GeneralError::E_ERROR;
428 }
429 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
430 if (status != DBStatus::OK) {
431 ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
432 status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
433 return GeneralError::E_ERROR;
434 }
435 return GeneralError::E_OK;
436 }
437
Delete(const std::string & table,const std::string & sql,Values && args)438 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
439 {
440 return 0;
441 }
442
Query(const std::string & table,const std::string & sql,Values && args)443 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
444 const std::string &sql, Values &&args)
445 {
446 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
447 if (delegate_ == nullptr) {
448 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
449 return { GeneralError::E_ALREADY_CLOSED, nullptr };
450 }
451 auto [errCode, records] = QuerySql(sql, std::move(args));
452 return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
453 }
454
Query(const std::string & table,GenQuery & query)455 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
456 {
457 RdbQuery *rdbQuery = nullptr;
458 auto ret = query.QueryInterface(rdbQuery);
459 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
460 ZLOGE("not RdbQuery!");
461 return { GeneralError::E_INVALID_ARGS, nullptr };
462 }
463 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
464 if (delegate_ == nullptr) {
465 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
466 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
467 return { GeneralError::E_ALREADY_CLOSED, nullptr };
468 }
469 if (rdbQuery->IsRemoteQuery()) {
470 if (rdbQuery->GetDevices().size() != 1) {
471 ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
472 return { GeneralError::E_ERROR, nullptr };
473 }
474 auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
475 return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
476 }
477 return { GeneralError::E_ERROR, nullptr };
478 }
479
MergeMigratedData(const std::string & tableName,VBuckets && values)480 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
481 {
482 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
483 if (delegate_ == nullptr) {
484 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
485 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
486 return GeneralError::E_ERROR;
487 }
488
489 auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
490 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
491 }
492
CleanTrackerData(const std::string & tableName,int64_t cursor)493 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
494 {
495 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
496 if (delegate_ == nullptr) {
497 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
498 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
499 return GeneralError::E_ERROR;
500 }
501
502 auto status = delegate_->CleanTrackerData(tableName, cursor);
503 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
504 }
505
Sync(const Devices & devices,GenQuery & query,DetailAsync async,SyncParam & syncParam)506 int32_t RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParam)
507 {
508 DistributedDB::Query dbQuery;
509 RdbQuery *rdbQuery = nullptr;
510 bool isPriority = false;
511 auto ret = query.QueryInterface(rdbQuery);
512 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
513 dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
514 } else {
515 dbQuery = rdbQuery->GetQuery();
516 isPriority = rdbQuery->IsPriority();
517 }
518 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
519 auto dbMode = DistributedDB::SyncMode(syncMode);
520 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
521 if (delegate_ == nullptr) {
522 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
523 "wait:%{public}d", devices.size(),
524 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
525 return GeneralError::E_ALREADY_CLOSED;
526 }
527 auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
528 SyncId syncId = ++syncTaskId_;
529 auto dbStatus = DistributedDB::INVALID_ARGS;
530 if (syncMode < NEARBY_END) {
531 dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
532 } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
533 auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
534 if (executor_ != nullptr && tasks_ != nullptr) {
535 auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
536 tasks_->Insert(syncId, { id, callback });
537 }
538 dbStatus = delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait,
539 (isPriority || highMode == MANUAL_SYNC_MODE), syncParam.isCompensation, {},
540 highMode == AUTO_SYNC_MODE, LOCK_ACTION, syncParam.prepareTraceId },
541 tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
542 if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
543 return ConvertStatus(dbStatus);
544 }
545 tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
546 if (executor != nullptr) {
547 executor->Remove(task.taskId);
548 }
549 return false;
550 });
551 }
552 return ConvertStatus(dbStatus);
553 }
554
PreSharing(GenQuery & query)555 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
556 {
557 RdbQuery *rdbQuery = nullptr;
558 auto ret = query.QueryInterface(rdbQuery);
559 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
560 ZLOGE("not RdbQuery!");
561 return { GeneralError::E_INVALID_ARGS, nullptr };
562 }
563 auto tables = rdbQuery->GetTables();
564 auto statement = rdbQuery->GetStatement();
565 if (statement.empty() || tables.empty()) {
566 ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
567 return { GeneralError::E_INVALID_ARGS, nullptr };
568 }
569 std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
570 VBuckets values;
571 {
572 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
573 if (delegate_ == nullptr) {
574 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
575 return { GeneralError::E_ALREADY_CLOSED, nullptr };
576 }
577 auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
578 values = std::move(ret);
579 }
580 auto rdbCloud = GetRdbCloud();
581 if (rdbCloud == nullptr || values.empty()) {
582 ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
583 values.size());
584 return { GeneralError::E_CLOUD_DISABLED, nullptr };
585 }
586 VBuckets extends = ExtractExtend(values);
587 rdbCloud->PreSharing(*tables.begin(), extends);
588 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
589 ++value, ++extend) {
590 value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
591 value->erase(CLOUD_GID);
592 }
593 return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
594 }
595
ExtractExtend(VBuckets & values) const596 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
597 {
598 VBuckets extends(values.size());
599 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
600 ++value, ++extend) {
601 auto it = value->find(CLOUD_GID);
602 if (it == value->end()) {
603 continue;
604 }
605 auto gid = std::get_if<std::string>(&(it->second));
606 if (gid == nullptr || gid->empty()) {
607 continue;
608 }
609 extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
610 }
611 return extends;
612 }
613
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const614 std::string RdbGeneralStore::BuildSql(
615 const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
616 {
617 std::string sql = "select ";
618 sql.append(CLOUD_GID);
619 std::string sqlNode = "select rowid";
620 for (auto &column : columns) {
621 sql.append(", ").append(column);
622 sqlNode.append(", ").append(column);
623 }
624 sqlNode.append(" from ").append(table).append(statement);
625 auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
626 sql.append(" from ").append(logTable).append(", (").append(sqlNode);
627 sql.append(") where ").append(DATE_KEY).append(" = rowid");
628 return sql;
629 }
630
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)631 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
632 {
633 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
634 return GeneralError::E_INVALID_ARGS;
635 }
636 DBStatus status = DistributedDB::DB_ERROR;
637 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
638 if (delegate_ == nullptr) {
639 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
640 "tableName:%{public}s",
641 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
642 Anonymous::Change(tableName).c_str());
643 return GeneralError::E_ALREADY_CLOSED;
644 }
645 switch (mode) {
646 case CLOUD_INFO:
647 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
648 if (status == DistributedDB::OK) {
649 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
650 break;
651 }
652 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
653 break;
654 case CLOUD_DATA:
655 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
656 if (status == DistributedDB::OK) {
657 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
658 break;
659 }
660 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
661 break;
662 case NEARBY_DATA:
663 if (devices.empty()) {
664 status = delegate_->RemoveDeviceData();
665 break;
666 }
667 for (auto device : devices) {
668 status = delegate_->RemoveDeviceData(device, tableName);
669 }
670 break;
671 default:
672 return GeneralError::E_ERROR;
673 }
674 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
675 }
676
Watch(int32_t origin,Watcher & watcher)677 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
678 {
679 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
680 return GeneralError::E_INVALID_ARGS;
681 }
682
683 observer_.watcher_ = &watcher;
684 return GeneralError::E_OK;
685 }
686
Unwatch(int32_t origin,Watcher & watcher)687 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
688 {
689 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
690 return GeneralError::E_INVALID_ARGS;
691 }
692
693 observer_.watcher_ = nullptr;
694 return GeneralError::E_OK;
695 }
696
GetDBBriefCB(DetailAsync async)697 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
698 {
699 if (!async) {
700 return [](auto &) {};
701 }
702 return [async = std::move(async)](
703 const std::map<std::string, std::vector<TableStatus>> &result) {
704 DistributedData::GenDetails details;
705 for (auto &[key, tables] : result) {
706 auto &value = details[key];
707 value.progress = FINISHED;
708 value.code = GeneralError::E_OK;
709 for (auto &table : tables) {
710 if (table.status != DBStatus::OK) {
711 value.code = GeneralError::E_ERROR;
712 }
713 }
714 }
715 async(details);
716 };
717 }
718
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)719 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
720 uint32_t highMode)
721 {
722 std::shared_lock<std::shared_mutex> lock(asyncMutex_);
723 return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
724 rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
725 DistributedData::GenDetails details;
726 for (auto &[id, process] : processes) {
727 bool isDownload = false;
728 auto &detail = details[id];
729 detail.progress = process.process;
730 detail.code = ConvertStatus(process.errCode);
731 detail.dbCode = DB_ERR_OFFSET + process.errCode;
732 uint32_t totalCount = 0;
733 for (auto [key, value] : process.tableProcess) {
734 auto &table = detail.details[key];
735 table.upload.total = value.upLoadInfo.total;
736 table.upload.success = value.upLoadInfo.successCount;
737 table.upload.failed = value.upLoadInfo.failCount;
738 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
739 totalCount += table.upload.total;
740 isDownload = table.download.total > 0;
741 table.download.total = value.downLoadInfo.total;
742 table.download.success = value.downLoadInfo.successCount;
743 table.download.failed = value.downLoadInfo.failCount;
744 table.download.untreated = table.download.total - table.download.success - table.download.failed;
745 detail.changeCount = (process.process == FINISHED)
746 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
747 value.downLoadInfo.deleteCount
748 : 0;
749 totalCount += table.download.total;
750 }
751 if (process.process == FINISHED) {
752 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
753 } else {
754 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
755 }
756
757 if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
758 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
759 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
760 }
761 }
762 if (async) {
763 async(details);
764 }
765
766 if (highMode == AUTO_SYNC_MODE && autoAsync
767 && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
768 autoAsync(details);
769 }
770 };
771 }
772
Release()773 int32_t RdbGeneralStore::Release()
774 {
775 auto ref = 1;
776 {
777 std::lock_guard<decltype(mutex_)> lock(mutex_);
778 if (ref_ == 0) {
779 return 0;
780 }
781 ref = --ref_;
782 }
783 ZLOGD("ref:%{public}d", ref);
784 if (ref == 0) {
785 delete this;
786 }
787 return ref;
788 }
789
AddRef()790 int32_t RdbGeneralStore::AddRef()
791 {
792 std::lock_guard<decltype(mutex_)> lock(mutex_);
793 if (ref_ == 0) {
794 return 0;
795 }
796 return ++ref_;
797 }
798
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)799 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
800 const std::vector<Reference> &references)
801 {
802 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
803 if (delegate_ == nullptr) {
804 ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
805 Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
806 return GeneralError::E_ALREADY_CLOSED;
807 }
808 for (const auto &table : tables) {
809 ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
810 auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
811 if (dBStatus != DistributedDB::DBStatus::OK) {
812 ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
813 Anonymous::Change(table).c_str(), dBStatus);
814 return GeneralError::E_ERROR;
815 }
816 }
817 std::vector<DistributedDB::TableReferenceProperty> properties;
818 for (const auto &reference : references) {
819 properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
820 }
821 auto status = delegate_->SetReference(properties);
822 if (status != DistributedDB::DBStatus::OK) {
823 ZLOGE("distributed table set reference failed, err:%{public}d", status);
824 return GeneralError::E_ERROR;
825 }
826 return GeneralError::E_OK;
827 }
828
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)829 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
830 const std::string &extendColName, bool isForceUpgrade)
831 {
832 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
833 if (delegate_ == nullptr) {
834 ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
835 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
836 return GeneralError::E_ALREADY_CLOSED;
837 }
838 auto status = delegate_->SetTrackerTable({ tableName, extendColName, trackerColNames, isForceUpgrade });
839 if (status == DBStatus::WITH_INVENTORY_DATA) {
840 ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
841 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
842 return GeneralError::E_WITH_INVENTORY_DATA;
843 }
844 if (status != DBStatus::OK) {
845 ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
846 status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
847 return GeneralError::E_ERROR;
848 }
849 return GeneralError::E_OK;
850 }
851
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)852 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
853 const DistributedDB::RemoteCondition &remoteCondition)
854 {
855 std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
856 DistributedDB::DBStatus status =
857 delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
858 if (status != DistributedDB::DBStatus::OK) {
859 ZLOGE("DistributedDB remote query failed, device:%{public}s, status is %{public}d.",
860 Anonymous::Change(device).c_str(), status);
861 return nullptr;
862 }
863 return std::make_shared<RdbCursor>(dbResultSet);
864 }
865
ConvertStatus(DistributedDB::DBStatus status)866 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
867 {
868 switch (status) {
869 case DBStatus::OK:
870 return GenErr::E_OK;
871 case DBStatus::CLOUD_NETWORK_ERROR:
872 return GenErr::E_NETWORK_ERROR;
873 case DBStatus::CLOUD_LOCK_ERROR:
874 return GenErr::E_LOCKED_BY_OTHERS;
875 case DBStatus::CLOUD_FULL_RECORDS:
876 return GenErr::E_RECODE_LIMIT_EXCEEDED;
877 case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
878 return GenErr::E_NO_SPACE_FOR_ASSET;
879 case DBStatus::BUSY:
880 return GenErr::E_BUSY;
881 case DBStatus::CLOUD_SYNC_TASK_MERGED:
882 return GenErr::E_SYNC_TASK_MERGED;
883 default:
884 ZLOGI("status:0x%{public}x", status);
885 break;
886 }
887 return GenErr::E_ERROR;
888 }
889
IsValid()890 bool RdbGeneralStore::IsValid()
891 {
892 return delegate_ != nullptr;
893 }
894
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)895 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
896 {
897 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
898 async_ = std::move(async);
899 return GenErr::E_OK;
900 }
901
UnregisterDetailProgressObserver()902 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
903 {
904 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
905 async_ = nullptr;
906 return GenErr::E_OK;
907 }
908
QuerySql(const std::string & sql,Values && args)909 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
910 {
911 std::vector<DistributedDB::VBucket> changedData;
912 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
913 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
914 if (status != DBStatus::OK) {
915 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
916 changedData.size());
917 return { GenErr::E_ERROR, {} };
918 }
919 return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
920 }
921
GetWaterVersion(const std::string & deviceId)922 std::vector<std::string> RdbGeneralStore::GetWaterVersion(const std::string &deviceId)
923 {
924 return {};
925 }
926
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)927 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
928 uint32_t syncCount)
929 {
930 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
931 if (requiredFlag != (requiredFlag & flag)) {
932 return;
933 }
934 StoreInfo info = storeInfo;
935 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
936 traceId, syncCount);
937 EventCenter::GetInstance().PostEvent(std::move(evt));
938 }
939
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)940 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
941 {
942 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
943 if (requiredFlag != (requiredFlag & flag)) {
944 return;
945 }
946 StoreInfo info = storeInfo;
947 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
948 traceId);
949 EventCenter::GetInstance().PostEvent(std::move(evt));
950 }
951
GetTables()952 std::set<std::string> RdbGeneralStore::GetTables()
953 {
954 std::set<std::string> tables;
955 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
956 if (delegate_ == nullptr) {
957 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
958 return tables;
959 }
960 auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
961 if (errCode != GenErr::E_OK) {
962 return tables;
963 }
964 for (auto &table : res) {
965 auto it = table.find("name");
966 if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
967 ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
968 continue;
969 }
970 tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
971 }
972 return tables;
973 }
974
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)975 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
976 const std::set<std::string> &localTables)
977 {
978 std::vector<std::string> res;
979 for (auto &it : syncTables) {
980 if (localTables.count(it)) {
981 res.push_back(std::move(it));
982 }
983 }
984 return res;
985 }
986
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)987 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
988 const std::vector<std::string> &tables, ChangeType type)
989 {
990 RemoteChangeEvent::DataInfo info;
991 info.userId = meta.user;
992 info.storeId = meta.storeId;
993 info.deviceId = meta.deviceId;
994 info.bundleName = meta.bundleName;
995 info.tables = tables;
996 info.changeType = type;
997 auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
998 EventCenter::GetInstance().PostEvent(std::move(evt));
999 }
1000
OnChange(const DBChangedIF & data)1001 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1002 {
1003 if (!HasWatcher()) {
1004 return;
1005 }
1006 std::string device = data.GetDataChangeDevice();
1007 auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1008 ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1009 Anonymous::Change(device).c_str());
1010 GenOrigin genOrigin;
1011 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1012 genOrigin.dataType = GenOrigin::BASIC_DATA;
1013 DistributedDB::StoreProperty property;
1014 data.GetStoreProperty(property);
1015 genOrigin.id.push_back(networkId);
1016 genOrigin.store = storeId_;
1017 GeneralWatcher::ChangeInfo changeInfo{};
1018 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1019 return;
1020 }
1021
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1022 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1023 {
1024 if (!HasWatcher()) {
1025 return;
1026 }
1027 ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1028 Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1029 GenOrigin genOrigin;
1030 genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1031 ? GenOrigin::ORIGIN_LOCAL
1032 : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1033 genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1034 genOrigin.id.push_back(originalId);
1035 genOrigin.store = storeId_;
1036 Watcher::PRIFields fields;
1037 Watcher::ChangeInfo changeInfo;
1038 bool notifyFlag = false;
1039 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1040 auto &info = changeInfo[data.tableName][i];
1041 for (auto &priData : data.primaryData[i]) {
1042 Watcher::PRIValue value;
1043 Convert(std::move(*(priData.begin())), value);
1044 if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1045 info.push_back(std::move(value));
1046 continue;
1047 }
1048 auto deleteKey = std::get_if<std::string>(&value);
1049 if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1050 // notify to start app
1051 notifyFlag = true;
1052 }
1053 info.push_back(std::move(value));
1054 }
1055 }
1056 if (notifyFlag) {
1057 PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1058 }
1059 if (!data.field.empty()) {
1060 fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1061 }
1062 watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1063 }
1064
LockCloudDB()1065 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1066 {
1067 auto rdbCloud = GetRdbCloud();
1068 if (rdbCloud == nullptr) {
1069 return { GeneralError::E_ERROR, 0 };
1070 }
1071 return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1072 }
1073
UnLockCloudDB()1074 int32_t RdbGeneralStore::UnLockCloudDB()
1075 {
1076 auto rdbCloud = GetRdbCloud();
1077 if (rdbCloud == nullptr) {
1078 return GeneralError::E_ERROR;
1079 }
1080 return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1081 }
1082
GetRdbCloud() const1083 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1084 {
1085 std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1086 return rdbCloud_;
1087 }
1088
IsFinished(SyncId syncId) const1089 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1090 {
1091 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1092 if (delegate_ == nullptr) {
1093 ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1094 return true;
1095 }
1096 return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1097 }
1098
GetFinishTask(SyncId syncId)1099 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1100 {
1101 return [this, executor = executor_, task = tasks_, syncId]() {
1102 auto [exist, finishTask] = task->Find(syncId);
1103 if (!exist || finishTask.cb == nullptr) {
1104 task->Erase(syncId);
1105 return;
1106 }
1107 if (!IsFinished(syncId)) {
1108 task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1109 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1110 return true;
1111 });
1112 return;
1113 }
1114 DBProcessCB cb;
1115 task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1116 cb = task.cb;
1117 return false;
1118 });
1119 if (cb != nullptr) {
1120 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1121 Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1122 std::map<std::string, SyncProcess> result;
1123 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1124 cb(result);
1125 }
1126 };
1127 }
1128
SetExecutor(std::shared_ptr<Executor> executor)1129 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1130 {
1131 if (executor_ == nullptr) {
1132 executor_ = executor;
1133 }
1134 }
1135
RemoveTasks()1136 void RdbGeneralStore::RemoveTasks()
1137 {
1138 if (tasks_ == nullptr) {
1139 return;
1140 }
1141 std::list<DBProcessCB> cbs;
1142 std::list<TaskId> taskIds;
1143 tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1144 if (task.cb != nullptr) {
1145 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1146 syncId);
1147 }
1148 cbs.push_back(std::move(task.cb));
1149 taskIds.push_back(task.taskId);
1150 return true;
1151 });
1152 if (executor_ != nullptr) {
1153 for (auto taskId : taskIds) {
1154 executor_->Remove(taskId, true);
1155 }
1156 }
1157 std::map<std::string, SyncProcess> result;
1158 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1159 for (auto &cb : cbs) {
1160 if (cb != nullptr) {
1161 cb(result);
1162 }
1163 }
1164 }
1165
GetCB(SyncId syncId)1166 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1167 {
1168 return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1169 if (task == nullptr) {
1170 return;
1171 }
1172 DBProcessCB cb;
1173 task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1174 cb = finishTask.cb;
1175 bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1176 if (isFinished) {
1177 finishTask.cb = nullptr;
1178 }
1179 return true;
1180 });
1181 if (cb != nullptr) {
1182 cb(progress);
1183 }
1184 return;
1185 };
1186 }
1187 } // namespace OHOS::DistributedRdb