1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "flat_object_storage_engine.h"
16
17 #include "logger.h"
18 #include "objectstore_errors.h"
19 #include "process_communicator_impl.h"
20 #include "securec.h"
21 #include "softbus_adapter.h"
22 #include "string_utils.h"
23 #include "types_export.h"
24
25 namespace OHOS::ObjectStore {
~FlatObjectStorageEngine()26 FlatObjectStorageEngine::~FlatObjectStorageEngine()
27 {
28 if (!isOpened_) {
29 return;
30 }
31 storeManager_ = nullptr;
32 LOG_INFO("FlatObjectStorageEngine::~FlatObjectStorageEngine Crash! end");
33 }
34
Open(const std::string & bundleName)35 uint32_t FlatObjectStorageEngine::Open(const std::string &bundleName)
36 {
37 if (isOpened_) {
38 LOG_INFO("FlatObjectDatabase: No need to reopen it");
39 return SUCCESS;
40 }
41 auto status = DistributedDB::KvStoreDelegateManager::SetProcessLabel("objectstoreDB", bundleName);
42 if (status != DistributedDB::DBStatus::OK) {
43 LOG_ERROR("delegate SetProcessLabel failed: %{public}d.", static_cast<int>(status));
44 }
45
46 auto communicator = std::make_shared<ProcessCommunicatorImpl>();
47 auto commStatus = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(communicator);
48 if (commStatus != DistributedDB::DBStatus::OK) {
49 LOG_ERROR("set distributed db communicator failed.");
50 }
51 storeManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>(bundleName, "default");
52 if (storeManager_ == nullptr) {
53 LOG_ERROR("FlatObjectStorageEngine::make shared fail");
54 return ERR_NOMEM;
55 }
56
57 DistributedDB::KvStoreConfig config;
58 config.dataDir = "/data/log";
59 storeManager_->SetKvStoreConfig(config);
60 isOpened_ = true;
61 LOG_INFO("FlatObjectDatabase::Open Succeed");
62 return SUCCESS;
63 }
64
Close()65 uint32_t FlatObjectStorageEngine::Close()
66 {
67 if (!isOpened_) {
68 LOG_INFO("FlatObjectStorageEngine::Close has been closed!");
69 return SUCCESS;
70 }
71 std::lock_guard<std::mutex> lock(operationMutex_);
72 storeManager_ = nullptr;
73 isOpened_ = false;
74 return SUCCESS;
75 }
76
OnComplete(const std::string & key,const std::map<std::string,DistributedDB::DBStatus> & devices,std::shared_ptr<StatusWatcher> statusWatcher)77 void FlatObjectStorageEngine::OnComplete(const std::string &key,
78 const std::map<std::string, DistributedDB::DBStatus> &devices, std::shared_ptr<StatusWatcher> statusWatcher)
79 {
80 LOG_INFO("complete");
81 if (statusWatcher != nullptr) {
82 for (auto item : devices) {
83 statusWatcher->OnChanged(key, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
84 item.second == DistributedDB::OK ? "online" : "offline");
85 }
86 }
87 }
88
CreateTable(const std::string & key)89 uint32_t FlatObjectStorageEngine::CreateTable(const std::string &key)
90 {
91 if (!isOpened_) {
92 return ERR_DB_NOT_INIT;
93 }
94 {
95 std::lock_guard<std::mutex> lock(operationMutex_);
96 if (delegates_.count(key) != 0) {
97 LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s already created", key.c_str());
98 return ERR_EXIST;
99 }
100 }
101 DistributedDB::KvStoreNbDelegate *kvStore = nullptr;
102 DistributedDB::DBStatus status;
103 DistributedDB::KvStoreNbDelegate::Option option = { true, true,
104 false }; // createIfNecessary, isMemoryDb, isEncryptedDb
105 LOG_INFO("start create table");
106 storeManager_->GetKvStore(key, option,
107 [&status, &kvStore](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
108 status = dbStatus;
109 kvStore = kvStoreNbDelegate;
110 LOG_INFO("create table result %{public}d", status);
111 });
112 if (status != DistributedDB::DBStatus::OK || kvStore == nullptr) {
113 LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s getkvstore fail[%{public}d]", key.c_str(), status);
114 return ERR_DB_GETKV_FAIL;
115 }
116 bool autoSync = true;
117 DistributedDB::PragmaData data = static_cast<DistributedDB::PragmaData>(&autoSync);
118 LOG_INFO("start Pragma");
119 status = kvStore->Pragma(DistributedDB::AUTO_SYNC, data);
120 if (status != DistributedDB::DBStatus::OK) {
121 LOG_ERROR("FlatObjectStorageEngine::CreateTable %{public}s Pragma fail[%{public}d]", key.c_str(), status);
122 return ERR_DB_GETKV_FAIL;
123 }
124 LOG_INFO("create table %{public}s success", key.c_str());
125 {
126 std::lock_guard<std::mutex> lock(operationMutex_);
127 delegates_.insert_or_assign(key, kvStore);
128 }
129
130 auto onComplete = [key, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
131 OnComplete(key, devices, statusWatcher_);
132 };
133 std::vector<DeviceInfo> devices = SoftBusAdapter::GetInstance()->GetDeviceList();
134 std::vector<std::string> deviceIds;
135 for (auto item : devices) {
136 deviceIds.push_back(item.deviceId);
137 }
138 SyncAllData(key, deviceIds, onComplete);
139 return SUCCESS;
140 }
141
GetTable(const std::string & key,std::map<std::string,Value> & result)142 uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std::string, Value> &result)
143 {
144 if (!isOpened_) {
145 LOG_ERROR("not opened %{public}s", key.c_str());
146 return ERR_DB_NOT_INIT;
147 }
148 std::lock_guard<std::mutex> lock(operationMutex_);
149 if (delegates_.count(key) == 0) {
150 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
151 return ERR_DB_NOT_EXIST;
152 }
153 result.clear();
154 DistributedDB::KvStoreResultSet *resultSet = nullptr;
155 Key emptyKey;
156 LOG_INFO("start GetEntries");
157 auto delegate = delegates_.at(key);
158 DistributedDB::DBStatus status = delegate->GetEntries(emptyKey, resultSet);
159 if (status != DistributedDB::DBStatus::OK || resultSet == nullptr) {
160 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s GetEntries fail", key.c_str());
161 return ERR_DB_GET_FAIL;
162 }
163 LOG_INFO("end GetEntries");
164 while (resultSet->IsAfterLast()) {
165 DistributedDB::Entry entry;
166 status = resultSet->GetEntry(entry);
167 if (status != DistributedDB::DBStatus::OK) {
168 LOG_INFO("FlatObjectStorageEngine::GetTable GetEntry fail, errcode = %{public}d", status);
169 status = delegate->CloseResultSet(resultSet);
170 if (status != DistributedDB::DBStatus::OK) {
171 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
172 return ERR_RESULTSET;
173 }
174 return ERR_DB_ENTRY_FAIL;
175 }
176 result.insert_or_assign(StringUtils::BytesToStr(entry.key), entry.value);
177 resultSet->MoveToNext();
178 }
179 status = delegate->CloseResultSet(resultSet);
180 if (status != DistributedDB::DBStatus::OK) {
181 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
182 return ERR_RESULTSET;
183 }
184 return SUCCESS;
185 }
186
UpdateItem(const std::string & key,const std::string & itemKey,Value & value)187 uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::string &itemKey, Value &value)
188 {
189 if (!isOpened_) {
190 return ERR_DB_NOT_INIT;
191 }
192 std::lock_guard<std::mutex> lock(operationMutex_);
193 if (delegates_.count(key) == 0) {
194 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
195 return ERR_DB_NOT_EXIST;
196 }
197 auto delegate = delegates_.at(key);
198 LOG_INFO("start Put");
199 auto status = delegate->Put(StringUtils::StrToBytes(itemKey), value);
200 if (status != DistributedDB::DBStatus::OK) {
201 LOG_ERROR("%{public}s Put fail[%{public}d]", key.c_str(), status);
202 return ERR_CLOSE_STORAGE;
203 }
204 LOG_INFO("put success");
205 return SUCCESS;
206 }
207
UpdateItems(const std::string & key,const std::map<std::string,std::vector<uint8_t>> & data)208 uint32_t FlatObjectStorageEngine::UpdateItems(
209 const std::string &key, const std::map<std::string, std::vector<uint8_t>> &data)
210 {
211 if (!isOpened_ || data.size() == 0) {
212 return ERR_DB_NOT_INIT;
213 }
214 std::lock_guard<std::mutex> lock(operationMutex_);
215 if (delegates_.count(key) == 0) {
216 LOG_INFO("FlatObjectStorageEngine::UpdateItems %{public}s not exist", key.c_str());
217 return ERR_DB_NOT_EXIST;
218 }
219
220 std::vector<DistributedDB::Entry> entries;
221 for (auto &item : data) {
222 DistributedDB::Entry entry = { .key = StringUtils::StrToBytes(item.first), .value = item.second };
223 entries.emplace_back(entry);
224 }
225 auto delegate = delegates_.at(key);
226 LOG_INFO("start PutBatch");
227 auto status = delegate->PutBatch(entries);
228 if (status != DistributedDB::DBStatus::OK) {
229 LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
230 return ERR_CLOSE_STORAGE;
231 }
232 LOG_INFO("put success");
233 return SUCCESS;
234 }
235
DeleteTable(const std::string & key)236 uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
237 {
238 if (!isOpened_) {
239 return ERR_DB_NOT_INIT;
240 }
241 std::lock_guard<std::mutex> lock(operationMutex_);
242 if (delegates_.count(key) == 0) {
243 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
244 return ERR_DB_NOT_EXIST;
245 }
246 LOG_INFO("start DeleteTable %{public}s", key.c_str());
247 auto status = storeManager_->CloseKvStore(delegates_.at(key));
248 if (status != DistributedDB::DBStatus::OK) {
249 LOG_ERROR(
250 "FlatObjectStorageEngine::CloseKvStore %{public}s CloseKvStore fail[%{public}d]", key.c_str(), status);
251 return ERR_CLOSE_STORAGE;
252 }
253 LOG_INFO("DeleteTable success");
254 delegates_.erase(key);
255 return SUCCESS;
256 }
257
GetItem(const std::string & key,const std::string & itemKey,Value & value)258 uint32_t FlatObjectStorageEngine::GetItem(const std::string &key, const std::string &itemKey, Value &value)
259 {
260 if (!isOpened_) {
261 return ERR_DB_NOT_INIT;
262 }
263 std::lock_guard<std::mutex> lock(operationMutex_);
264 if (delegates_.count(key) == 0) {
265 LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s not exist", key.c_str());
266 return ERR_DB_NOT_EXIST;
267 }
268 LOG_INFO("start Get %{public}s", key.c_str());
269 DistributedDB::DBStatus status = delegates_.at(key)->Get(StringUtils::StrToBytes(itemKey), value);
270 if (status != DistributedDB::DBStatus::OK) {
271 LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s item fail %{public}d", itemKey.c_str(), status);
272 return status;
273 }
274 LOG_INFO("end Get %{public}s", key.c_str());
275 return SUCCESS;
276 }
277
RegisterObserver(const std::string & key,std::shared_ptr<TableWatcher> watcher)278 uint32_t FlatObjectStorageEngine::RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher)
279 {
280 if (!isOpened_) {
281 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
282 return ERR_DB_NOT_INIT;
283 }
284 std::lock_guard<std::mutex> lock(operationMutex_);
285 if (delegates_.count(key) == 0) {
286 LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
287 return ERR_DB_NOT_EXIST;
288 }
289 if (observerMap_.count(key) != 0) {
290 LOG_INFO("FlatObjectStorageEngine::RegisterObserver observer already exist.");
291 return SUCCESS;
292 }
293 auto delegate = delegates_.at(key);
294 std::vector<uint8_t> tmpKey;
295 LOG_INFO("start RegisterObserver %{public}s", key.c_str());
296 DistributedDB::DBStatus status =
297 delegate->RegisterObserver(tmpKey, DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, watcher.get());
298 if (status != DistributedDB::DBStatus::OK) {
299 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver watch err %{public}d", status);
300 return ERR_REGISTER;
301 }
302 LOG_INFO("end RegisterObserver %{public}s", key.c_str());
303 observerMap_.insert_or_assign(key, watcher);
304 return SUCCESS;
305 }
306
UnRegisterObserver(const std::string & key)307 uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
308 {
309 if (!isOpened_) {
310 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
311 return ERR_DB_NOT_INIT;
312 }
313 std::lock_guard<std::mutex> lock(operationMutex_);
314 if (delegates_.count(key) == 0) {
315 LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
316 return ERR_DB_NOT_EXIST;
317 }
318 auto iter = observerMap_.find(key);
319 if (iter == observerMap_.end()) {
320 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
321 return ERR_NO_OBSERVER;
322 }
323 auto delegate = delegates_.at(key);
324 std::shared_ptr<TableWatcher> watcher = iter->second;
325 LOG_INFO("start UnRegisterObserver %{public}s", key.c_str());
326 DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
327 if (status != DistributedDB::DBStatus::OK) {
328 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
329 return ERR_UNRIGSTER;
330 }
331 LOG_INFO("end UnRegisterObserver %{public}s", key.c_str());
332 observerMap_.erase(key);
333 return SUCCESS;
334 }
335
SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)336 uint32_t FlatObjectStorageEngine::SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)
337 {
338 if (!isOpened_) {
339 LOG_ERROR("FlatObjectStorageEngine::SetStatusNotifier kvStore has not init");
340 return ERR_DB_NOT_INIT;
341 }
342 auto databaseStatusNotifyCallback = [this](std::string userId, std::string appId, std::string storeId,
343 const std::string deviceId, bool onlineStatus) -> void {
344 LOG_INFO("complete");
345 if (statusWatcher_ == nullptr) {
346 LOG_INFO("FlatObjectStorageEngine::statusWatcher_ null");
347 return;
348 }
349 if (onlineStatus) {
350 auto onComplete = [this, storeId](const std::map<std::string, DistributedDB::DBStatus> &devices) {
351 for (auto item : devices) {
352 LOG_INFO("%{public}s pull data result %{public}d in device %{public}s", storeId.c_str(),
353 item.second, SoftBusAdapter::GetInstance()->ToNodeID(item.first).c_str());
354 }
355 if (statusWatcher_ != nullptr) {
356 for (auto item : devices) {
357 statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
358 item.second == DistributedDB::OK ? "online" : "offline");
359 }
360 }
361 };
362 SyncAllData(storeId, std::vector<std::string>({ deviceId }), onComplete);
363 } else {
364 statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(deviceId), "offline");
365 }
366 };
367 storeManager_->SetStoreStatusNotifier(databaseStatusNotifyCallback);
368 LOG_INFO("FlatObjectStorageEngine::SetStatusNotifier success");
369 statusWatcher_ = watcher;
370 return SUCCESS;
371 }
372
SyncAllData(const std::string & sessionId,const std::vector<std::string> & deviceIds,const std::function<void (const std::map<std::string,DistributedDB::DBStatus> &)> & onComplete)373 uint32_t FlatObjectStorageEngine::SyncAllData(const std::string &sessionId, const std::vector<std::string> &deviceIds,
374 const std::function<void(const std::map<std::string, DistributedDB::DBStatus> &)> &onComplete)
375 {
376 LOG_INFO("start");
377 std::lock_guard<std::mutex> lock(operationMutex_);
378 if (delegates_.count(sessionId) == 0) {
379 LOG_ERROR("FlatObjectStorageEngine::SyncAllData %{public}s already deleted", sessionId.c_str());
380 return ERR_DB_NOT_EXIST;
381 }
382 DistributedDB::KvStoreNbDelegate *kvstore = delegates_.at(sessionId);
383 if (deviceIds.empty()) {
384 LOG_INFO("single device,no need sync");
385 return ERR_SINGLE_DEVICE;
386 }
387 LOG_INFO("start sync %{public}s", sessionId.c_str());
388 DistributedDB::DBStatus status = kvstore->Sync(deviceIds, DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY, onComplete);
389 if (status != DistributedDB::DBStatus::OK) {
390 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
391 return ERR_UNRIGSTER;
392 }
393 LOG_INFO("end sync %{public}s", sessionId.c_str());
394 return SUCCESS;
395 }
396
GetItems(const std::string & key,std::map<std::string,std::vector<uint8_t>> & data)397 uint32_t FlatObjectStorageEngine::GetItems(const std::string &key, std::map<std::string, std::vector<uint8_t>> &data)
398 {
399 if (!isOpened_) {
400 LOG_ERROR("FlatObjectStorageEngine::GetItems %{public}s not init", key.c_str());
401 return ERR_DB_NOT_INIT;
402 }
403 std::lock_guard<std::mutex> lock(operationMutex_);
404 if (delegates_.count(key) == 0) {
405 LOG_ERROR("FlatObjectStorageEngine::GetItems %{public}s not exist", key.c_str());
406 return ERR_DB_NOT_EXIST;
407 }
408 LOG_INFO("start Get %{public}s", key.c_str());
409 std::vector<DistributedDB::Entry> entries;
410 DistributedDB::DBStatus status = delegates_.at(key)->GetEntries(StringUtils::StrToBytes(""), entries);
411 if (status != DistributedDB::DBStatus::OK) {
412 LOG_ERROR("FlatObjectStorageEngine::GetItems item fail status = %{public}d", status);
413 return status;
414 }
415 for (auto &item : entries) {
416 data[StringUtils::BytesToStr(item.key)] = item.value;
417 }
418 LOG_INFO("end Get %{public}s", key.c_str());
419 return SUCCESS;
420 }
421
NotifyStatus(const std::string & sessionId,const std::string & deviceId,const std::string & status)422 void FlatObjectStorageEngine::NotifyStatus(const std::string &sessionId, const std::string &deviceId,
423 const std::string &status)
424 {
425 if (statusWatcher_ == nullptr) {
426 return;
427 }
428 statusWatcher_->OnChanged(sessionId, deviceId, status);
429 }
430
NotifyChange(const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & changedData)431 void FlatObjectStorageEngine::NotifyChange(const std::string &sessionId,
432 const std::map<std::string, std::vector<uint8_t>> &changedData)
433 {
434 std::lock_guard<std::mutex> lock(operationMutex_);
435 if (observerMap_.count(sessionId) == 0) {
436 return;
437 }
438 std::vector<std::string> data {};
439 for (const auto &item : changedData) {
440 data.push_back(item.first);
441 }
442 observerMap_[sessionId]->OnChanged(sessionId, data);
443 }
444
OnChange(const DistributedDB::KvStoreChangedData & data)445 void Watcher::OnChange(const DistributedDB::KvStoreChangedData &data)
446 {
447 std::vector<std::string> changedData;
448 std::string tmp;
449 for (DistributedDB::Entry item : data.GetEntriesInserted()) {
450 tmp = StringUtils::BytesToStr(item.key);
451 LOG_INFO("inserted %{public}s", tmp.c_str());
452 // property key start with p_, 2 is p_ size
453 if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
454 changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
455 }
456 }
457 for (DistributedDB::Entry item : data.GetEntriesUpdated()) {
458 tmp = StringUtils::BytesToStr(item.key);
459 LOG_INFO("updated %{public}s", tmp.c_str());
460 // property key start with p_, 2 is p_ size
461 if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
462 changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
463 }
464 }
465 this->OnChanged(sessionId_, changedData);
466 }
467
Watcher(const std::string & sessionId)468 Watcher::Watcher(const std::string &sessionId) : sessionId_(sessionId)
469 {
470 }
471 } // namespace OHOS::ObjectStore
472