• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "flat_object_storage_engine.h"
16 
17 #include "logger.h"
18 #include "objectstore_errors.h"
19 #include "process_communicator_impl.h"
20 #include "securec.h"
21 #include "softbus_adapter.h"
22 #include "string_utils.h"
23 #include "types_export.h"
24 
25 namespace OHOS::ObjectStore {
~FlatObjectStorageEngine()26 FlatObjectStorageEngine::~FlatObjectStorageEngine()
27 {
28     if (!isOpened_) {
29         return;
30     }
31     storeManager_ = nullptr;
32     LOG_INFO("FlatObjectStorageEngine::~FlatObjectStorageEngine Crash! end");
33 }
34 
Open(const std::string & bundleName)35 uint32_t FlatObjectStorageEngine::Open(const std::string &bundleName)
36 {
37     if (isOpened_) {
38         LOG_INFO("FlatObjectDatabase: No need to reopen it");
39         return SUCCESS;
40     }
41     auto status = DistributedDB::KvStoreDelegateManager::SetProcessLabel("objectstoreDB", bundleName);
42     if (status != DistributedDB::DBStatus::OK) {
43         LOG_ERROR("delegate SetProcessLabel failed: %{public}d.", static_cast<int>(status));
44     }
45 
46     auto communicator = std::make_shared<ProcessCommunicatorImpl>();
47     auto commStatus = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(communicator);
48     if (commStatus != DistributedDB::DBStatus::OK) {
49         LOG_ERROR("set distributed db communicator failed.");
50     }
51     storeManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>(bundleName, "default");
52     if (storeManager_ == nullptr) {
53         LOG_ERROR("FlatObjectStorageEngine::make shared fail");
54         return ERR_NOMEM;
55     }
56 
57     DistributedDB::KvStoreConfig config;
58     config.dataDir = "/data/log";
59     storeManager_->SetKvStoreConfig(config);
60     isOpened_ = true;
61     LOG_INFO("FlatObjectDatabase::Open Succeed");
62     return SUCCESS;
63 }
64 
Close()65 uint32_t FlatObjectStorageEngine::Close()
66 {
67     if (!isOpened_) {
68         LOG_INFO("FlatObjectStorageEngine::Close has been closed!");
69         return SUCCESS;
70     }
71     std::lock_guard<std::mutex> lock(operationMutex_);
72     storeManager_ = nullptr;
73     isOpened_ = false;
74     return SUCCESS;
75 }
76 
OnComplete(const std::string & key,const std::map<std::string,DistributedDB::DBStatus> & devices,std::shared_ptr<StatusWatcher> statusWatcher)77 void FlatObjectStorageEngine::OnComplete(const std::string &key,
78     const std::map<std::string, DistributedDB::DBStatus> &devices, std::shared_ptr<StatusWatcher> statusWatcher)
79 {
80     LOG_INFO("complete");
81     if (statusWatcher != nullptr) {
82         for (auto item : devices) {
83             statusWatcher->OnChanged(key, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
84                 item.second == DistributedDB::OK ? "online" : "offline");
85         }
86     }
87 }
88 
CreateTable(const std::string & key)89 uint32_t FlatObjectStorageEngine::CreateTable(const std::string &key)
90 {
91     if (!isOpened_) {
92         return ERR_DB_NOT_INIT;
93     }
94     {
95         std::lock_guard<std::mutex> lock(operationMutex_);
96         if (delegates_.count(key) != 0) {
97             LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s already created", key.c_str());
98             return ERR_EXIST;
99         }
100     }
101     DistributedDB::KvStoreNbDelegate *kvStore = nullptr;
102     DistributedDB::DBStatus status;
103     DistributedDB::KvStoreNbDelegate::Option option = { true, true,
104         false }; // createIfNecessary, isMemoryDb, isEncryptedDb
105     LOG_INFO("start create table");
106     storeManager_->GetKvStore(key, option,
107         [&status, &kvStore](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
108             status = dbStatus;
109             kvStore = kvStoreNbDelegate;
110             LOG_INFO("create table result %{public}d", status);
111         });
112     if (status != DistributedDB::DBStatus::OK || kvStore == nullptr) {
113         LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s getkvstore fail[%{public}d]", key.c_str(), status);
114         return ERR_DB_GETKV_FAIL;
115     }
116     bool autoSync = true;
117     DistributedDB::PragmaData data = static_cast<DistributedDB::PragmaData>(&autoSync);
118     LOG_INFO("start Pragma");
119     status = kvStore->Pragma(DistributedDB::AUTO_SYNC, data);
120     if (status != DistributedDB::DBStatus::OK) {
121         LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s Pragma fail[%{public}d]", key.c_str(), status);
122         return ERR_DB_GETKV_FAIL;
123     }
124     LOG_INFO("create table %{public}s success", key.c_str());
125     {
126         std::lock_guard<std::mutex> lock(operationMutex_);
127         delegates_.insert_or_assign(key, kvStore);
128     }
129 
130     auto onComplete = [key, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
131         OnComplete(key, devices, statusWatcher_);
132     };
133     std::vector<DeviceInfo> devices = SoftBusAdapter::GetInstance()->GetDeviceList();
134     std::vector<std::string> deviceIds;
135     for (auto item : devices) {
136         deviceIds.push_back(item.deviceId);
137     }
138     SyncAllData(key, deviceIds, onComplete);
139     return SUCCESS;
140 }
141 
GetTable(const std::string & key,std::map<std::string,Value> & result)142 uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std::string, Value> &result)
143 {
144     if (!isOpened_) {
145         LOG_ERROR("not opened %{public}s", key.c_str());
146         return ERR_DB_NOT_INIT;
147     }
148     std::lock_guard<std::mutex> lock(operationMutex_);
149     if (delegates_.count(key) == 0) {
150         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
151         return ERR_DB_NOT_EXIST;
152     }
153     result.clear();
154     DistributedDB::KvStoreResultSet *resultSet = nullptr;
155     Key emptyKey;
156     LOG_INFO("start GetEntries");
157     auto delegate = delegates_.at(key);
158     DistributedDB::DBStatus status = delegate->GetEntries(emptyKey, resultSet);
159     if (status != DistributedDB::DBStatus::OK || resultSet == nullptr) {
160         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s GetEntries fail", key.c_str());
161         return ERR_DB_GET_FAIL;
162     }
163     LOG_INFO("end GetEntries");
164     while (resultSet->IsAfterLast()) {
165         DistributedDB::Entry entry;
166         status = resultSet->GetEntry(entry);
167         if (status != DistributedDB::DBStatus::OK) {
168             LOG_INFO("FlatObjectStorageEngine::GetTable GetEntry fail, errcode = %{public}d", status);
169             status = delegate->CloseResultSet(resultSet);
170             if (status != DistributedDB::DBStatus::OK) {
171                 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
172                 return ERR_RESULTSET;
173             }
174             return ERR_DB_ENTRY_FAIL;
175         }
176         result.insert_or_assign(StringUtils::BytesToStr(entry.key), entry.value);
177         resultSet->MoveToNext();
178     }
179     status = delegate->CloseResultSet(resultSet);
180     if (status != DistributedDB::DBStatus::OK) {
181         LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
182         return ERR_RESULTSET;
183     }
184     return SUCCESS;
185 }
186 
UpdateItem(const std::string & key,const std::string & itemKey,Value & value)187 uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::string &itemKey, Value &value)
188 {
189     if (!isOpened_) {
190         return ERR_DB_NOT_INIT;
191     }
192     std::lock_guard<std::mutex> lock(operationMutex_);
193     if (delegates_.count(key) == 0) {
194         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
195         return ERR_DB_NOT_EXIST;
196     }
197     auto delegate = delegates_.at(key);
198     LOG_INFO("start Put");
199     auto status = delegate->Put(StringUtils::StrToBytes(itemKey), value);
200     if (status != DistributedDB::DBStatus::OK) {
201         LOG_ERROR("%{public}s Put fail[%{public}d]", key.c_str(), status);
202         return ERR_CLOSE_STORAGE;
203     }
204     LOG_INFO("put success");
205     return SUCCESS;
206 }
207 
UpdateItems(const std::string & key,const std::map<std::string,std::vector<uint8_t>> & data)208 uint32_t FlatObjectStorageEngine::UpdateItems(
209     const std::string &key, const std::map<std::string, std::vector<uint8_t>> &data)
210 {
211     if (!isOpened_ || data.size() == 0) {
212         return ERR_DB_NOT_INIT;
213     }
214     std::lock_guard<std::mutex> lock(operationMutex_);
215     if (delegates_.count(key) == 0) {
216         LOG_INFO("FlatObjectStorageEngine::UpdateItems %{public}s not exist", key.c_str());
217         return ERR_DB_NOT_EXIST;
218     }
219 
220     std::vector<DistributedDB::Entry> entries;
221     for (auto &item : data) {
222         DistributedDB::Entry entry = { .key = StringUtils::StrToBytes(item.first), .value = item.second };
223         entries.emplace_back(entry);
224     }
225     auto delegate = delegates_.at(key);
226     LOG_INFO("start PutBatch");
227     auto status = delegate->PutBatch(entries);
228     if (status != DistributedDB::DBStatus::OK) {
229         LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
230         return ERR_CLOSE_STORAGE;
231     }
232     LOG_INFO("put success");
233     return SUCCESS;
234 }
235 
DeleteTable(const std::string & key)236 uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
237 {
238     if (!isOpened_) {
239         return ERR_DB_NOT_INIT;
240     }
241     std::lock_guard<std::mutex> lock(operationMutex_);
242     if (delegates_.count(key) == 0) {
243         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
244         return ERR_DB_NOT_EXIST;
245     }
246     LOG_INFO("start DeleteTable %{public}s", key.c_str());
247     auto status = storeManager_->CloseKvStore(delegates_.at(key));
248     if (status != DistributedDB::DBStatus::OK) {
249         LOG_ERROR(
250             "FlatObjectStorageEngine::CloseKvStore %{public}s CloseKvStore fail[%{public}d]", key.c_str(), status);
251         return ERR_CLOSE_STORAGE;
252     }
253     LOG_INFO("DeleteTable success");
254     delegates_.erase(key);
255     return SUCCESS;
256 }
257 
GetItem(const std::string & key,const std::string & itemKey,Value & value)258 uint32_t FlatObjectStorageEngine::GetItem(const std::string &key, const std::string &itemKey, Value &value)
259 {
260     if (!isOpened_) {
261         return ERR_DB_NOT_INIT;
262     }
263     std::lock_guard<std::mutex> lock(operationMutex_);
264     if (delegates_.count(key) == 0) {
265         LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s not exist", key.c_str());
266         return ERR_DB_NOT_EXIST;
267     }
268     LOG_INFO("start Get %{public}s", key.c_str());
269     DistributedDB::DBStatus status = delegates_.at(key)->Get(StringUtils::StrToBytes(itemKey), value);
270     if (status != DistributedDB::DBStatus::OK) {
271         LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s item fail %{public}d", itemKey.c_str(), status);
272         return status;
273     }
274     LOG_INFO("end Get %{public}s", key.c_str());
275     return SUCCESS;
276 }
277 
RegisterObserver(const std::string & key,std::shared_ptr<TableWatcher> watcher)278 uint32_t FlatObjectStorageEngine::RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher)
279 {
280     if (!isOpened_) {
281         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
282         return ERR_DB_NOT_INIT;
283     }
284     std::lock_guard<std::mutex> lock(operationMutex_);
285     if (delegates_.count(key) == 0) {
286         LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
287         return ERR_DB_NOT_EXIST;
288     }
289     if (observerMap_.count(key) != 0) {
290         LOG_INFO("FlatObjectStorageEngine::RegisterObserver observer already exist.");
291         return SUCCESS;
292     }
293     auto delegate = delegates_.at(key);
294     std::vector<uint8_t> tmpKey;
295     LOG_INFO("start RegisterObserver %{public}s", key.c_str());
296     DistributedDB::DBStatus status =
297         delegate->RegisterObserver(tmpKey, DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, watcher.get());
298     if (status != DistributedDB::DBStatus::OK) {
299         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver watch err %{public}d", status);
300         return ERR_REGISTER;
301     }
302     LOG_INFO("end RegisterObserver %{public}s", key.c_str());
303     observerMap_.insert_or_assign(key, watcher);
304     return SUCCESS;
305 }
306 
UnRegisterObserver(const std::string & key)307 uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
308 {
309     if (!isOpened_) {
310         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
311         return ERR_DB_NOT_INIT;
312     }
313     std::lock_guard<std::mutex> lock(operationMutex_);
314     if (delegates_.count(key) == 0) {
315         LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
316         return ERR_DB_NOT_EXIST;
317     }
318     auto iter = observerMap_.find(key);
319     if (iter == observerMap_.end()) {
320         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
321         return ERR_NO_OBSERVER;
322     }
323     auto delegate = delegates_.at(key);
324     std::shared_ptr<TableWatcher> watcher = iter->second;
325     LOG_INFO("start UnRegisterObserver %{public}s", key.c_str());
326     DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
327     if (status != DistributedDB::DBStatus::OK) {
328         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
329         return ERR_UNRIGSTER;
330     }
331     LOG_INFO("end UnRegisterObserver %{public}s", key.c_str());
332     observerMap_.erase(key);
333     return SUCCESS;
334 }
335 
SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)336 uint32_t FlatObjectStorageEngine::SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)
337 {
338     if (!isOpened_) {
339         LOG_ERROR("FlatObjectStorageEngine::SetStatusNotifier kvStore has not init");
340         return ERR_DB_NOT_INIT;
341     }
342     auto databaseStatusNotifyCallback = [this](std::string userId, std::string appId, std::string storeId,
343                                             const std::string deviceId, bool onlineStatus) -> void {
344         LOG_INFO("complete");
345         if (statusWatcher_ == nullptr) {
346             LOG_INFO("FlatObjectStorageEngine::statusWatcher_ null");
347             return;
348         }
349         if (onlineStatus) {
350             auto onComplete = [this, storeId](const std::map<std::string, DistributedDB::DBStatus> &devices) {
351                 for (auto item : devices) {
352                     LOG_INFO("%{public}s pull data result %{public}d in device %{public}s", storeId.c_str(),
353                         item.second, SoftBusAdapter::GetInstance()->ToNodeID(item.first).c_str());
354                 }
355                 if (statusWatcher_ != nullptr) {
356                     for (auto item : devices) {
357                         statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
358                             item.second == DistributedDB::OK ? "online" : "offline");
359                     }
360                 }
361             };
362             SyncAllData(storeId, std::vector<std::string>({ deviceId }), onComplete);
363         } else {
364             statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(deviceId), "offline");
365         }
366     };
367     storeManager_->SetStoreStatusNotifier(databaseStatusNotifyCallback);
368     LOG_INFO("FlatObjectStorageEngine::SetStatusNotifier success");
369     statusWatcher_ = watcher;
370     return SUCCESS;
371 }
372 
SyncAllData(const std::string & sessionId,const std::vector<std::string> & deviceIds,const std::function<void (const std::map<std::string,DistributedDB::DBStatus> &)> & onComplete)373 uint32_t FlatObjectStorageEngine::SyncAllData(const std::string &sessionId, const std::vector<std::string> &deviceIds,
374     const std::function<void(const std::map<std::string, DistributedDB::DBStatus> &)> &onComplete)
375 {
376     LOG_INFO("start");
377     std::lock_guard<std::mutex> lock(operationMutex_);
378     if (delegates_.count(sessionId) == 0) {
379         LOG_ERROR("FlatObjectStorageEngine::SyncAllData %{public}s already deleted", sessionId.c_str());
380         return ERR_DB_NOT_EXIST;
381     }
382     DistributedDB::KvStoreNbDelegate *kvstore = delegates_.at(sessionId);
383     if (deviceIds.empty()) {
384         LOG_INFO("single device,no need sync");
385         return ERR_SINGLE_DEVICE;
386     }
387     LOG_INFO("start sync %{public}s", sessionId.c_str());
388     DistributedDB::DBStatus status = kvstore->Sync(deviceIds, DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY, onComplete);
389     if (status != DistributedDB::DBStatus::OK) {
390         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
391         return ERR_UNRIGSTER;
392     }
393     LOG_INFO("end sync %{public}s", sessionId.c_str());
394     return SUCCESS;
395 }
396 
GetItems(const std::string & key,std::map<std::string,std::vector<uint8_t>> & data)397 uint32_t FlatObjectStorageEngine::GetItems(const std::string &key, std::map<std::string, std::vector<uint8_t>> &data)
398 {
399     if (!isOpened_) {
400         LOG_ERROR("FlatObjectStorageEngine::GetItems %{public}s not init", key.c_str());
401         return ERR_DB_NOT_INIT;
402     }
403     std::lock_guard<std::mutex> lock(operationMutex_);
404     if (delegates_.count(key) == 0) {
405         LOG_ERROR("FlatObjectStorageEngine::GetItems %{public}s not exist", key.c_str());
406         return ERR_DB_NOT_EXIST;
407     }
408     LOG_INFO("start Get %{public}s", key.c_str());
409     std::vector<DistributedDB::Entry> entries;
410     DistributedDB::DBStatus status = delegates_.at(key)->GetEntries(StringUtils::StrToBytes(""), entries);
411     if (status != DistributedDB::DBStatus::OK) {
412         LOG_ERROR("FlatObjectStorageEngine::GetItems item fail status = %{public}d", status);
413         return status;
414     }
415     for (auto &item : entries) {
416         data[StringUtils::BytesToStr(item.key)] = item.value;
417     }
418     LOG_INFO("end Get %{public}s", key.c_str());
419     return SUCCESS;
420 }
421 
NotifyStatus(const std::string & sessionId,const std::string & deviceId,const std::string & status)422 void FlatObjectStorageEngine::NotifyStatus(const std::string &sessionId, const std::string &deviceId,
423                                            const std::string &status)
424 {
425     if (statusWatcher_ == nullptr) {
426         return;
427     }
428     statusWatcher_->OnChanged(sessionId, deviceId, status);
429 }
430 
NotifyChange(const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & changedData)431 void FlatObjectStorageEngine::NotifyChange(const std::string &sessionId,
432                                            const std::map<std::string, std::vector<uint8_t>> &changedData)
433 {
434     std::lock_guard<std::mutex> lock(operationMutex_);
435     if (observerMap_.count(sessionId) == 0) {
436         return;
437     }
438     std::vector<std::string> data {};
439     for (const auto &item : changedData) {
440         data.push_back(item.first);
441     }
442     observerMap_[sessionId]->OnChanged(sessionId, data);
443 }
444 
OnChange(const DistributedDB::KvStoreChangedData & data)445 void Watcher::OnChange(const DistributedDB::KvStoreChangedData &data)
446 {
447     std::vector<std::string> changedData;
448     std::string tmp;
449     for (DistributedDB::Entry item : data.GetEntriesInserted()) {
450         tmp = StringUtils::BytesToStr(item.key);
451         LOG_INFO("inserted %{public}s", tmp.c_str());
452         // property key start with p_, 2 is p_ size
453         if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
454             changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
455         }
456     }
457     for (DistributedDB::Entry item : data.GetEntriesUpdated()) {
458         tmp = StringUtils::BytesToStr(item.key);
459         LOG_INFO("updated %{public}s", tmp.c_str());
460         // property key start with p_, 2 is p_ size
461         if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
462             changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
463         }
464     }
465     this->OnChanged(sessionId_, changedData);
466 }
467 
Watcher(const std::string & sessionId)468 Watcher::Watcher(const std::string &sessionId) : sessionId_(sessionId)
469 {
470 }
471 } // namespace OHOS::ObjectStore
472