• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbGeneralStore"
16 #include "rdb_general_store.h"
17 #include "cloud_service.h"
18 #include "cloud/asset_loader.h"
19 #include "cloud/cloud_db.h"
20 #include "cloud/schema_meta.h"
21 #include "crypto_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "rdb_cursor.h"
26 #include "rdb_helper.h"
27 #include "rdb_query.h"
28 #include "relational_store_manager.h"
29 #include "utils/anonymous.h"
30 #include "value_proxy.h"
31 #include "device_manager_adapter.h"
32 #include "rdb_result_set_impl.h"
33 namespace OHOS::DistributedRdb {
34 using namespace DistributedData;
35 using namespace DistributedDB;
36 using namespace NativeRdb;
37 using namespace  CloudData;
38 using DBField = DistributedDB::Field;
39 using DBTable = DistributedDB::TableSchema;
40 using DBSchema = DistributedDB::DataBaseSchema;
41 using ClearMode = DistributedDB::ClearMode;
42 using DBStatus = DistributedDB::DBStatus;
43 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
44 
RdbGeneralStore(const StoreMetaData & meta)45 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta) : manager_(meta.appId, meta.user, meta.instanceId)
46 {
47     observer_.storeId_ = meta.storeId;
48     RelationalStoreDelegate::Option option;
49     if (meta.isEncrypt) {
50         std::string key = meta.GetSecretKey();
51         SecretKeyMetaData secretKeyMeta;
52         MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
53         std::vector<uint8_t> decryptKey;
54         CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
55         if (option.passwd.SetValue(decryptKey.data(), decryptKey.size()) != CipherPassword::OK) {
56             std::fill(decryptKey.begin(), decryptKey.end(), 0);
57         }
58         std::fill(decryptKey.begin(), decryptKey.end(), 0);
59         option.isEncryptedDb = meta.isEncrypt;
60         option.iterateTimes = ITERATE_TIMES;
61         option.cipher = CipherType::AES_256_GCM;
62     }
63     option.observer = &observer_;
64     manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
65 }
66 
~RdbGeneralStore()67 RdbGeneralStore::~RdbGeneralStore()
68 {
69     manager_.CloseStore(delegate_);
70     delegate_ = nullptr;
71     bindInfo_.loader_ = nullptr;
72     if (bindInfo_.db_ != nullptr) {
73         bindInfo_.db_->Close();
74     }
75     bindInfo_.db_ = nullptr;
76     rdbCloud_ = nullptr;
77     rdbLoader_ = nullptr;
78 }
79 
Bind(const Database & database,BindInfo bindInfo)80 int32_t RdbGeneralStore::Bind(const Database &database, BindInfo bindInfo)
81 {
82     if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
83         return GeneralError::E_INVALID_ARGS;
84     }
85 
86     if (isBound_.exchange(true)) {
87         return GeneralError::E_OK;
88     }
89 
90     bindInfo_ = std::move(bindInfo);
91     rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_);
92     rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_);
93     DBSchema schema;
94     schema.tables.resize(database.tables.size());
95     for (size_t i = 0; i < database.tables.size(); i++) {
96         const Table &table = database.tables[i];
97         DBTable &dbTable = schema.tables[i];
98         dbTable.name = table.name;
99         for (auto &field : table.fields) {
100             DBField dbField;
101             dbField.colName = field.colName;
102             dbField.type = field.type;
103             dbField.primary = field.primary;
104             dbField.nullable = field.nullable;
105             dbTable.fields.push_back(std::move(dbField));
106         }
107     }
108     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
109     if (delegate_ == nullptr) {
110         ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
111         return GeneralError::E_ALREADY_CLOSED;
112     }
113     delegate_->SetCloudDB(rdbCloud_);
114     delegate_->SetIAssetLoader(rdbLoader_);
115     delegate_->SetCloudDbSchema(std::move(schema));
116     return GeneralError::E_OK;
117 }
118 
IsBound()119 bool RdbGeneralStore::IsBound()
120 {
121     return isBound_;
122 }
123 
Close()124 int32_t RdbGeneralStore::Close()
125 {
126     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
127     if (delegate_ == nullptr) {
128         return 0;
129     }
130     int32_t count = delegate_->GetCloudSyncTaskCount();
131     if (count > 0) {
132         return GeneralError::E_BUSY;
133     }
134     auto status = manager_.CloseStore(delegate_);
135     if (status != DBStatus::OK) {
136         return status;
137     }
138     delegate_ = nullptr;
139     bindInfo_.loader_ = nullptr;
140     if (bindInfo_.db_ != nullptr) {
141         bindInfo_.db_->Close();
142     }
143     bindInfo_.db_ = nullptr;
144     rdbCloud_ = nullptr;
145     rdbLoader_ = nullptr;
146     return 0;
147 }
148 
Execute(const std::string & table,const std::string & sql)149 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
150 {
151     return GeneralError::E_OK;
152 }
153 
BatchInsert(const std::string & table,VBuckets && values)154 int32_t RdbGeneralStore::BatchInsert(const std::string &table, VBuckets &&values)
155 {
156     return 0;
157 }
158 
BatchUpdate(const std::string & table,const std::string & sql,VBuckets && values)159 int32_t RdbGeneralStore::BatchUpdate(const std::string &table, const std::string &sql, VBuckets &&values)
160 {
161     return 0;
162 }
163 
Delete(const std::string & table,const std::string & sql,Values && args)164 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
165 {
166     return 0;
167 }
168 
Query(const std::string & table,const std::string & sql,Values && args)169 std::shared_ptr<Cursor> RdbGeneralStore::Query(const std::string &table, const std::string &sql, Values &&args)
170 {
171     return std::shared_ptr<Cursor>();
172 }
173 
Query(const std::string & table,GenQuery & query)174 std::shared_ptr<Cursor> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
175 {
176     RdbQuery *rdbQuery = nullptr;
177     auto ret = query.QueryInterface(rdbQuery);
178     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
179         ZLOGE("not RdbQuery!");
180         return nullptr;
181     }
182     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
183     if (delegate_ == nullptr) {
184         ZLOGE("database already closed! tables name:%{public}s", Anonymous::Change(table).c_str());
185         return nullptr;
186     }
187     if (rdbQuery->IsRemoteQuery()) {
188         if (rdbQuery->GetDevices().size() != 1) {
189             ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
190             return nullptr;
191         }
192         return RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
193     }
194     return nullptr;
195 }
196 
Sync(const Devices & devices,int32_t mode,GenQuery & query,DetailAsync async,int32_t wait)197 int32_t RdbGeneralStore::Sync(const Devices &devices, int32_t mode, GenQuery &query, DetailAsync async, int32_t wait)
198 {
199     DistributedDB::Query dbQuery;
200     RdbQuery *rdbQuery = nullptr;
201     auto ret = query.QueryInterface(rdbQuery);
202     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
203         dbQuery.FromTable(query.GetTables());
204     } else {
205         dbQuery = rdbQuery->GetQuery();
206     }
207     auto dbMode = DistributedDB::SyncMode(mode);
208     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
209     if (delegate_ == nullptr) {
210         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
211               "wait:%{public}d",
212             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode, wait);
213         return GeneralError::E_ALREADY_CLOSED;
214     }
215     auto status = (mode < NEARBY_END)
216                   ? delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), wait != 0)
217                   : (mode > NEARBY_END && mode < CLOUD_END)
218                   ? delegate_->Sync(devices, dbMode, dbQuery, GetDBProcessCB(std::move(async)), wait)
219                   : DistributedDB::INVALID_ARGS;
220     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
221 }
222 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)223 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
224 {
225     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
226         return GeneralError::E_INVALID_ARGS;
227     }
228     DBStatus status;
229     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
230     if (delegate_ == nullptr) {
231         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
232               "tableName:%{public}s",
233             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
234             Anonymous::Change(tableName).c_str());
235         return GeneralError::E_ALREADY_CLOSED;
236     }
237     switch (mode) {
238         case CLOUD_INFO:
239             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
240             break;
241         case CLOUD_DATA:
242             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
243             break;
244         case NEARBY_DATA:
245             if (devices.empty()) {
246                 status = delegate_->RemoveDeviceData();
247                 break;
248             }
249             for (auto device : devices) {
250                 status = delegate_->RemoveDeviceData(device, tableName);
251             }
252             break;
253         default:
254             return GeneralError::E_ERROR;
255     }
256     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
257 }
258 
Watch(int32_t origin,Watcher & watcher)259 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
260 {
261     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
262         return GeneralError::E_INVALID_ARGS;
263     }
264 
265     observer_.watcher_ = &watcher;
266     return GeneralError::E_OK;
267 }
268 
Unwatch(int32_t origin,Watcher & watcher)269 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
270 {
271     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
272         return GeneralError::E_INVALID_ARGS;
273     }
274 
275     observer_.watcher_ = nullptr;
276     return GeneralError::E_OK;
277 }
278 
GetDBBriefCB(DetailAsync async)279 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
280 {
281     if (!async) {
282         return [](auto &) {};
283     }
284     return [async = std::move(async)](const std::map<std::string, std::vector<TableStatus>> &result) {
285         DistributedData::GenDetails details;
286         for (auto &[key, tables] : result) {
287             auto &value = details[key];
288             value.progress = FINISHED;
289             value.code = GeneralError::E_OK;
290             for (auto &table : tables) {
291                 if (table.status != DBStatus::OK) {
292                     value.code = GeneralError::E_ERROR;
293                 }
294             }
295         }
296         async(details);
297     };
298 }
299 
GetDBProcessCB(DetailAsync async)300 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async)
301 {
302     if (!async) {
303         return [](auto &) {};
304     }
305 
306     return [async = std::move(async)](const std::map<std::string, SyncProcess> &processes) {
307         DistributedData::GenDetails details;
308         for (auto &[id, process] : processes) {
309             auto &detail = details[id];
310             detail.progress = process.process;
311             detail.code = ConvertStatus(process.errCode);
312             for (auto [key, value] : process.tableProcess) {
313                 auto &table = detail.details[key];
314                 table.upload.total = value.upLoadInfo.total;
315                 table.upload.success = value.upLoadInfo.successCount;
316                 table.upload.failed = value.upLoadInfo.failCount;
317                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
318                 table.download.total = value.downLoadInfo.total;
319                 table.download.success = value.downLoadInfo.successCount;
320                 table.download.failed = value.downLoadInfo.failCount;
321                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
322             }
323         }
324         async(details);
325     };
326 }
327 
Release()328 int32_t RdbGeneralStore::Release()
329 {
330     auto ref = 1;
331     {
332         std::lock_guard<decltype(mutex_)> lock(mutex_);
333         if (ref_ == 0) {
334             return 0;
335         }
336         ref = --ref_;
337     }
338     ZLOGD("ref:%{public}d", ref);
339     if (ref == 0) {
340         delete this;
341     }
342     return ref;
343 }
344 
AddRef()345 int32_t RdbGeneralStore::AddRef()
346 {
347     std::lock_guard<decltype(mutex_)> lock(mutex_);
348     if (ref_ == 0) {
349         return 0;
350     }
351     return ++ref_;
352 }
353 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type)354 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type)
355 {
356     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
357     if (delegate_ == nullptr) {
358         ZLOGE("database already closed! tables size:%{public}zu, type:%{public}d", tables.size(), type);
359         return GeneralError::E_ALREADY_CLOSED;
360     }
361     for (const auto &table : tables) {
362         ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
363         auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
364         if (dBStatus != DistributedDB::DBStatus::OK) {
365             ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
366                 Anonymous::Change(table).c_str(), dBStatus);
367             return GeneralError::E_ERROR;
368         }
369     }
370     return GeneralError::E_OK;
371 }
372 
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)373 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
374     const DistributedDB::RemoteCondition &remoteCondition)
375 {
376     std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
377     DistributedDB::DBStatus status =
378         delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
379     if (status != DistributedDB::DBStatus::OK) {
380         ZLOGE("DistributedDB remote query failed, device:%{public}s, status is  %{public}d.",
381             Anonymous::Change(device).c_str(), status);
382         return nullptr;
383     }
384     return std::make_shared<RdbCursor>(dbResultSet);
385 }
386 
ConvertStatus(DistributedDB::DBStatus status)387 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
388 {
389     switch (status) {
390         case DBStatus::OK:
391             return GenErr::E_OK;
392         case DBStatus::CLOUD_NETWORK_ERROR:
393             return GenErr::E_NETWORK_ERROR;
394         case DBStatus::CLOUD_LOCK_ERROR:
395             return GenErr::E_LOCKED_BY_OTHERS;
396         case DBStatus::CLOUD_FULL_RECORDS:
397             return GenErr::E_RECODE_LIMIT_EXCEEDED;
398         case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
399             return GenErr::E_NO_SPACE_FOR_ASSET;
400         default:
401             ZLOGI("status:0x%{public}x", status);
402             break;
403     }
404     return GenErr::E_ERROR;
405 }
406 
OnChange(const DBChangedIF & data)407 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
408 {
409     if (!HasWatcher()) {
410         return;
411     }
412     std::string device = data.GetDataChangeDevice();
413     auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
414     ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
415         Anonymous::Change(device).c_str());
416     GenOrigin genOrigin;
417     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
418     genOrigin.dataType = GenOrigin::BASIC_DATA;
419     DistributedDB::StoreProperty property;
420     data.GetStoreProperty(property);
421     genOrigin.id.push_back(networkId);
422     genOrigin.store = storeId_;
423     watcher_->OnChange(genOrigin, {}, {});
424     return;
425 }
426 
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)427 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
428 {
429     if (!HasWatcher()) {
430         return;
431     }
432     ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
433         Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
434     GenOrigin genOrigin;
435     genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)   ? GenOrigin::ORIGIN_LOCAL
436                        : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD
437                                                             : GenOrigin::ORIGIN_NEARBY;
438     genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
439     genOrigin.id.push_back(originalId);
440     genOrigin.store = storeId_;
441     Watcher::PRIFields fields;
442     Watcher::ChangeInfo changeInfo;
443     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
444         auto &info = changeInfo[data.tableName][i];
445         for (auto &priData : data.primaryData[i]) {
446             Watcher::PRIValue value;
447             Convert(std::move(*(priData.begin())), value);
448             info.push_back(std::move(value));
449         }
450     }
451     if (!data.field.empty()) {
452         fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
453     }
454     watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
455 }
456 } // namespace OHOS::DistributedRdb
457