1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbGeneralStore"
16
17 #include "rdb_general_store.h"
18
19 #include <chrono>
20 #include <cinttypes>
21
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "dfx_types.h"
34 #include "device_manager_adapter.h"
35 #include "eventcenter/event_center.h"
36 #include "log_print.h"
37 #include "metadata/meta_data_manager.h"
38 #include "metadata/secret_key_meta_data.h"
39 #include "rdb_cursor.h"
40 #include "rdb_helper.h"
41 #include "rdb_query.h"
42 #include "rdb_result_set_impl.h"
43 #include "relational_store_manager.h"
44 #include "reporter.h"
45 #include "snapshot/bind_event.h"
46 #include "utils/anonymous.h"
47 #include "value_proxy.h"
48 namespace OHOS::DistributedRdb {
49 using namespace DistributedData;
50 using namespace DistributedDB;
51 using namespace NativeRdb;
52 using namespace std::chrono;
53 using namespace DistributedDataDfx;
54 using DBField = DistributedDB::Field;
55 using DBTable = DistributedDB::TableSchema;
56 using DBSchema = DistributedDB::DataBaseSchema;
57 using ClearMode = DistributedDB::ClearMode;
58 using DBStatus = DistributedDB::DBStatus;
59 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
60
61 constexpr const char *INSERT = "INSERT INTO ";
62 constexpr const char *REPLACE = "REPLACE INTO ";
63 constexpr const char *VALUES = " VALUES ";
64 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
65 constexpr const LockAction LOCK_ACTION =
66 static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE) |
67 static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD));
68 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
69 constexpr uint32_t SEARCHABLE_FLAG = 2;
70 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
71 static constexpr const char *FT_OPEN_STORE = "OPEN_STORE";
72 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
73
GetDBSchema(const Database & database)74 static DBSchema GetDBSchema(const Database &database)
75 {
76 DBSchema schema;
77 schema.tables.resize(database.tables.size());
78 for (size_t i = 0; i < database.tables.size(); i++) {
79 const Table &table = database.tables[i];
80 DBTable &dbTable = schema.tables[i];
81 dbTable.name = table.name;
82 dbTable.sharedTableName = table.sharedTableName;
83 for (auto &field : table.fields) {
84 DBField dbField;
85 dbField.colName = field.colName;
86 dbField.type = field.type;
87 dbField.primary = field.primary;
88 dbField.nullable = field.nullable;
89 dbTable.fields.push_back(std::move(dbField));
90 }
91 }
92 return schema;
93 }
94
IsExistence(const std::string & col,const std::vector<std::string> & cols)95 static bool IsExistence(const std::string &col, const std::vector<std::string> &cols)
96 {
97 for (auto &column : cols) {
98 if (col == column) {
99 return true;
100 }
101 }
102 return false;
103 }
104
GetGaussDistributedSchema(const Database & database)105 static DistributedSchema GetGaussDistributedSchema(const Database &database)
106 {
107 DistributedSchema distributedSchema;
108 distributedSchema.version = database.version;
109 distributedSchema.tables.resize(database.tables.size());
110 for (size_t i = 0; i < database.tables.size(); i++) {
111 const Table &table = database.tables[i];
112 DistributedTable &dbTable = distributedSchema.tables[i];
113 dbTable.tableName = table.name;
114 for (auto &field : table.fields) {
115 DistributedField dbField;
116 dbField.colName = field.colName;
117 dbField.isP2pSync = IsExistence(field.colName, table.deviceSyncFields);
118 dbField.isSpecified = field.primary;
119 dbTable.fields.push_back(std::move(dbField));
120 }
121 }
122 return distributedSchema;
123 }
124
GetDistributedSchema(const StoreMetaData & meta)125 static std::pair<bool, Database> GetDistributedSchema(const StoreMetaData &meta)
126 {
127 std::pair<bool, Database> tableData;
128 Database database;
129 database.bundleName = meta.bundleName;
130 database.name = meta.storeId;
131 database.user = meta.user;
132 database.deviceId = meta.deviceId;
133 tableData.first = MetaDataManager::GetInstance().LoadMeta(database.GetKey(), database, true);
134 tableData.second = database;
135 return tableData;
136 }
137
InitStoreInfo(const StoreMetaData & meta)138 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
139 {
140 storeInfo_.tokenId = meta.tokenId;
141 storeInfo_.bundleName = meta.bundleName;
142 storeInfo_.storeName = meta.storeId;
143 storeInfo_.instanceId = meta.instanceId;
144 storeInfo_.user = std::atoi(meta.user.c_str());
145 storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
146 }
147
GetOption(const StoreMetaData & meta)148 RelationalStoreDelegate::Option GetOption(const StoreMetaData &meta)
149 {
150 RelationalStoreDelegate::Option option;
151 option.syncDualTupleMode = true;
152 if (GetDistributedSchema(meta).first) {
153 option.tableMode = DistributedTableMode::COLLABORATION;
154 }
155 return option;
156 }
157
RdbGeneralStore(const StoreMetaData & meta)158 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
159 : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
160 {
161 observer_.storeId_ = meta.storeId;
162 observer_.meta_ = meta;
163 RelationalStoreDelegate::Option option = GetOption(meta);
164 option.observer = &observer_;
165 if (meta.isEncrypt) {
166 std::string key = meta.GetSecretKey();
167 SecretKeyMetaData secretKeyMeta;
168 MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
169 std::vector<uint8_t> decryptKey;
170 CryptoManager::GetInstance().Decrypt(meta, secretKeyMeta, decryptKey);
171 option.passwd.SetValue(decryptKey.data(), decryptKey.size());
172 std::fill(decryptKey.begin(), decryptKey.end(), 0);
173 option.isEncryptedDb = meta.isEncrypt;
174 option.cipher = CipherType::AES_256_GCM;
175 for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
176 option.iterateTimes = ITERS[i];
177 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
178 if (ret == DBStatus::OK && delegate_ != nullptr) {
179 break;
180 }
181 manager_.CloseStore(delegate_);
182 delegate_ = nullptr;
183 }
184 } else {
185 auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
186 if (ret != DBStatus::OK || delegate_ == nullptr) {
187 manager_.CloseStore(delegate_);
188 delegate_ = nullptr;
189 }
190 }
191 InitStoreInfo(meta);
192 if (meta.isSearchable) {
193 syncNotifyFlag_ |= SEARCHABLE_FLAG;
194 }
195 if (delegate_ != nullptr && meta.isManualClean) {
196 PragmaData data = static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
197 delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
198 }
199 ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
200 "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
201 meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
202 meta.isEncrypt, meta.isManualClean, meta.isSearchable);
203 }
204
~RdbGeneralStore()205 RdbGeneralStore::~RdbGeneralStore()
206 {
207 ZLOGI("Destruct. BundleName: %{public}s. StoreName: %{public}s. user: %{public}d",
208 storeInfo_.bundleName.c_str(), Anonymous::Change(storeInfo_.storeName).c_str(), storeInfo_.user);
209 manager_.CloseStore(delegate_);
210 delegate_ = nullptr;
211 bindInfo_.loader_ = nullptr;
212 if (bindInfo_.db_ != nullptr) {
213 bindInfo_.db_->Close();
214 }
215 bindInfo_.db_ = nullptr;
216 rdbCloud_ = nullptr;
217 rdbLoader_ = nullptr;
218 RemoveTasks();
219 tasks_ = nullptr;
220 executor_ = nullptr;
221 }
222
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)223 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
224 {
225 if (snapshots_.bindAssets == nullptr) {
226 snapshots_.bindAssets = bindAssets;
227 }
228 return GenErr::E_OK;
229 }
230
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)231 int32_t RdbGeneralStore::Bind(const Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
232 const CloudConfig &config)
233 {
234 if (bindInfos.empty()) {
235 return GeneralError::E_OK;
236 }
237 auto bindInfo = bindInfos.begin()->second;
238 if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
239 return GeneralError::E_INVALID_ARGS;
240 }
241
242 if (isBound_.exchange(true)) {
243 return GeneralError::E_OK;
244 }
245
246 BindEvent::BindEventInfo eventInfo;
247 eventInfo.tokenId = storeInfo_.tokenId;
248 eventInfo.bundleName = storeInfo_.bundleName;
249 eventInfo.storeName = storeInfo_.storeName;
250 eventInfo.user = storeInfo_.user;
251 eventInfo.instanceId = storeInfo_.instanceId;
252
253 auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
254 EventCenter::GetInstance().PostEvent(std::move(evt));
255 bindInfo_ = std::move(bindInfo);
256 {
257 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
258 rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
259 rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
260 }
261
262 DistributedDB::CloudSyncConfig dbConfig;
263 dbConfig.maxUploadCount = config.maxNumber;
264 dbConfig.maxUploadSize = config.maxSize;
265 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
266 DBSchema schema = GetDBSchema(database);
267 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
268 if (delegate_ == nullptr) {
269 ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
270 return GeneralError::E_ALREADY_CLOSED;
271 }
272 delegate_->SetCloudDB(rdbCloud_);
273 delegate_->SetIAssetLoader(rdbLoader_);
274 delegate_->SetCloudDbSchema(std::move(schema));
275 delegate_->SetCloudSyncConfig(dbConfig);
276
277 syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
278 return GeneralError::E_OK;
279 }
280
IsBound(uint32_t user)281 bool RdbGeneralStore::IsBound(uint32_t user)
282 {
283 return isBound_;
284 }
285
Close(bool isForce)286 int32_t RdbGeneralStore::Close(bool isForce)
287 {
288 {
289 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
290 if (!lock) {
291 return GeneralError::E_BUSY;
292 }
293
294 if (delegate_ == nullptr) {
295 return GeneralError::E_OK;
296 }
297 auto [dbStatus, downloadCount] = delegate_->GetDownloadingAssetsCount();
298 if (!isForce && (delegate_->GetCloudSyncTaskCount() > 0 || downloadCount > 0)) {
299 return GeneralError::E_BUSY;
300 }
301 auto status = manager_.CloseStore(delegate_);
302 if (status != DBStatus::OK) {
303 return status;
304 }
305 delegate_ = nullptr;
306 }
307 RemoveTasks();
308 bindInfo_.loader_ = nullptr;
309 if (bindInfo_.db_ != nullptr) {
310 bindInfo_.db_->Close();
311 }
312 bindInfo_.db_ = nullptr;
313 {
314 std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
315 rdbCloud_ = nullptr;
316 rdbLoader_ = nullptr;
317 }
318 return GeneralError::E_OK;
319 }
320
Execute(const std::string & table,const std::string & sql)321 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
322 {
323 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
324 if (delegate_ == nullptr) {
325 ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
326 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
327 Anonymous::Change(sql).c_str());
328 return GeneralError::E_ERROR;
329 }
330 std::vector<DistributedDB::VBucket> changedData;
331 auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
332 if (status != DBStatus::OK) {
333 ZLOGE("Execute failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
334 Anonymous::Change(sql).c_str(), changedData.size());
335 if (status == DBStatus::BUSY) {
336 return GeneralError::E_BUSY;
337 }
338 return GeneralError::E_ERROR;
339 }
340 return GeneralError::E_OK;
341 }
342
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)343 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
344 {
345 strColumnSql += " (";
346 strRowValueSql += " (";
347 auto columnSize = value.size();
348 for (auto column = value.begin(); column != value.end(); ++column) {
349 strRowValueSql += " ?,";
350 strColumnSql += " " + column->first + ",";
351 }
352 if (columnSize != 0) {
353 strColumnSql.pop_back();
354 strColumnSql += ")";
355 strRowValueSql.pop_back();
356 strRowValueSql += ")";
357 }
358 return columnSize;
359 }
360
Insert(const std::string & table,VBuckets && values)361 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
362 {
363 if (table.empty() || values.size() == 0) {
364 ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
365 return GeneralError::E_INVALID_ARGS;
366 }
367
368 std::string strColumnSql;
369 std::string strRowValueSql;
370 auto value = values.front();
371 size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
372 if (columnSize == 0) {
373 ZLOGE("Insert: columnSize error, can't be 0!");
374 return GeneralError::E_INVALID_ARGS;
375 }
376
377 Values args;
378 std::string strValueSql;
379 for (auto &rowData : values) {
380 if (rowData.size() != columnSize) {
381 ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
382 return GeneralError::E_INVALID_ARGS;
383 }
384 for (auto column = rowData.begin(); column != rowData.end(); ++column) {
385 args.push_back(std::move(column->second));
386 }
387 strValueSql += strRowValueSql + ",";
388 }
389 strValueSql.pop_back();
390 std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
391
392 std::vector<DistributedDB::VBucket> changedData;
393 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
394 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
395 if (delegate_ == nullptr) {
396 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
397 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
398 return GeneralError::E_ERROR;
399 }
400 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
401 if (status != DBStatus::OK) {
402 if (IsPrintLog(status)) {
403 auto time =
404 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
405 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
406 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
407 }
408 return GeneralError::E_ERROR;
409 }
410 return GeneralError::E_OK;
411 }
412
IsPrintLog(DBStatus status)413 bool RdbGeneralStore::IsPrintLog(DBStatus status)
414 {
415 bool isPrint = false;
416 if (status == lastError_) {
417 if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
418 isPrint = true;
419 }
420 } else {
421 isPrint = true;
422 lastErrCnt_ = 0;
423 lastError_ = status;
424 }
425 return isPrint;
426 }
427
428 /**
429 * This function does not support batch updates in rdb, it only supports single data updates.
430 */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)431 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
432 const std::string &whereSql, Values &&conditions)
433 {
434 if (table.empty()) {
435 ZLOGE("Update: table can't be empty!");
436 return GeneralError::E_INVALID_ARGS;
437 }
438 if (setSql.empty() || values.size() == 0) {
439 ZLOGE("Update: setSql and values can't be empty!");
440 return GeneralError::E_INVALID_ARGS;
441 }
442 if (whereSql.empty() || conditions.size() == 0) {
443 ZLOGE("Update: whereSql and conditions can't be empty!");
444 return GeneralError::E_INVALID_ARGS;
445 }
446
447 std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
448 Values args;
449 for (auto &value : values) {
450 args.push_back(std::move(value));
451 }
452 for (auto &condition : conditions) {
453 args.push_back(std::move(condition));
454 }
455
456 std::vector<DistributedDB::VBucket> changedData;
457 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
458 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
459 if (delegate_ == nullptr) {
460 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
461 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
462 return GeneralError::E_ERROR;
463 }
464 auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
465 if (status != DBStatus::OK) {
466 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
467 changedData.size());
468 return GeneralError::E_ERROR;
469 }
470 return GeneralError::E_OK;
471 }
472
Replace(const std::string & table,VBucket && value)473 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
474 {
475 if (table.empty() || value.size() == 0) {
476 return GeneralError::E_INVALID_ARGS;
477 }
478 std::string columnSql;
479 std::string valueSql;
480 SqlConcatenate(value, columnSql, valueSql);
481 std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
482
483 Values args;
484 for (auto &[k, v] : value) {
485 args.emplace_back(std::move(v));
486 }
487 std::vector<DistributedDB::VBucket> changedData;
488 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
489 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
490 if (delegate_ == nullptr) {
491 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
492 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
493 return GeneralError::E_ERROR;
494 }
495 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
496 if (status != DBStatus::OK) {
497 ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
498 status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
499 if (status == DBStatus::BUSY) {
500 return GeneralError::E_BUSY;
501 }
502 return GeneralError::E_ERROR;
503 }
504 return GeneralError::E_OK;
505 }
506
Delete(const std::string & table,const std::string & sql,Values && args)507 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
508 {
509 return 0;
510 }
511
Query(const std::string & table,const std::string & sql,Values && args)512 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
513 const std::string &sql, Values &&args)
514 {
515 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
516 if (delegate_ == nullptr) {
517 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
518 return { GeneralError::E_ALREADY_CLOSED, nullptr };
519 }
520 auto [errCode, records] = QuerySql(sql, std::move(args));
521 return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
522 }
523
Query(const std::string & table,GenQuery & query)524 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
525 {
526 RdbQuery *rdbQuery = nullptr;
527 auto ret = query.QueryInterface(rdbQuery);
528 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
529 ZLOGE("not RdbQuery!");
530 return { GeneralError::E_INVALID_ARGS, nullptr };
531 }
532 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
533 if (delegate_ == nullptr) {
534 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
535 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
536 return { GeneralError::E_ALREADY_CLOSED, nullptr };
537 }
538 if (rdbQuery->IsRemoteQuery()) {
539 if (rdbQuery->GetDevices().size() != 1) {
540 ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
541 return { GeneralError::E_ERROR, nullptr };
542 }
543 auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
544 return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
545 }
546 return { GeneralError::E_ERROR, nullptr };
547 }
548
MergeMigratedData(const std::string & tableName,VBuckets && values)549 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
550 {
551 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
552 if (delegate_ == nullptr) {
553 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
554 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
555 return GeneralError::E_ERROR;
556 }
557
558 auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
559 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
560 }
561
CleanTrackerData(const std::string & tableName,int64_t cursor)562 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
563 {
564 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
565 if (delegate_ == nullptr) {
566 ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
567 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
568 return GeneralError::E_ERROR;
569 }
570
571 auto status = delegate_->CleanTrackerData(tableName, cursor);
572 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
573 }
574
DoCloudSync(const Devices & devices,const DistributedDB::Query & dbQuery,const SyncParam & syncParam,bool isPriority,DetailAsync async)575 std::pair<int32_t, int32_t> RdbGeneralStore::DoCloudSync(const Devices &devices, const DistributedDB::Query &dbQuery,
576 const SyncParam &syncParam, bool isPriority, DetailAsync async)
577 {
578 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
579 auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
580 SyncId syncId = ++syncTaskId_;
581 auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
582 if (executor_ != nullptr && tasks_ != nullptr) {
583 auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
584 tasks_->Insert(syncId, { id, callback });
585 }
586 CloudSyncOption option;
587 option.devices = devices;
588 option.mode = DistributedDB::SyncMode(syncMode);
589 option.query = dbQuery;
590 option.waitTime = syncParam.wait;
591 option.priorityTask = isPriority || highMode == MANUAL_SYNC_MODE;
592 option.priorityLevel = GetPriorityLevel(highMode);
593 option.compensatedSyncOnly = syncParam.isCompensation;
594 option.merge = highMode == AUTO_SYNC_MODE;
595 option.lockAction = LOCK_ACTION;
596 option.prepareTraceId = syncParam.prepareTraceId;
597 option.asyncDownloadAssets = syncParam.asyncDownloadAsset;
598 auto dbStatus = DistributedDB::INVALID_ARGS;
599 dbStatus = delegate_->Sync(option, tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
600 if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
601 return { ConvertStatus(dbStatus), dbStatus };
602 }
603 Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
604 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
605 tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
606 if (executor != nullptr) {
607 executor->Remove(task.taskId);
608 }
609 return false;
610 });
611 return { ConvertStatus(dbStatus), dbStatus };
612 }
613
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)614 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
615 const SyncParam &syncParam)
616 {
617 DistributedDB::Query dbQuery;
618 RdbQuery *rdbQuery = nullptr;
619 bool isPriority = false;
620 auto ret = query.QueryInterface(rdbQuery);
621 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
622 dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
623 } else {
624 dbQuery = rdbQuery->GetQuery();
625 isPriority = rdbQuery->IsPriority();
626 }
627 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
628 auto dbMode = DistributedDB::SyncMode(syncMode);
629 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
630 if (delegate_ == nullptr) {
631 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
632 "wait:%{public}d", devices.size(),
633 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
634 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
635 }
636 auto dbStatus = DistributedDB::INVALID_ARGS;
637 if (syncMode < NEARBY_END) {
638 dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
639 } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
640 return DoCloudSync(devices, dbQuery, syncParam, isPriority, async);
641 }
642 return { ConvertStatus(dbStatus), dbStatus };
643 }
644
PreSharing(GenQuery & query)645 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
646 {
647 RdbQuery *rdbQuery = nullptr;
648 auto ret = query.QueryInterface(rdbQuery);
649 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
650 ZLOGE("not RdbQuery!");
651 return { GeneralError::E_INVALID_ARGS, nullptr };
652 }
653 auto tables = rdbQuery->GetTables();
654 auto statement = rdbQuery->GetStatement();
655 if (statement.empty() || tables.empty()) {
656 ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
657 return { GeneralError::E_INVALID_ARGS, nullptr };
658 }
659 std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
660 VBuckets values;
661 {
662 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
663 if (delegate_ == nullptr) {
664 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
665 return { GeneralError::E_ALREADY_CLOSED, nullptr };
666 }
667 auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
668 values = std::move(ret);
669 }
670 auto rdbCloud = GetRdbCloud();
671 if (rdbCloud == nullptr || values.empty()) {
672 ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
673 values.size());
674 return { GeneralError::E_CLOUD_DISABLED, nullptr };
675 }
676 VBuckets extends = ExtractExtend(values);
677 rdbCloud->PreSharing(*tables.begin(), extends);
678 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
679 ++value, ++extend) {
680 value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
681 value->erase(CLOUD_GID);
682 }
683 return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
684 }
685
ExtractExtend(VBuckets & values) const686 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
687 {
688 VBuckets extends(values.size());
689 for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
690 ++value, ++extend) {
691 auto it = value->find(CLOUD_GID);
692 if (it == value->end()) {
693 continue;
694 }
695 auto gid = std::get_if<std::string>(&(it->second));
696 if (gid == nullptr || gid->empty()) {
697 continue;
698 }
699 extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
700 }
701 return extends;
702 }
703
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const704 std::string RdbGeneralStore::BuildSql(
705 const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
706 {
707 std::string sql = "select ";
708 sql.append(CLOUD_GID);
709 std::string sqlNode = "select rowid";
710 for (auto &column : columns) {
711 sql.append(", ").append(column);
712 sqlNode.append(", ").append(column);
713 }
714 sqlNode.append(" from ").append(table).append(statement);
715 auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
716 sql.append(" from ").append(logTable).append(", (").append(sqlNode);
717 sql.append(") where ").append(DATE_KEY).append(" = rowid");
718 return sql;
719 }
720
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)721 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
722 {
723 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
724 return GeneralError::E_INVALID_ARGS;
725 }
726 DBStatus status = DistributedDB::DB_ERROR;
727 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
728 if (delegate_ == nullptr) {
729 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
730 "tableName:%{public}s",
731 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
732 Anonymous::Change(tableName).c_str());
733 return GeneralError::E_ALREADY_CLOSED;
734 }
735 switch (mode) {
736 case CLOUD_INFO:
737 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
738 if (status == DistributedDB::OK) {
739 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
740 break;
741 }
742 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
743 break;
744 case CLOUD_DATA:
745 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
746 if (status == DistributedDB::OK) {
747 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
748 break;
749 }
750 (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
751 break;
752 case NEARBY_DATA:
753 if (devices.empty()) {
754 status = delegate_->RemoveDeviceData();
755 break;
756 }
757 for (auto device : devices) {
758 status = delegate_->RemoveDeviceData(device, tableName);
759 }
760 break;
761 default:
762 return GeneralError::E_ERROR;
763 }
764 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
765 }
766
Watch(int32_t origin,Watcher & watcher)767 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
768 {
769 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
770 return GeneralError::E_INVALID_ARGS;
771 }
772
773 observer_.watcher_ = &watcher;
774 return GeneralError::E_OK;
775 }
776
Unwatch(int32_t origin,Watcher & watcher)777 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
778 {
779 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
780 return GeneralError::E_INVALID_ARGS;
781 }
782
783 observer_.watcher_ = nullptr;
784 return GeneralError::E_OK;
785 }
786
GetDBBriefCB(DetailAsync async)787 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
788 {
789 if (!async) {
790 return [](auto &) {};
791 }
792 return [async = std::move(async)](
793 const std::map<std::string, std::vector<TableStatus>> &result) {
794 DistributedData::GenDetails details;
795 for (auto &[key, tables] : result) {
796 auto &value = details[key];
797 value.progress = FINISHED;
798 value.code = GeneralError::E_OK;
799 for (auto &table : tables) {
800 if (table.status != DBStatus::OK) {
801 value.code = GeneralError::E_ERROR;
802 }
803 }
804 }
805 async(details);
806 };
807 }
808
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)809 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
810 uint32_t highMode)
811 {
812 std::shared_lock<std::shared_mutex> lock(asyncMutex_);
813 return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
814 rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
815 DistributedData::GenDetails details;
816 for (auto &[id, process] : processes) {
817 bool isDownload = false;
818 auto &detail = details[id];
819 detail.progress = process.process;
820 detail.code = ConvertStatus(process.errCode);
821 detail.dbCode = process.errCode;
822 uint32_t totalCount = 0;
823 for (auto [key, value] : process.tableProcess) {
824 auto &table = detail.details[key];
825 table.upload.total = value.upLoadInfo.total;
826 table.upload.success = value.upLoadInfo.successCount;
827 table.upload.failed = value.upLoadInfo.failCount;
828 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
829 totalCount += table.upload.total;
830 isDownload = table.download.total > 0;
831 table.download.total = value.downLoadInfo.total;
832 table.download.success = value.downLoadInfo.successCount;
833 table.download.failed = value.downLoadInfo.failCount;
834 table.download.untreated = table.download.total - table.download.success - table.download.failed;
835 detail.changeCount = (process.process == FINISHED)
836 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
837 value.downLoadInfo.deleteCount
838 : 0;
839 totalCount += table.download.total;
840 }
841 if (process.process == FINISHED) {
842 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
843 } else {
844 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
845 }
846
847 if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
848 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
849 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
850 }
851 }
852 if (async) {
853 async(details);
854 }
855
856 if (highMode == AUTO_SYNC_MODE && autoAsync
857 && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
858 autoAsync(details);
859 }
860 };
861 }
862
Release()863 int32_t RdbGeneralStore::Release()
864 {
865 auto ref = 1;
866 {
867 std::lock_guard<decltype(mutex_)> lock(mutex_);
868 if (ref_ == 0) {
869 return 0;
870 }
871 ref = --ref_;
872 }
873 ZLOGD("ref:%{public}d", ref);
874 if (ref == 0) {
875 delete this;
876 }
877 return ref;
878 }
879
AddRef()880 int32_t RdbGeneralStore::AddRef()
881 {
882 std::lock_guard<decltype(mutex_)> lock(mutex_);
883 if (ref_ == 0) {
884 return 0;
885 }
886 return ++ref_;
887 }
888
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)889 void RdbGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
890 {
891 ArkDataFaultMsg msg = { .faultType = faultType,
892 .bundleName = storeInfo_.bundleName,
893 .moduleName = ModuleName::RDB_STORE,
894 .storeName = storeInfo_.storeName,
895 .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
896 .appendixMsg = appendix };
897 Reporter::GetInstance()->CloudSyncFault()->Report(msg);
898 }
899
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)900 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
901 const std::vector<Reference> &references)
902 {
903 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
904 if (delegate_ == nullptr) {
905 ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
906 Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
907 return GeneralError::E_ALREADY_CLOSED;
908 }
909 for (const auto &table : tables) {
910 ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
911 auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
912 if (dBStatus != DistributedDB::DBStatus::OK) {
913 ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
914 Anonymous::Change(table).c_str(), dBStatus);
915 Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
916 "SetDistributedTables ret=" + std::to_string(static_cast<int32_t>(dBStatus)));
917 return GeneralError::E_ERROR;
918 }
919 }
920 std::vector<DistributedDB::TableReferenceProperty> properties;
921 for (const auto &reference : references) {
922 properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
923 }
924 auto status = delegate_->SetReference(properties);
925 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::PROPERTY_CHANGED) {
926 ZLOGE("distributed table set reference failed, err:%{public}d", status);
927 return GeneralError::E_ERROR;
928 }
929 auto [exist, database] = GetDistributedSchema(observer_.meta_);
930 if (exist) {
931 delegate_->SetDistributedSchema(GetGaussDistributedSchema(database));
932 }
933 return GeneralError::E_OK;
934 }
935
SetConfig(const StoreConfig & storeConfig)936 void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig)
937 {
938 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
939 if (delegate_ == nullptr) {
940 ZLOGE("database already closed!, tableMode is :%{public}d",
941 storeConfig.tableMode.has_value() ? static_cast<int32_t>(storeConfig.tableMode.value()) : -1);
942 }
943 if (storeConfig.tableMode.has_value()) {
944 RelationalStoreDelegate::StoreConfig config;
945 if (storeConfig.tableMode == DistributedTableMode::COLLABORATION) {
946 config.tableMode = DistributedDB::DistributedTableMode::COLLABORATION;
947 } else if (storeConfig.tableMode == DistributedTableMode::SPLIT_BY_DEVICE) {
948 config.tableMode = DistributedDB::DistributedTableMode::SPLIT_BY_DEVICE;
949 }
950 delegate_->SetStoreConfig(config);
951 }
952 }
953
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)954 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
955 const std::set<std::string> &extendColNames, bool isForceUpgrade)
956 {
957 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
958 if (delegate_ == nullptr) {
959 ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
960 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
961 return GeneralError::E_ALREADY_CLOSED;
962 }
963 auto status = delegate_->SetTrackerTable({ tableName, extendColNames, trackerColNames, isForceUpgrade });
964 if (status == DBStatus::WITH_INVENTORY_DATA) {
965 ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
966 Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
967 return GeneralError::E_WITH_INVENTORY_DATA;
968 }
969 if (status != DBStatus::OK) {
970 ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
971 status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
972 return GeneralError::E_ERROR;
973 }
974 return GeneralError::E_OK;
975 }
976
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)977 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
978 const DistributedDB::RemoteCondition &remoteCondition)
979 {
980 std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
981 DistributedDB::DBStatus status =
982 delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
983 if (status != DistributedDB::DBStatus::OK) {
984 ZLOGE("DistributedDB remote query failed, device:%{public}s, status is %{public}d.",
985 Anonymous::Change(device).c_str(), status);
986 return nullptr;
987 }
988 return std::make_shared<RdbCursor>(dbResultSet);
989 }
990
ConvertStatus(DistributedDB::DBStatus status)991 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
992 {
993 switch (status) {
994 case DBStatus::OK:
995 return GenErr::E_OK;
996 case DBStatus::CLOUD_NETWORK_ERROR:
997 return GenErr::E_NETWORK_ERROR;
998 case DBStatus::CLOUD_LOCK_ERROR:
999 return GenErr::E_LOCKED_BY_OTHERS;
1000 case DBStatus::CLOUD_FULL_RECORDS:
1001 return GenErr::E_RECODE_LIMIT_EXCEEDED;
1002 case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
1003 return GenErr::E_NO_SPACE_FOR_ASSET;
1004 case DBStatus::BUSY:
1005 return GenErr::E_BUSY;
1006 case DBStatus::CLOUD_SYNC_TASK_MERGED:
1007 return GenErr::E_SYNC_TASK_MERGED;
1008 case DBStatus::CLOUD_DISABLED:
1009 return GeneralError::E_CLOUD_DISABLED;
1010 default:
1011 ZLOGI("status:0x%{public}x", status);
1012 break;
1013 }
1014 return GenErr::E_ERROR;
1015 }
1016
IsValid()1017 bool RdbGeneralStore::IsValid()
1018 {
1019 return delegate_ != nullptr;
1020 }
1021
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)1022 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
1023 {
1024 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1025 async_ = std::move(async);
1026 return GenErr::E_OK;
1027 }
1028
UnregisterDetailProgressObserver()1029 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
1030 {
1031 std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1032 async_ = nullptr;
1033 return GenErr::E_OK;
1034 }
1035
QuerySql(const std::string & sql,Values && args)1036 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
1037 {
1038 std::vector<DistributedDB::VBucket> changedData;
1039 std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
1040 auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
1041 if (status != DBStatus::OK) {
1042 ZLOGE("Query failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
1043 Anonymous::Change(sql).c_str(), changedData.size());
1044 if (status == DBStatus::BUSY) {
1045 return { GenErr::E_BUSY, {} };
1046 }
1047 return { GenErr::E_ERROR, {} };
1048 }
1049 return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
1050 }
1051
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)1052 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
1053 uint32_t syncCount)
1054 {
1055 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1056 if (requiredFlag != (requiredFlag & flag)) {
1057 return;
1058 }
1059 StoreInfo info = storeInfo;
1060 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
1061 traceId, syncCount);
1062 EventCenter::GetInstance().PostEvent(std::move(evt));
1063 }
1064
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)1065 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
1066 {
1067 uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1068 if (requiredFlag != (requiredFlag & flag)) {
1069 return;
1070 }
1071 StoreInfo info = storeInfo;
1072 auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
1073 traceId);
1074 EventCenter::GetInstance().PostEvent(std::move(evt));
1075 }
1076
GetTables()1077 std::set<std::string> RdbGeneralStore::GetTables()
1078 {
1079 std::set<std::string> tables;
1080 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1081 if (delegate_ == nullptr) {
1082 ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1083 return tables;
1084 }
1085 auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
1086 if (errCode != GenErr::E_OK) {
1087 return tables;
1088 }
1089 for (auto &table : res) {
1090 auto it = table.find("name");
1091 if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
1092 ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1093 continue;
1094 }
1095 tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
1096 }
1097 return tables;
1098 }
1099
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)1100 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
1101 const std::set<std::string> &localTables)
1102 {
1103 std::vector<std::string> res;
1104 for (auto &it : syncTables) {
1105 if (localTables.count(it) &&
1106 localTables.count(RelationalStoreManager::GetDistributedLogTableName(it))) {
1107 res.push_back(std::move(it));
1108 }
1109 }
1110 return res;
1111 }
1112
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)1113 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1114 const std::vector<std::string> &tables, ChangeType type)
1115 {
1116 RemoteChangeEvent::DataInfo info;
1117 info.userId = meta.user;
1118 info.storeId = meta.storeId;
1119 info.deviceId = meta.deviceId;
1120 info.bundleName = meta.bundleName;
1121 info.tables = tables;
1122 info.changeType = type;
1123 auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1124 EventCenter::GetInstance().PostEvent(std::move(evt));
1125 }
1126
OnChange(const DBChangedIF & data)1127 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1128 {
1129 if (!HasWatcher()) {
1130 return;
1131 }
1132 std::string device = data.GetDataChangeDevice();
1133 auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1134 ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1135 Anonymous::Change(device).c_str());
1136 GenOrigin genOrigin;
1137 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1138 genOrigin.dataType = GenOrigin::BASIC_DATA;
1139 DistributedDB::StoreProperty property;
1140 data.GetStoreProperty(property);
1141 genOrigin.id.push_back(networkId);
1142 genOrigin.store = storeId_;
1143 GeneralWatcher::ChangeInfo changeInfo{};
1144 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1145 return;
1146 }
1147
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1148 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1149 {
1150 if (!HasWatcher()) {
1151 return;
1152 }
1153 ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1154 Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1155 GenOrigin genOrigin;
1156 genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1157 ? GenOrigin::ORIGIN_LOCAL
1158 : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1159 genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1160 genOrigin.id.push_back(originalId);
1161 genOrigin.store = storeId_;
1162 Watcher::PRIFields fields;
1163 Watcher::ChangeInfo changeInfo;
1164 bool notifyFlag = false;
1165 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1166 auto &info = changeInfo[data.tableName][i];
1167 for (auto &priData : data.primaryData[i]) {
1168 Watcher::PRIValue value;
1169 if (priData.empty()) {
1170 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
1171 Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1172 Anonymous::Change(originalId).c_str(), i);
1173 continue;
1174 }
1175 Convert(std::move(*(priData.begin())), value);
1176 if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1177 info.push_back(std::move(value));
1178 continue;
1179 }
1180 auto deleteKey = std::get_if<std::string>(&value);
1181 if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1182 // notify to start app
1183 notifyFlag = true;
1184 }
1185 info.push_back(std::move(value));
1186 }
1187 }
1188 if (notifyFlag) {
1189 ZLOGI("post data change for cleaning cloud data");
1190 PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1191 }
1192 if (!data.field.empty()) {
1193 fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1194 }
1195 watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1196 }
1197
LockCloudDB()1198 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1199 {
1200 auto rdbCloud = GetRdbCloud();
1201 if (rdbCloud == nullptr) {
1202 return { GeneralError::E_ERROR, 0 };
1203 }
1204 return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1205 }
1206
UnLockCloudDB()1207 int32_t RdbGeneralStore::UnLockCloudDB()
1208 {
1209 auto rdbCloud = GetRdbCloud();
1210 if (rdbCloud == nullptr) {
1211 return GeneralError::E_ERROR;
1212 }
1213 return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1214 }
1215
GetRdbCloud() const1216 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1217 {
1218 std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1219 return rdbCloud_;
1220 }
1221
IsFinished(SyncId syncId) const1222 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1223 {
1224 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1225 if (delegate_ == nullptr) {
1226 ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1227 return true;
1228 }
1229 return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1230 }
1231
GetFinishTask(SyncId syncId)1232 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1233 {
1234 return [this, executor = executor_, task = tasks_, syncId]() {
1235 auto [exist, finishTask] = task->Find(syncId);
1236 if (!exist || finishTask.cb == nullptr) {
1237 task->Erase(syncId);
1238 return;
1239 }
1240 if (!IsFinished(syncId)) {
1241 task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1242 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1243 return true;
1244 });
1245 return;
1246 }
1247 DBProcessCB cb;
1248 task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1249 cb = task.cb;
1250 return false;
1251 });
1252 if (cb != nullptr) {
1253 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1254 Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1255 std::map<std::string, SyncProcess> result;
1256 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1257 cb(result);
1258 }
1259 };
1260 }
1261
SetExecutor(std::shared_ptr<Executor> executor)1262 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1263 {
1264 if (executor_ == nullptr) {
1265 executor_ = executor;
1266 }
1267 }
1268
RemoveTasks()1269 void RdbGeneralStore::RemoveTasks()
1270 {
1271 if (tasks_ == nullptr) {
1272 return;
1273 }
1274 std::list<DBProcessCB> cbs;
1275 std::list<TaskId> taskIds;
1276 tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1277 if (task.cb != nullptr) {
1278 ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1279 syncId);
1280 }
1281 cbs.push_back(std::move(task.cb));
1282 taskIds.push_back(task.taskId);
1283 return true;
1284 });
1285 auto func = [](const std::list<DBProcessCB> &cbs) {
1286 std::map<std::string, SyncProcess> result;
1287 result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1288 for (auto &cb : cbs) {
1289 if (cb != nullptr) {
1290 cb(result);
1291 }
1292 }
1293 };
1294 if (executor_ != nullptr) {
1295 for (auto taskId: taskIds) {
1296 executor_->Remove(taskId, true);
1297 }
1298 executor_->Execute([cbs, func]() {
1299 func(cbs);
1300 });
1301 } else {
1302 func(cbs);
1303 }
1304 }
1305
GetCB(SyncId syncId)1306 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1307 {
1308 return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1309 if (task == nullptr) {
1310 return;
1311 }
1312 DBProcessCB cb;
1313 task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1314 cb = finishTask.cb;
1315 bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1316 if (isFinished) {
1317 finishTask.cb = nullptr;
1318 }
1319 return true;
1320 });
1321 if (cb != nullptr) {
1322 cb(progress);
1323 }
1324 return;
1325 };
1326 }
1327 } // namespace OHOS::DistributedRdb