1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbGeneralStore"
16 #include "rdb_general_store.h"
17 #include "cloud_service.h"
18 #include "cloud/asset_loader.h"
19 #include "cloud/cloud_db.h"
20 #include "cloud/schema_meta.h"
21 #include "crypto_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "rdb_cursor.h"
26 #include "rdb_helper.h"
27 #include "rdb_query.h"
28 #include "relational_store_manager.h"
29 #include "utils/anonymous.h"
30 #include "value_proxy.h"
31 #include "device_manager_adapter.h"
32 #include "rdb_result_set_impl.h"
33 namespace OHOS::DistributedRdb {
34 using namespace DistributedData;
35 using namespace DistributedDB;
36 using namespace NativeRdb;
37 using namespace CloudData;
38 using DBField = DistributedDB::Field;
39 using DBTable = DistributedDB::TableSchema;
40 using DBSchema = DistributedDB::DataBaseSchema;
41 using ClearMode = DistributedDB::ClearMode;
42 using DBStatus = DistributedDB::DBStatus;
43 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
44
RdbGeneralStore(const StoreMetaData & meta)45 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta) : manager_(meta.appId, meta.user, meta.instanceId)
46 {
47 observer_.storeId_ = meta.storeId;
48 RelationalStoreDelegate::Option option;
49 if (meta.isEncrypt) {
50 std::string key = meta.GetSecretKey();
51 SecretKeyMetaData secretKeyMeta;
52 MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
53 std::vector<uint8_t> decryptKey;
54 CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
55 if (option.passwd.SetValue(decryptKey.data(), decryptKey.size()) != CipherPassword::OK) {
56 std::fill(decryptKey.begin(), decryptKey.end(), 0);
57 }
58 std::fill(decryptKey.begin(), decryptKey.end(), 0);
59 option.isEncryptedDb = meta.isEncrypt;
60 option.iterateTimes = ITERATE_TIMES;
61 option.cipher = CipherType::AES_256_GCM;
62 }
63 option.observer = &observer_;
64 manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
65 }
66
~RdbGeneralStore()67 RdbGeneralStore::~RdbGeneralStore()
68 {
69 manager_.CloseStore(delegate_);
70 delegate_ = nullptr;
71 bindInfo_.loader_ = nullptr;
72 if (bindInfo_.db_ != nullptr) {
73 bindInfo_.db_->Close();
74 }
75 bindInfo_.db_ = nullptr;
76 rdbCloud_ = nullptr;
77 rdbLoader_ = nullptr;
78 }
79
Bind(const Database & database,BindInfo bindInfo)80 int32_t RdbGeneralStore::Bind(const Database &database, BindInfo bindInfo)
81 {
82 if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
83 return GeneralError::E_INVALID_ARGS;
84 }
85
86 if (isBound_.exchange(true)) {
87 return GeneralError::E_OK;
88 }
89
90 bindInfo_ = std::move(bindInfo);
91 rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_);
92 rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_);
93 DBSchema schema;
94 schema.tables.resize(database.tables.size());
95 for (size_t i = 0; i < database.tables.size(); i++) {
96 const Table &table = database.tables[i];
97 DBTable &dbTable = schema.tables[i];
98 dbTable.name = table.name;
99 for (auto &field : table.fields) {
100 DBField dbField;
101 dbField.colName = field.colName;
102 dbField.type = field.type;
103 dbField.primary = field.primary;
104 dbField.nullable = field.nullable;
105 dbTable.fields.push_back(std::move(dbField));
106 }
107 }
108 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
109 if (delegate_ == nullptr) {
110 ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
111 return GeneralError::E_ALREADY_CLOSED;
112 }
113 delegate_->SetCloudDB(rdbCloud_);
114 delegate_->SetIAssetLoader(rdbLoader_);
115 delegate_->SetCloudDbSchema(std::move(schema));
116 return GeneralError::E_OK;
117 }
118
IsBound()119 bool RdbGeneralStore::IsBound()
120 {
121 return isBound_;
122 }
123
Close()124 int32_t RdbGeneralStore::Close()
125 {
126 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
127 if (delegate_ == nullptr) {
128 return 0;
129 }
130 int32_t count = delegate_->GetCloudSyncTaskCount();
131 if (count > 0) {
132 return GeneralError::E_BUSY;
133 }
134 auto status = manager_.CloseStore(delegate_);
135 if (status != DBStatus::OK) {
136 return status;
137 }
138 delegate_ = nullptr;
139 bindInfo_.loader_ = nullptr;
140 if (bindInfo_.db_ != nullptr) {
141 bindInfo_.db_->Close();
142 }
143 bindInfo_.db_ = nullptr;
144 rdbCloud_ = nullptr;
145 rdbLoader_ = nullptr;
146 return 0;
147 }
148
Execute(const std::string & table,const std::string & sql)149 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
150 {
151 return GeneralError::E_OK;
152 }
153
BatchInsert(const std::string & table,VBuckets && values)154 int32_t RdbGeneralStore::BatchInsert(const std::string &table, VBuckets &&values)
155 {
156 return 0;
157 }
158
BatchUpdate(const std::string & table,const std::string & sql,VBuckets && values)159 int32_t RdbGeneralStore::BatchUpdate(const std::string &table, const std::string &sql, VBuckets &&values)
160 {
161 return 0;
162 }
163
Delete(const std::string & table,const std::string & sql,Values && args)164 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
165 {
166 return 0;
167 }
168
Query(const std::string & table,const std::string & sql,Values && args)169 std::shared_ptr<Cursor> RdbGeneralStore::Query(const std::string &table, const std::string &sql, Values &&args)
170 {
171 return std::shared_ptr<Cursor>();
172 }
173
Query(const std::string & table,GenQuery & query)174 std::shared_ptr<Cursor> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
175 {
176 RdbQuery *rdbQuery = nullptr;
177 auto ret = query.QueryInterface(rdbQuery);
178 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
179 ZLOGE("not RdbQuery!");
180 return nullptr;
181 }
182 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
183 if (delegate_ == nullptr) {
184 ZLOGE("database already closed! tables name:%{public}s", Anonymous::Change(table).c_str());
185 return nullptr;
186 }
187 if (rdbQuery->IsRemoteQuery()) {
188 if (rdbQuery->GetDevices().size() != 1) {
189 ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
190 return nullptr;
191 }
192 return RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
193 }
194 return nullptr;
195 }
196
Sync(const Devices & devices,int32_t mode,GenQuery & query,DetailAsync async,int32_t wait)197 int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &query, DetailAsync async, int32_t wait)
198 {
199 DistributedDB::Query dbQuery;
200 RdbQuery *rdbQuery = nullptr;
201 auto ret = query.QueryInterface(rdbQuery);
202 if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
203 dbQuery.FromTable(query.GetTables());
204 } else {
205 dbQuery = rdbQuery->GetQuery();
206 }
207 auto dbMode = DistributedDB::SyncMode(mode);
208 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
209 if (delegate_ == nullptr) {
210 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
211 "wait:%{public}d",
212 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode, wait);
213 return GeneralError::E_ALREADY_CLOSED;
214 }
215 auto status = (mode < NEARBY_END)
216 ? delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), wait != 0)
217 : (mode > NEARBY_END && mode < CLOUD_END)
218 ? delegate_->Sync(devices, dbMode, dbQuery, GetDBProcessCB(std::move(async)), wait)
219 : DistributedDB::INVALID_ARGS;
220 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
221 }
222
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)223 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
224 {
225 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
226 return GeneralError::E_INVALID_ARGS;
227 }
228 DBStatus status;
229 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
230 if (delegate_ == nullptr) {
231 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
232 "tableName:%{public}s",
233 devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
234 Anonymous::Change(tableName).c_str());
235 return GeneralError::E_ALREADY_CLOSED;
236 }
237 switch (mode) {
238 case CLOUD_INFO:
239 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
240 break;
241 case CLOUD_DATA:
242 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
243 break;
244 case NEARBY_DATA:
245 if (devices.empty()) {
246 status = delegate_->RemoveDeviceData();
247 break;
248 }
249 for (auto device : devices) {
250 status = delegate_->RemoveDeviceData(device, tableName);
251 }
252 break;
253 default:
254 return GeneralError::E_ERROR;
255 }
256 return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
257 }
258
Watch(int32_t origin,Watcher & watcher)259 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
260 {
261 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
262 return GeneralError::E_INVALID_ARGS;
263 }
264
265 observer_.watcher_ = &watcher;
266 return GeneralError::E_OK;
267 }
268
Unwatch(int32_t origin,Watcher & watcher)269 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
270 {
271 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
272 return GeneralError::E_INVALID_ARGS;
273 }
274
275 observer_.watcher_ = nullptr;
276 return GeneralError::E_OK;
277 }
278
GetDBBriefCB(DetailAsync async)279 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
280 {
281 if (!async) {
282 return [](auto &) {};
283 }
284 return [async = std::move(async)](const std::map<std::string, std::vector<TableStatus>> &result) {
285 DistributedData::GenDetails details;
286 for (auto &[key, tables] : result) {
287 auto &value = details[key];
288 value.progress = FINISHED;
289 value.code = GeneralError::E_OK;
290 for (auto &table : tables) {
291 if (table.status != DBStatus::OK) {
292 value.code = GeneralError::E_ERROR;
293 }
294 }
295 }
296 async(details);
297 };
298 }
299
GetDBProcessCB(DetailAsync async)300 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async)
301 {
302 if (!async) {
303 return [](auto &) {};
304 }
305
306 return [async = std::move(async)](const std::map<std::string, SyncProcess> &processes) {
307 DistributedData::GenDetails details;
308 for (auto &[id, process] : processes) {
309 auto &detail = details[id];
310 detail.progress = process.process;
311 detail.code = ConvertStatus(process.errCode);
312 for (auto [key, value] : process.tableProcess) {
313 auto &table = detail.details[key];
314 table.upload.total = value.upLoadInfo.total;
315 table.upload.success = value.upLoadInfo.successCount;
316 table.upload.failed = value.upLoadInfo.failCount;
317 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
318 table.download.total = value.downLoadInfo.total;
319 table.download.success = value.downLoadInfo.successCount;
320 table.download.failed = value.downLoadInfo.failCount;
321 table.download.untreated = table.download.total - table.download.success - table.download.failed;
322 }
323 }
324 async(details);
325 };
326 }
327
Release()328 int32_t RdbGeneralStore::Release()
329 {
330 auto ref = 1;
331 {
332 std::lock_guard<decltype(mutex_)> lock(mutex_);
333 if (ref_ == 0) {
334 return 0;
335 }
336 ref = --ref_;
337 }
338 ZLOGD("ref:%{public}d", ref);
339 if (ref == 0) {
340 delete this;
341 }
342 return ref;
343 }
344
AddRef()345 int32_t RdbGeneralStore::AddRef()
346 {
347 std::lock_guard<decltype(mutex_)> lock(mutex_);
348 if (ref_ == 0) {
349 return 0;
350 }
351 return ++ref_;
352 }
353
SetDistributedTables(const std::vector<std::string> & tables,int32_t type)354 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type)
355 {
356 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
357 if (delegate_ == nullptr) {
358 ZLOGE("database already closed! tables size:%{public}zu, type:%{public}d", tables.size(), type);
359 return GeneralError::E_ALREADY_CLOSED;
360 }
361 for (const auto &table : tables) {
362 ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
363 auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
364 if (dBStatus != DistributedDB::DBStatus::OK) {
365 ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
366 Anonymous::Change(table).c_str(), dBStatus);
367 return GeneralError::E_ERROR;
368 }
369 }
370 return GeneralError::E_OK;
371 }
372
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)373 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
374 const DistributedDB::RemoteCondition &remoteCondition)
375 {
376 std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
377 DistributedDB::DBStatus status =
378 delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
379 if (status != DistributedDB::DBStatus::OK) {
380 ZLOGE("DistributedDB remote query failed, device:%{public}s, status is %{public}d.",
381 Anonymous::Change(device).c_str(), status);
382 return nullptr;
383 }
384 return std::make_shared<RdbCursor>(dbResultSet);
385 }
386
ConvertStatus(DistributedDB::DBStatus status)387 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
388 {
389 switch (status) {
390 case DBStatus::OK:
391 return GenErr::E_OK;
392 case DBStatus::CLOUD_NETWORK_ERROR:
393 return GenErr::E_NETWORK_ERROR;
394 case DBStatus::CLOUD_LOCK_ERROR:
395 return GenErr::E_LOCKED_BY_OTHERS;
396 case DBStatus::CLOUD_FULL_RECORDS:
397 return GenErr::E_RECODE_LIMIT_EXCEEDED;
398 case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
399 return GenErr::E_NO_SPACE_FOR_ASSET;
400 default:
401 ZLOGI("status:0x%{public}x", status);
402 break;
403 }
404 return GenErr::E_ERROR;
405 }
406
OnChange(const DBChangedIF & data)407 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
408 {
409 if (!HasWatcher()) {
410 return;
411 }
412 std::string device = data.GetDataChangeDevice();
413 auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
414 ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
415 Anonymous::Change(device).c_str());
416 GenOrigin genOrigin;
417 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
418 genOrigin.dataType = GenOrigin::BASIC_DATA;
419 DistributedDB::StoreProperty property;
420 data.GetStoreProperty(property);
421 genOrigin.id.push_back(networkId);
422 genOrigin.store = storeId_;
423 watcher_->OnChange(genOrigin, {}, {});
424 return;
425 }
426
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)427 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
428 {
429 if (!HasWatcher()) {
430 return;
431 }
432 ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
433 Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
434 GenOrigin genOrigin;
435 genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL) ? GenOrigin::ORIGIN_LOCAL
436 : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD
437 : GenOrigin::ORIGIN_NEARBY;
438 genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
439 genOrigin.id.push_back(originalId);
440 genOrigin.store = storeId_;
441 Watcher::PRIFields fields;
442 Watcher::ChangeInfo changeInfo;
443 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
444 auto &info = changeInfo[data.tableName][i];
445 for (auto &priData : data.primaryData[i]) {
446 Watcher::PRIValue value;
447 Convert(std::move(*(priData.begin())), value);
448 info.push_back(std::move(value));
449 }
450 }
451 if (!data.field.empty()) {
452 fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
453 }
454 watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
455 }
456 } // namespace OHOS::DistributedRdb
457