1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbGeneralStore"
16
17 #include "rdb_general_store.h"
18
19 #include <chrono>
20 #include <cinttypes>
21
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_mark.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "device_sync_app/device_sync_app_manager.h"
30 #include "cloud_service.h"
31 #include "commonevent/data_sync_event.h"
32 #include "crypto/crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "dfx/dfx_types.h"
35 #include "dfx/reporter.h"
36 #include "eventcenter/event_center.h"
37 #include "log_print.h"
38 #include "metadata/meta_data_manager.h"
39 #include "metadata/secret_key_meta_data.h"
40 #include "rdb_cursor.h"
41 #include "rdb_query.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 #include "snapshot/snapshot.h"
47 namespace OHOS::DistributedRdb {
48 using namespace DistributedData;
49 using namespace DistributedDB;
50 using namespace NativeRdb;
51 using namespace std::chrono;
52 using namespace DistributedDataDfx;
53 using DBField = DistributedDB::Field;
54 using DBTable = DistributedDB::TableSchema;
55 using DBSchema = DistributedDB::DataBaseSchema;
56 using ClearMode = DistributedDB::ClearMode;
57 using DBStatus = DistributedDB::DBStatus;
58 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
59
60 constexpr const char *INSERT = "INSERT INTO ";
61 constexpr const char *REPLACE = "REPLACE INTO ";
62 constexpr const char *VALUES = " VALUES ";
63 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
64 constexpr const char *LOGOUT_RESERVE_FLAG = "RESERVE#ALL_CLOUDDATA";
65 constexpr const LockAction LOCK_ACTION =
66 static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE) |
67 static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD));
68 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
69 constexpr uint32_t SEARCHABLE_FLAG = 2;
70 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
71 static constexpr const char *FT_OPEN_STORE = "OPEN_STORE";
72 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
73
GetDBSchema(const Database & database)74 static DBSchema GetDBSchema(const Database &database)
75 {
76 DBSchema schema;
77 schema.tables.resize(database.tables.size());
78 for (size_t i = 0; i < database.tables.size(); i++) {
79 const Table &table = database.tables[i];
80 DBTable &dbTable = schema.tables[i];
81 dbTable.name = table.name;
82 dbTable.sharedTableName = table.sharedTableName;
83 for (auto &field : table.fields) {
84 DBField dbField;
85 dbField.colName = field.colName;
86 dbField.type = field.type;
87 dbField.primary = field.primary;
88 dbField.nullable = field.nullable;
89 dbTable.fields.push_back(std::move(dbField));
90 }
91 }
92 return schema;
93 }
94
IsExistence(const std::string & col,const std::vector<std::string> & cols)95 static bool IsExistence(const std::string &col, const std::vector<std::string> &cols)
96 {
97 for (auto &column : cols) {
98 if (col == column) {
99 return true;
100 }
101 }
102 return false;
103 }
104
GetGaussDistributedSchema(const Database & database)105 static DistributedSchema GetGaussDistributedSchema(const Database &database)
106 {
107 DistributedSchema distributedSchema;
108 distributedSchema.version = database.version;
109 distributedSchema.tables.resize(database.tables.size());
110 for (size_t i = 0; i < database.tables.size(); i++) {
111 const Table &table = database.tables[i];
112 DistributedTable &dbTable = distributedSchema.tables[i];
113 dbTable.tableName = table.name;
114 for (auto &field : table.fields) {
115 DistributedField dbField;
116 dbField.colName = field.colName;
117 dbField.isP2pSync = IsExistence(field.colName, table.deviceSyncFields);
118 dbField.isSpecified = field.primary;
119 dbTable.fields.push_back(std::move(dbField));
120 }
121 }
122 return distributedSchema;
123 }
124
GetDistributedSchema(const StoreMetaData & meta)125 static std::pair<bool, Database> GetDistributedSchema(const StoreMetaData &meta)
126 {
127 std::pair<bool, Database> tableData;
128 Database database;
129 database.bundleName = meta.bundleName;
130 database.name = meta.storeId;
131 database.user = meta.user;
132 database.deviceId = meta.deviceId;
133 tableData.first = MetaDataManager::GetInstance().LoadMeta(database.GetKey(), database, true);
134 tableData.second = database;
135 return tableData;
136 }
137
InitStoreInfo(const StoreMetaData & meta)138 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
139 {
140 storeInfo_.tokenId = meta.tokenId;
141 storeInfo_.bundleName = meta.bundleName;
142 storeInfo_.storeName = meta.storeId;
143 storeInfo_.instanceId = meta.instanceId;
144 storeInfo_.user = std::atoi(meta.user.c_str());
145 storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
146 storeInfo_.path = meta.dataDir;
147 }
148
GetOption(const StoreMetaData & meta)149 RelationalStoreDelegate::Option GetOption(const StoreMetaData &meta)
150 {
151 RelationalStoreDelegate::Option option;
152 option.syncDualTupleMode = true;
153 if (GetDistributedSchema(meta).first) {
154 option.tableMode = DistributedTableMode::COLLABORATION;
155 }
156 return option;
157 }
158
GetDBPassword(const StoreMetaData & data)159 RdbGeneralStore::DBPassword RdbGeneralStore::GetDBPassword(const StoreMetaData &data)
160 {
161 DBPassword dbPassword;
162 SecretKeyMetaData secretKey;
163 auto metaKey = data.GetSecretKey();
164 if (!MetaDataManager::GetInstance().LoadMeta(metaKey, secretKey, true) || secretKey.sKey.empty()) {
165 return dbPassword;
166 }
167 CryptoManager::CryptoParams decryptParams = { .area = secretKey.area, .userId = data.user,
168 .nonce = secretKey.nonce };
169 auto password = CryptoManager::GetInstance().Decrypt(secretKey.sKey, decryptParams);
170 if (password.empty()) {
171 return dbPassword;
172 }
173 // update secret key of area or nonce
174 CryptoManager::GetInstance().UpdateSecretMeta(password, data, metaKey, secretKey);
175 dbPassword.SetValue(password.data(), password.size());
176 password.assign(password.size(), 0);
177 return dbPassword;
178 }
179
RdbGeneralStore(const StoreMetaData & meta)180 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
181 : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
182 {
183 observer_.storeId_ = meta.storeId;
184 observer_.meta_ = meta;
185 RelationalStoreDelegate::Option option = GetOption(meta);
186 option.isNeedCompressOnSync = true;
187 option.observer = &observer_;
188 if (meta.isEncrypt) {
189 option.passwd = GetDBPassword(meta);
190 option.isEncryptedDb = meta.isEncrypt;
191 option.cipher = CipherType::AES_256_GCM;
192 for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
193 option.iterateTimes = ITERS[i];
194 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
195 if (ret == DBStatus::OK && delegate_ != nullptr) {
196 break;
197 }
198 manager_.CloseStore(delegate_);
199 delegate_ = nullptr;
200 }
201 } else {
202 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
203 if (ret != DBStatus::OK || delegate_ == nullptr) {
204 manager_.CloseStore(delegate_);
205 delegate_ = nullptr;
206 }
207 }
208 InitStoreInfo(meta);
209 if (meta.isSearchable) {
210 syncNotifyFlag_ |= SEARCHABLE_FLAG;
211 }
212 if (delegate_ != nullptr && meta.isManualClean) {
213 PragmaData data = static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
214 delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
215 }
216 ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
217 "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
218 meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
219 meta.isEncrypt, meta.isManualClean, meta.isSearchable);
220 }
221
~RdbGeneralStore()222 RdbGeneralStore::~RdbGeneralStore()
223 {
224 ZLOGI("Destruct. BundleName: %{public}s. StoreName: %{public}s. user: %{public}d",
225 storeInfo_.bundleName.c_str(), Anonymous::Change(storeInfo_.storeName).c_str(), storeInfo_.user);
226 manager_.CloseStore(delegate_);
227 delegate_ = nullptr;
228 bindInfo_.loader_ = nullptr;
229 if (bindInfo_.db_ != nullptr) {
230 bindInfo_.db_->Close();
231 }
232 bindInfo_.db_ = nullptr;
233 rdbCloud_ = nullptr;
234 rdbLoader_ = nullptr;
235 RemoveTasks();
236 tasks_ = nullptr;
237 executor_ = nullptr;
238 }
239
BindSnapshots(BindAssets bindAssets)240 int32_t RdbGeneralStore::BindSnapshots(BindAssets bindAssets)
241 {
242 if (snapshots_ == nullptr) {
243 snapshots_ = bindAssets;
244 }
245 return GenErr::E_OK;
246 }
247
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)248 int32_t RdbGeneralStore::Bind(const Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
249 const CloudConfig &config)
250 {
251 if (bindInfos.empty()) {
252 return GeneralError::E_OK;
253 }
254 auto bindInfo = bindInfos.begin()->second;
255 if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
256 return GeneralError::E_INVALID_ARGS;
257 }
258
259 if (isBound_.exchange(true)) {
260 return GeneralError::E_OK;
261 }
262
263 BindEvent::BindEventInfo eventInfo;
264 eventInfo.tokenId = storeInfo_.tokenId;
265 eventInfo.bundleName = storeInfo_.bundleName;
266 eventInfo.storeName = storeInfo_.storeName;
267 eventInfo.user = storeInfo_.user;
268 eventInfo.instanceId = storeInfo_.instanceId;
269
270 auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
271 EventCenter::GetInstance().PostEvent(std::move(evt));
272 bindInfo_ = std::move(bindInfo);
273 {
274 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
275 rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, snapshots_);
276 rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, snapshots_);
277 }
278
279 DistributedDB::CloudSyncConfig dbConfig;
280 dbConfig.maxUploadCount = config.maxNumber;
281 dbConfig.maxUploadSize = config.maxSize;
282 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
283 dbConfig.isSupportEncrypt = config.isSupportEncrypt;
284 DBSchema schema = GetDBSchema(database);
285 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
286 if (delegate_ == nullptr) {
287 ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
288 return GeneralError::E_ALREADY_CLOSED;
289 }
290 delegate_->SetCloudDB(rdbCloud_);
291 delegate_->SetIAssetLoader(rdbLoader_);
292 delegate_->SetCloudDbSchema(std::move(schema));
293 delegate_->SetCloudSyncConfig(dbConfig);
294
295 syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
296 return GeneralError::E_OK;
297 }
298
IsBound(uint32_t user)299 bool RdbGeneralStore::IsBound(uint32_t user)
300 {
301 return isBound_;
302 }
303
Close(bool isForce)304 int32_t RdbGeneralStore::Close(bool isForce)
305 {
306 {
307 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
308 if (!lock) {
309 return GeneralError::E_BUSY;
310 }
311
312 if (delegate_ == nullptr) {
313 return GeneralError::E_OK;
314 }
315 auto [dbStatus, downloadCount] = delegate_->GetDownloadingAssetsCount();
316 if (!isForce &&
317 (delegate_->GetCloudSyncTaskCount() > 0 || downloadCount > 0 || delegate_->GetDeviceSyncTaskCount() > 0)) {
318 return GeneralError::E_BUSY;
319 }
320 auto status = manager_.CloseStore(delegate_);
321 if (status != DBStatus::OK) {
322 return status;
323 }
324 delegate_ = nullptr;
325 }
326 RemoveTasks();
327 bindInfo_.loader_ = nullptr;
328 if (bindInfo_.db_ != nullptr) {
329 bindInfo_.db_->Close();
330 }
331 bindInfo_.db_ = nullptr;
332 {
333 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
334 rdbCloud_ = nullptr;
335 rdbLoader_ = nullptr;
336 }
337 return GeneralError::E_OK;
338 }
339
Execute(const std::string & table,const std::string & sql)340 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
341 {
342 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
343 if (delegate_ == nullptr) {
344 ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
345 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
346 Anonymous::Change(sql).c_str());
347 return GeneralError::E_ERROR;
348 }
349 std::vector<DistributedDB::VBucket> changedData;
350 auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
351 if (status != DBStatus::OK) {
352 ZLOGE("Execute failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
353 Anonymous::Change(sql).c_str(), changedData.size());
354 if (status == DBStatus::BUSY) {
355 return GeneralError::E_BUSY;
356 }
357 return GeneralError::E_ERROR;
358 }
359 return GeneralError::E_OK;
360 }
361
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)362 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
363 {
364 strColumnSql += " (";
365 strRowValueSql += " (";
366 auto columnSize = value.size();
367 for (auto column = value.begin(); column != value.end(); ++column) {
368 strRowValueSql += " ?,";
369 strColumnSql += " " + column->first + ",";
370 }
371 if (columnSize != 0) {
372 strColumnSql.pop_back();
373 strColumnSql += ")";
374 strRowValueSql.pop_back();
375 strRowValueSql += ")";
376 }
377 return columnSize;
378 }
379
Insert(const std::string & table,VBuckets && values)380 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
381 {
382 if (table.empty() || values.size() == 0) {
383 ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
384 return GeneralError::E_INVALID_ARGS;
385 }
386
387 std::string strColumnSql;
388 std::string strRowValueSql;
389 auto value = values.front();
390 size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
391 if (columnSize == 0) {
392 ZLOGE("Insert: columnSize error, can't be 0!");
393 return GeneralError::E_INVALID_ARGS;
394 }
395
396 Values args;
397 std::string strValueSql;
398 for (auto &rowData : values) {
399 if (rowData.size() != columnSize) {
400 ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
401 return GeneralError::E_INVALID_ARGS;
402 }
403 for (auto column = rowData.begin(); column != rowData.end(); ++column) {
404 args.push_back(std::move(column->second));
405 }
406 strValueSql += strRowValueSql + ",";
407 }
408 strValueSql.pop_back();
409 std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
410
411 std::vector<DistributedDB::VBucket> changedData;
412 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
413 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
414 if (delegate_ == nullptr) {
415 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
416 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
417 return GeneralError::E_ERROR;
418 }
419 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
420 if (status != DBStatus::OK) {
421 if (IsPrintLog(status)) {
422 auto time =
423 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
424 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
425 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
426 }
427 return GeneralError::E_ERROR;
428 }
429 return GeneralError::E_OK;
430 }
431
IsPrintLog(DBStatus status)432 bool RdbGeneralStore::IsPrintLog(DBStatus status)
433 {
434 bool isPrint = false;
435 if (status == lastError_) {
436 if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
437 isPrint = true;
438 }
439 } else {
440 isPrint = true;
441 lastErrCnt_ = 0;
442 lastError_ = status;
443 }
444 return isPrint;
445 }
446
447 /**
448 * This function does not support batch updates in rdb, it only supports single data updates.
449 */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)450 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
451 const std::string &whereSql, Values &&conditions)
452 {
453 if (table.empty()) {
454 ZLOGE("Update: table can't be empty!");
455 return GeneralError::E_INVALID_ARGS;
456 }
457 if (setSql.empty() || values.size() == 0) {
458 ZLOGE("Update: setSql and values can't be empty!");
459 return GeneralError::E_INVALID_ARGS;
460 }
461 if (whereSql.empty() || conditions.size() == 0) {
462 ZLOGE("Update: whereSql and conditions can't be empty!");
463 return GeneralError::E_INVALID_ARGS;
464 }
465
466 std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
467 Values args;
468 for (auto &value : values) {
469 args.push_back(std::move(value));
470 }
471 for (auto &condition : conditions) {
472 args.push_back(std::move(condition));
473 }
474
475 std::vector<DistributedDB::VBucket> changedData;
476 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
477 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
478 if (delegate_ == nullptr) {
479 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
480 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
481 return GeneralError::E_ERROR;
482 }
483 auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
484 if (status != DBStatus::OK) {
485 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
486 changedData.size());
487 return GeneralError::E_ERROR;
488 }
489 return GeneralError::E_OK;
490 }
491
Replace(const std::string & table,VBucket && value)492 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
493 {
494 if (table.empty() || value.size() == 0) {
495 return GeneralError::E_INVALID_ARGS;
496 }
497 std::string columnSql;
498 std::string valueSql;
499 SqlConcatenate(value, columnSql, valueSql);
500 std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
501
502 Values args;
503 for (auto &[k, v] : value) {
504 args.emplace_back(std::move(v));
505 }
506 std::vector<DistributedDB::VBucket> changedData;
507 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
508 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
509 if (delegate_ == nullptr) {
510 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
511 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
512 return GeneralError::E_ERROR;
513 }
514 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
515 if (status != DBStatus::OK) {
516 ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
517 status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
518 if (status == DBStatus::BUSY) {
519 return GeneralError::E_BUSY;
520 }
521 return GeneralError::E_ERROR;
522 }
523 return GeneralError::E_OK;
524 }
525
Delete(const std::string & table,const std::string & sql,Values && args)526 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
527 {
528 return 0;
529 }
530
Query(const std::string & table,const std::string & sql,Values && args)531 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
532 const std::string &sql, Values &&args)
533 {
534 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
535 if (delegate_ == nullptr) {
536 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
537 return { GeneralError::E_ALREADY_CLOSED, nullptr };
538 }
539 auto [errCode, records] = QuerySql(sql, std::move(args));
540 return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
541 }
542
Query(const std::string & table,GenQuery & query)543 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
544 {
545 RdbQuery *rdbQuery = nullptr;
546 auto ret = query.QueryInterface(rdbQuery);
547 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
548 ZLOGE("not RdbQuery!");
549 return { GeneralError::E_INVALID_ARGS, nullptr };
550 }
551 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
552 if (delegate_ == nullptr) {
553 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
554 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
555 return { GeneralError::E_ALREADY_CLOSED, nullptr };
556 }
557 if (rdbQuery->IsRemoteQuery()) {
558 if (rdbQuery->GetDevices().size() != 1) {
559 ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
560 return { GeneralError::E_ERROR, nullptr };
561 }
562 auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
563 return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
564 }
565 return { GeneralError::E_ERROR, nullptr };
566 }
567
MergeMigratedData(const std::string & tableName,VBuckets && values)568 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
569 {
570 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
571 if (delegate_ == nullptr) {
572 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
573 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
574 return GeneralError::E_ERROR;
575 }
576
577 auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
578 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
579 }
580
CleanTrackerData(const std::string & tableName,int64_t cursor)581 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
582 {
583 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
584 if (delegate_ == nullptr) {
585 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
586 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
587 return GeneralError::E_ERROR;
588 }
589
590 auto status = delegate_->CleanTrackerData(tableName, cursor);
591 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
592 }
593
DoCloudSync(const Devices & devices,const DistributedDB::Query & dbQuery,const SyncParam & syncParam,bool isPriority,DetailAsync async)594 std::pair<int32_t, int32_t> RdbGeneralStore::DoCloudSync(const Devices &devices, const DistributedDB::Query &dbQuery,
595 const SyncParam &syncParam, bool isPriority, DetailAsync async)
596 {
597 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
598 auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
599 SyncId syncId = ++syncTaskId_;
600 auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
601 if (executor_ != nullptr && tasks_ != nullptr) {
602 auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
603 tasks_->Insert(syncId, { id, callback });
604 }
605 CloudSyncOption option;
606 option.devices = devices;
607 option.mode = DistributedDB::SyncMode(syncMode);
608 option.query = dbQuery;
609 option.waitTime = syncParam.wait;
610 option.priorityTask = isPriority || highMode == MANUAL_SYNC_MODE;
611 option.priorityLevel = GetPriorityLevel(highMode);
612 option.compensatedSyncOnly = syncParam.isCompensation;
613 option.merge = highMode == AUTO_SYNC_MODE;
614 option.lockAction = LOCK_ACTION;
615 option.prepareTraceId = syncParam.prepareTraceId;
616 option.asyncDownloadAssets = syncParam.asyncDownloadAsset;
617 auto dbStatus = DistributedDB::INVALID_ARGS;
618 dbStatus = delegate_->Sync(option, tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
619 if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
620 return { ConvertStatus(dbStatus), dbStatus };
621 }
622 Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
623 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
624 tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
625 if (executor != nullptr) {
626 executor->Remove(task.taskId);
627 }
628 return false;
629 });
630 return { ConvertStatus(dbStatus), dbStatus };
631 }
632
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)633 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
634 const SyncParam &syncParam)
635 {
636 DistributedDB::Query dbQuery;
637 RdbQuery *rdbQuery = nullptr;
638 bool isPriority = false;
639 auto ret = query.QueryInterface(rdbQuery);
640 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
641 dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
642 } else {
643 dbQuery = rdbQuery->GetQuery();
644 isPriority = rdbQuery->IsPriority();
645 }
646 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
647 auto dbMode = DistributedDB::SyncMode(syncMode);
648 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
649 if (delegate_ == nullptr) {
650 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
651 "wait:%{public}d", devices.size(),
652 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
653 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
654 }
655 auto dbStatus = DistributedDB::INVALID_ARGS;
656 if (syncMode < NEARBY_END) {
657 dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
658 } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
659 return DoCloudSync(devices, dbQuery, syncParam, isPriority, async);
660 }
661 return { ConvertStatus(dbStatus), dbStatus };
662 }
663
PreSharing(GenQuery & query)664 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
665 {
666 RdbQuery *rdbQuery = nullptr;
667 auto ret = query.QueryInterface(rdbQuery);
668 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
669 ZLOGE("not RdbQuery!");
670 return { GeneralError::E_INVALID_ARGS, nullptr };
671 }
672 auto tables = rdbQuery->GetTables();
673 auto statement = rdbQuery->GetStatement();
674 if (statement.empty() || tables.empty()) {
675 ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
676 return { GeneralError::E_INVALID_ARGS, nullptr };
677 }
678 std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
679 VBuckets values;
680 {
681 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
682 if (delegate_ == nullptr) {
683 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
684 return { GeneralError::E_ALREADY_CLOSED, nullptr };
685 }
686 auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
687 values = std::move(ret);
688 }
689 auto rdbCloud = GetRdbCloud();
690 if (rdbCloud == nullptr || values.empty()) {
691 ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
692 values.size());
693 return { GeneralError::E_CLOUD_DISABLED, nullptr };
694 }
695 VBuckets extends = ExtractExtend(values);
696 rdbCloud->PreSharing(*tables.begin(), extends);
697 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
698 ++value, ++extend) {
699 value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
700 value->erase(CLOUD_GID);
701 }
702 return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
703 }
704
ExtractExtend(VBuckets & values) const705 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
706 {
707 VBuckets extends(values.size());
708 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
709 ++value, ++extend) {
710 auto it = value->find(CLOUD_GID);
711 if (it == value->end()) {
712 continue;
713 }
714 auto gid = std::get_if<std::string>(&(it->second));
715 if (gid == nullptr || gid->empty()) {
716 continue;
717 }
718 extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
719 }
720 return extends;
721 }
722
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const723 std::string RdbGeneralStore::BuildSql(
724 const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
725 {
726 std::string sql = "select ";
727 sql.append(CLOUD_GID);
728 std::string sqlNode = "select rowid";
729 for (auto &column : columns) {
730 sql.append(", ").append(column);
731 sqlNode.append(", ").append(column);
732 }
733 sqlNode.append(" from ").append(table).append(statement);
734 auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
735 sql.append(" from ").append(logTable).append(", (").append(sqlNode);
736 sql.append(") where ").append(DATE_KEY).append(" = rowid");
737 return sql;
738 }
739
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)740 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
741 {
742 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
743 return GeneralError::E_INVALID_ARGS;
744 }
745 DBStatus status = DistributedDB::DB_ERROR;
746 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
747 if (delegate_ == nullptr) {
748 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
749 "tableName:%{public}s",
750 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
751 Anonymous::Change(tableName).c_str());
752 return GeneralError::E_ALREADY_CLOSED;
753 }
754 switch (mode) {
755 case CLOUD_INFO:
756 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
757 if (status == DistributedDB::OK) {
758 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
759 break;
760 }
761 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
762 break;
763 case CLOUD_DATA:
764 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
765 if (status == DistributedDB::OK) {
766 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
767 break;
768 }
769 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
770 break;
771 case NEARBY_DATA:
772 if (devices.empty()) {
773 status = delegate_->RemoveDeviceData();
774 break;
775 }
776 for (auto device : devices) {
777 status = delegate_->RemoveDeviceData(device, tableName);
778 }
779 break;
780 default:
781 return GeneralError::E_ERROR;
782 }
783 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
784 }
785
Watch(int32_t origin,Watcher & watcher)786 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
787 {
788 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
789 return GeneralError::E_INVALID_ARGS;
790 }
791
792 observer_.watcher_ = &watcher;
793 return GeneralError::E_OK;
794 }
795
Unwatch(int32_t origin,Watcher & watcher)796 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
797 {
798 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
799 return GeneralError::E_INVALID_ARGS;
800 }
801
802 observer_.watcher_ = nullptr;
803 return GeneralError::E_OK;
804 }
805
GetDBBriefCB(DetailAsync async)806 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
807 {
808 if (!async) {
809 return [](auto &) {};
810 }
811 return [async = std::move(async)](
812 const std::map<std::string, std::vector<TableStatus>> &result) {
813 DistributedData::GenDetails details;
814 for (auto &[key, tables] : result) {
815 auto &value = details[key];
816 value.progress = FINISHED;
817 value.code = GeneralError::E_OK;
818 for (auto &table : tables) {
819 if (table.status != DBStatus::OK) {
820 value.code = GeneralError::E_ERROR;
821 }
822 }
823 }
824 async(details);
825 };
826 }
827
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)828 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
829 uint32_t highMode)
830 {
831 std::shared_lock<std::shared_mutex> lock(asyncMutex_);
832 return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
833 rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
834 DistributedData::GenDetails details;
835 for (auto &[id, process] : processes) {
836 bool isDownload = false;
837 auto &detail = details[id];
838 detail.progress = process.process;
839 detail.code = ConvertStatus(process.errCode);
840 detail.dbCode = process.errCode;
841 uint32_t totalCount = 0;
842 for (auto [key, value] : process.tableProcess) {
843 auto &table = detail.details[key];
844 table.upload.total = value.upLoadInfo.total;
845 table.upload.success = value.upLoadInfo.successCount;
846 table.upload.failed = value.upLoadInfo.failCount;
847 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
848 totalCount += table.upload.total;
849 isDownload = table.download.total > 0;
850 table.download.total = value.downLoadInfo.total;
851 table.download.success = value.downLoadInfo.successCount;
852 table.download.failed = value.downLoadInfo.failCount;
853 table.download.untreated = table.download.total - table.download.success - table.download.failed;
854 detail.changeCount = (process.process == FINISHED)
855 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
856 value.downLoadInfo.deleteCount
857 : 0;
858 totalCount += table.download.total;
859 }
860 if (process.process == FINISHED) {
861 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
862 } else {
863 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
864 }
865
866 if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
867 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
868 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
869 }
870 }
871 if (async) {
872 async(details);
873 }
874
875 if (highMode == AUTO_SYNC_MODE && autoAsync
876 && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
877 autoAsync(details);
878 }
879 };
880 }
881
Release()882 int32_t RdbGeneralStore::Release()
883 {
884 auto ref = 1;
885 {
886 std::lock_guard<decltype(mutex_)> lock(mutex_);
887 if (ref_ == 0) {
888 return 0;
889 }
890 ref = --ref_;
891 }
892 ZLOGD("ref:%{public}d", ref);
893 if (ref == 0) {
894 delete this;
895 }
896 return ref;
897 }
898
AddRef()899 int32_t RdbGeneralStore::AddRef()
900 {
901 std::lock_guard<decltype(mutex_)> lock(mutex_);
902 if (ref_ == 0) {
903 return 0;
904 }
905 return ++ref_;
906 }
907
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)908 void RdbGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
909 {
910 ArkDataFaultMsg msg = { .faultType = faultType,
911 .bundleName = storeInfo_.bundleName,
912 .moduleName = ModuleName::RDB_STORE,
913 .storeName = storeInfo_.storeName,
914 .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
915 .appendixMsg = appendix };
916 Reporter::GetInstance()->CloudSyncFault()->Report(msg);
917 }
918
SetReference(const std::vector<Reference> & references)919 int32_t RdbGeneralStore::SetReference(const std::vector<Reference> &references)
920 {
921 std::vector<DistributedDB::TableReferenceProperty> properties;
922 for (const auto &reference : references) {
923 properties.push_back({reference.sourceTable, reference.targetTable, reference.refFields});
924 }
925 auto status = delegate_->SetReference(properties);
926 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::PROPERTY_CHANGED) {
927 ZLOGE("distributed table set reference failed, err:%{public}d", status);
928 return GeneralError::E_ERROR;
929 }
930 return GeneralError::E_OK;
931 }
932
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)933 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
934 const std::vector<Reference> &references)
935 {
936 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
937 if (delegate_ == nullptr) {
938 ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
939 Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
940 return GeneralError::E_ALREADY_CLOSED;
941 }
942 for (const auto &table : tables) {
943 ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
944 auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
945 if (dBStatus != DistributedDB::DBStatus::OK) {
946 ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
947 Anonymous::Change(table).c_str(), dBStatus);
948 Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
949 "SetDistributedTables: set table(" + Anonymous::Change(table) + ") =" +
950 std::to_string(static_cast<int32_t>(dBStatus)));
951 return GeneralError::E_ERROR;
952 }
953 }
954 if (type == DistributedTableType::DISTRIBUTED_CLOUD) {
955 auto status = SetReference(references);
956 if (status != GeneralError::E_OK) {
957 Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
958 "SetDistributedTables: set reference=" + std::to_string(static_cast<int32_t>(status)));
959 return GeneralError::E_ERROR;
960 }
961 }
962 auto [exist, database] = GetDistributedSchema(observer_.meta_);
963 if (exist && type == DistributedTableType::DISTRIBUTED_DEVICE) {
964 auto force = DeviceSyncAppManager::GetInstance().Check(
965 {observer_.meta_.appId, observer_.meta_.bundleName, database.version});
966 delegate_->SetDistributedSchema(GetGaussDistributedSchema(database), force);
967 }
968 CloudMark metaData(storeInfo_);
969 if (MetaDataManager::GetInstance().LoadMeta(metaData.GetKey(), metaData, true) && metaData.isClearWaterMark) {
970 DistributedDB::ClearMetaDataOption option{ .mode = DistributedDB::ClearMetaDataMode::CLOUD_WATERMARK };
971 auto ret = delegate_->ClearMetaData(option);
972 if (ret != DBStatus::OK) {
973 ZLOGE("clear watermark failed, err:%{public}d", ret);
974 return GeneralError::E_ERROR;
975 }
976 MetaDataManager::GetInstance().DelMeta(metaData.GetKey(), true);
977 auto event = std::make_unique<CloudEvent>(CloudEvent::UPGRADE_SCHEMA, storeInfo_);
978 EventCenter::GetInstance().PostEvent(std::move(event));
979 ZLOGI("clear watermark success, bundleName:%{public}s, storeName:%{public}s", storeInfo_.bundleName.c_str(),
980 Anonymous::Change(storeInfo_.storeName).c_str());
981 }
982 return GeneralError::E_OK;
983 }
984
SetConfig(const StoreConfig & storeConfig)985 void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig)
986 {
987 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
988 if (delegate_ == nullptr) {
989 ZLOGE("database already closed!, tableMode is :%{public}d",
990 storeConfig.tableMode.has_value() ? static_cast<int32_t>(storeConfig.tableMode.value()) : -1);
991 return;
992 }
993 if (storeConfig.tableMode.has_value()) {
994 RelationalStoreDelegate::StoreConfig config;
995 if (storeConfig.tableMode == DistributedTableMode::COLLABORATION) {
996 config.tableMode = DistributedDB::DistributedTableMode::COLLABORATION;
997 } else if (storeConfig.tableMode == DistributedTableMode::SPLIT_BY_DEVICE) {
998 config.tableMode = DistributedDB::DistributedTableMode::SPLIT_BY_DEVICE;
999 }
1000 delegate_->SetStoreConfig(config);
1001 }
1002 }
1003
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)1004 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
1005 const std::set<std::string> &extendColNames, bool isForceUpgrade)
1006 {
1007 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1008 if (delegate_ == nullptr) {
1009 ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
1010 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1011 return GeneralError::E_ALREADY_CLOSED;
1012 }
1013 auto status = delegate_->SetTrackerTable({ tableName, extendColNames, trackerColNames, isForceUpgrade });
1014 if (status == DBStatus::WITH_INVENTORY_DATA) {
1015 ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
1016 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1017 return GeneralError::E_WITH_INVENTORY_DATA;
1018 }
1019 if (status != DBStatus::OK) {
1020 ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
1021 status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1022 return GeneralError::E_ERROR;
1023 }
1024 return GeneralError::E_OK;
1025 }
1026
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)1027 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
1028 const DistributedDB::RemoteCondition &remoteCondition)
1029 {
1030 std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
1031 DistributedDB::DBStatus status =
1032 delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
1033 if (status != DistributedDB::DBStatus::OK) {
1034 ZLOGE("DistributedDB remote query failed, device:%{public}s, status is %{public}d.",
1035 Anonymous::Change(device).c_str(), status);
1036 return nullptr;
1037 }
1038 return std::make_shared<RdbCursor>(dbResultSet);
1039 }
1040
ConvertStatus(DistributedDB::DBStatus status)1041 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
1042 {
1043 switch (status) {
1044 case DBStatus::OK:
1045 return GenErr::E_OK;
1046 case DBStatus::CLOUD_NETWORK_ERROR:
1047 return GenErr::E_NETWORK_ERROR;
1048 case DBStatus::CLOUD_LOCK_ERROR:
1049 return GenErr::E_LOCKED_BY_OTHERS;
1050 case DBStatus::CLOUD_FULL_RECORDS:
1051 return GenErr::E_RECODE_LIMIT_EXCEEDED;
1052 case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
1053 return GenErr::E_NO_SPACE_FOR_ASSET;
1054 case DBStatus::BUSY:
1055 return GenErr::E_BUSY;
1056 case DBStatus::CLOUD_SYNC_TASK_MERGED:
1057 return GenErr::E_SYNC_TASK_MERGED;
1058 case DBStatus::CLOUD_DISABLED:
1059 return GeneralError::E_CLOUD_DISABLED;
1060 default:
1061 ZLOGI("status:0x%{public}x", status);
1062 break;
1063 }
1064 return GenErr::E_ERROR;
1065 }
1066
IsValid()1067 bool RdbGeneralStore::IsValid()
1068 {
1069 return delegate_ != nullptr;
1070 }
1071
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)1072 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
1073 {
1074 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1075 async_ = std::move(async);
1076 return GenErr::E_OK;
1077 }
1078
UnregisterDetailProgressObserver()1079 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
1080 {
1081 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1082 async_ = nullptr;
1083 return GenErr::E_OK;
1084 }
1085
QuerySql(const std::string & sql,Values && args)1086 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
1087 {
1088 std::vector<DistributedDB::VBucket> changedData;
1089 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
1090 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
1091 if (status != DBStatus::OK) {
1092 ZLOGE("Query failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
1093 Anonymous::Change(sql).c_str(), changedData.size());
1094 if (status == DBStatus::BUSY) {
1095 return { GenErr::E_BUSY, {} };
1096 }
1097 return { GenErr::E_ERROR, {} };
1098 }
1099 return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
1100 }
1101
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)1102 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
1103 uint32_t syncCount)
1104 {
1105 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1106 if (requiredFlag != (requiredFlag & flag)) {
1107 return;
1108 }
1109 StoreInfo info = storeInfo;
1110 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
1111 traceId, syncCount);
1112 EventCenter::GetInstance().PostEvent(std::move(evt));
1113 }
1114
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)1115 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
1116 {
1117 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1118 if (requiredFlag != (requiredFlag & flag)) {
1119 return;
1120 }
1121 StoreInfo info = storeInfo;
1122 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
1123 traceId);
1124 EventCenter::GetInstance().PostEvent(std::move(evt));
1125 }
1126
GetTables()1127 std::set<std::string> RdbGeneralStore::GetTables()
1128 {
1129 std::set<std::string> tables;
1130 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1131 if (delegate_ == nullptr) {
1132 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1133 return tables;
1134 }
1135 auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
1136 if (errCode != GenErr::E_OK) {
1137 return tables;
1138 }
1139 for (auto &table : res) {
1140 auto it = table.find("name");
1141 if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
1142 ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1143 continue;
1144 }
1145 tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
1146 }
1147 return tables;
1148 }
1149
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)1150 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
1151 const std::set<std::string> &localTables)
1152 {
1153 std::vector<std::string> res;
1154 for (auto &it : syncTables) {
1155 if (localTables.count(it) &&
1156 localTables.count(RelationalStoreManager::GetDistributedLogTableName(it))) {
1157 res.push_back(std::move(it));
1158 }
1159 }
1160 return res;
1161 }
1162
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)1163 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1164 const std::vector<std::string> &tables, ChangeType type)
1165 {
1166 RemoteChangeEvent::DataInfo info;
1167 info.userId = meta.user;
1168 info.storeId = meta.storeId;
1169 info.deviceId = meta.deviceId;
1170 info.bundleName = meta.bundleName;
1171 info.tables = tables;
1172 info.changeType = type;
1173 auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1174 EventCenter::GetInstance().PostEvent(std::move(evt));
1175 }
1176
OnChange(const DBChangedIF & data)1177 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1178 {
1179 if (!HasWatcher()) {
1180 return;
1181 }
1182 std::string device = data.GetDataChangeDevice();
1183 auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1184 ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1185 Anonymous::Change(device).c_str());
1186 GenOrigin genOrigin;
1187 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1188 genOrigin.dataType = GenOrigin::BASIC_DATA;
1189 DistributedDB::StoreProperty property;
1190 data.GetStoreProperty(property);
1191 genOrigin.id.push_back(networkId);
1192 genOrigin.store = storeId_;
1193 GeneralWatcher::ChangeInfo changeInfo{};
1194 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1195 return;
1196 }
1197
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1198 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1199 {
1200 if (!HasWatcher()) {
1201 return;
1202 }
1203 ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1204 Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1205 GenOrigin genOrigin;
1206 genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1207 ? GenOrigin::ORIGIN_LOCAL
1208 : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1209 genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1210 genOrigin.id.push_back(originalId);
1211 genOrigin.store = storeId_;
1212 Watcher::PRIFields fields;
1213 Watcher::ChangeInfo changeInfo;
1214 bool notifyFlag = false;
1215 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1216 auto &info = changeInfo[data.tableName][i];
1217 for (auto &priData : data.primaryData[i]) {
1218 Watcher::PRIValue value;
1219 if (priData.empty()) {
1220 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
1221 Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1222 Anonymous::Change(originalId).c_str(), i);
1223 continue;
1224 }
1225 Convert(std::move(*(priData.begin())), value);
1226 if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1227 info.push_back(std::move(value));
1228 continue;
1229 }
1230 auto key = std::get_if<std::string>(&value);
1231 if (key != nullptr && (*key == LOGOUT_DELETE_FLAG || *key == LOGOUT_RESERVE_FLAG)) {
1232 // notify to start app
1233 notifyFlag = true;
1234 }
1235 info.push_back(std::move(value));
1236 }
1237 }
1238 if (notifyFlag) {
1239 ZLOGI("post data change for cleaning cloud data. store:%{public}s table:%{public}s data change from "
1240 ":%{public}s",
1241 Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1242 Anonymous::Change(originalId).c_str());
1243 PostDataChange(meta_, {}, CLOUD_LOGOUT);
1244 }
1245 if (!data.field.empty()) {
1246 fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1247 }
1248 watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1249 }
1250
LockCloudDB()1251 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1252 {
1253 auto rdbCloud = GetRdbCloud();
1254 if (rdbCloud == nullptr) {
1255 return { GeneralError::E_ERROR, 0 };
1256 }
1257 return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1258 }
1259
UnLockCloudDB()1260 int32_t RdbGeneralStore::UnLockCloudDB()
1261 {
1262 auto rdbCloud = GetRdbCloud();
1263 if (rdbCloud == nullptr) {
1264 return GeneralError::E_ERROR;
1265 }
1266 return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1267 }
1268
GetRdbCloud() const1269 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1270 {
1271 std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1272 return rdbCloud_;
1273 }
1274
IsFinished(SyncId syncId) const1275 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1276 {
1277 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1278 if (delegate_ == nullptr) {
1279 ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1280 return true;
1281 }
1282 return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1283 }
1284
GetFinishTask(SyncId syncId)1285 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1286 {
1287 return [this, executor = executor_, task = tasks_, syncId]() {
1288 auto [exist, finishTask] = task->Find(syncId);
1289 if (!exist || finishTask.cb == nullptr) {
1290 task->Erase(syncId);
1291 return;
1292 }
1293 if (!IsFinished(syncId)) {
1294 task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1295 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1296 return true;
1297 });
1298 return;
1299 }
1300 DBProcessCB cb;
1301 task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1302 cb = task.cb;
1303 return false;
1304 });
1305 if (cb != nullptr) {
1306 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1307 Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1308 std::map<std::string, SyncProcess> result;
1309 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1310 cb(result);
1311 }
1312 };
1313 }
1314
SetExecutor(std::shared_ptr<Executor> executor)1315 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1316 {
1317 if (executor_ == nullptr) {
1318 executor_ = executor;
1319 }
1320 }
1321
RemoveTasks()1322 void RdbGeneralStore::RemoveTasks()
1323 {
1324 if (tasks_ == nullptr) {
1325 return;
1326 }
1327 std::list<DBProcessCB> cbs;
1328 std::list<TaskId> taskIds;
1329 tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1330 if (task.cb != nullptr) {
1331 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1332 syncId);
1333 }
1334 cbs.push_back(std::move(task.cb));
1335 taskIds.push_back(task.taskId);
1336 return true;
1337 });
1338 auto func = [](const std::list<DBProcessCB> &cbs) {
1339 std::map<std::string, SyncProcess> result;
1340 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1341 for (auto &cb : cbs) {
1342 if (cb != nullptr) {
1343 cb(result);
1344 }
1345 }
1346 };
1347 if (executor_ != nullptr) {
1348 for (auto taskId: taskIds) {
1349 executor_->Remove(taskId, true);
1350 }
1351 executor_->Execute([cbs, func]() {
1352 func(cbs);
1353 });
1354 } else {
1355 func(cbs);
1356 }
1357 }
1358
GetCB(SyncId syncId)1359 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1360 {
1361 return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1362 if (task == nullptr) {
1363 return;
1364 }
1365 DBProcessCB cb;
1366 task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1367 cb = finishTask.cb;
1368 bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1369 if (isFinished) {
1370 finishTask.cb = nullptr;
1371 }
1372 return true;
1373 });
1374 if (cb != nullptr) {
1375 cb(progress);
1376 }
1377 return;
1378 };
1379 }
1380
UpdateDBStatus()1381 int32_t RdbGeneralStore::UpdateDBStatus()
1382 {
1383 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1384 if (delegate_ == nullptr) {
1385 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1386 return GeneralError::E_ALREADY_CLOSED;
1387 }
1388 return delegate_->OperateDataStatus(static_cast<uint32_t>(DataOperator::UPDATE_TIME));
1389 }
1390 } // namespace OHOS::DistributedRdb