1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "flat_object_store.h"
17
18 #include "client_adaptor.h"
19 #include "distributed_objectstore_impl.h"
20 #include "logger.h"
21 #include "object_callback.h"
22 #include "object_service_proxy.h"
23 #include "objectstore_errors.h"
24 #include "softbus_adapter.h"
25
26 namespace OHOS::ObjectStore {
FlatObjectStore(const std::string & bundleName)27 FlatObjectStore::FlatObjectStore(const std::string &bundleName)
28 {
29 bundleName_ = bundleName;
30 storageEngine_ = std::make_shared<FlatObjectStorageEngine>();
31 uint32_t status = storageEngine_->Open(bundleName);
32 if (status != SUCCESS) {
33 LOG_ERROR("FlatObjectStore: Failed to open, error: open storage engine failure %{public}d", status);
34 }
35 cacheManager_ = new CacheManager();
36 }
37
~FlatObjectStore()38 FlatObjectStore::~FlatObjectStore()
39 {
40 if (storageEngine_ != nullptr) {
41 storageEngine_->Close();
42 storageEngine_ = nullptr;
43 }
44 delete cacheManager_;
45 cacheManager_ = nullptr;
46 }
47
CreateObject(const std::string & sessionId)48 uint32_t FlatObjectStore::CreateObject(const std::string &sessionId)
49 {
50 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
51 LOG_ERROR("FlatObjectStore::DB has not inited");
52 return ERR_DB_NOT_INIT;
53 }
54 uint32_t status = storageEngine_->CreateTable(sessionId);
55 if (status != SUCCESS) {
56 LOG_ERROR("FlatObjectStore::CreateObject createTable err %{public}d", status);
57 return status;
58 }
59 std::function<void(const std::map<std::string, std::vector<uint8_t>> &data)> callback =
60 [sessionId, this](
61 const std::map<std::string, std::vector<uint8_t>> &data) {
62 if (data.size() > 0) {
63 LOG_INFO("objectstore, retrieve success");
64 {
65 std::lock_guard<std::mutex> lck(mutex_);
66 if (find(retrievedCache_.begin(), retrievedCache_.end(), sessionId) == retrievedCache_.end()) {
67 retrievedCache_.push_back(sessionId);
68 }
69 }
70 auto result = storageEngine_->UpdateItems(sessionId, data);
71 if (result != SUCCESS) {
72 LOG_ERROR("UpdateItems failed, status = %{public}d", result);
73 }
74 } else {
75 LOG_INFO("objectstore, retrieve empty");
76 }
77 };
78 std::function<void(const std::map<std::string, std::vector<uint8_t>> &data)> remoteResumeCallback =
79 [sessionId, this](
80 const std::map<std::string, std::vector<uint8_t>> &data) {
81 LOG_INFO("SubscribeDataChange callback success.");
82 std::map<std::string, std::vector<uint8_t>> filteredData = data;
83 FilterData(sessionId, filteredData);
84 if (!filteredData.empty()) {
85 auto status = storageEngine_->UpdateItems(sessionId, filteredData);
86 if (status != SUCCESS) {
87 LOG_ERROR("UpdateItems failed, status = %{public}d", status);
88 }
89 storageEngine_->NotifyChange(sessionId, filteredData);
90 }
91 storageEngine_->NotifyStatus(sessionId, "local", "restored");
92 };
93 cacheManager_->ResumeObject(bundleName_, sessionId, callback);
94 cacheManager_->SubscribeDataChange(bundleName_, sessionId, remoteResumeCallback);
95 return SUCCESS;
96 }
97
Delete(const std::string & sessionId)98 uint32_t FlatObjectStore::Delete(const std::string &sessionId)
99 {
100 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
101 LOG_ERROR("FlatObjectStore::DB has not inited");
102 return ERR_DB_NOT_INIT;
103 }
104 uint32_t status = storageEngine_->DeleteTable(sessionId);
105 if (status != SUCCESS) {
106 LOG_ERROR("FlatObjectStore: Failed to delete object %{public}d", status);
107 return status;
108 }
109 cacheManager_->UnregisterDataChange(bundleName_, sessionId);
110 return SUCCESS;
111 }
112
Watch(const std::string & sessionId,std::shared_ptr<FlatObjectWatcher> watcher)113 uint32_t FlatObjectStore::Watch(const std::string &sessionId, std::shared_ptr<FlatObjectWatcher> watcher)
114 {
115 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
116 LOG_ERROR("FlatObjectStore::DB has not inited");
117 return ERR_DB_NOT_INIT;
118 }
119 uint32_t status = storageEngine_->RegisterObserver(sessionId, watcher);
120 if (status != SUCCESS) {
121 LOG_ERROR("FlatObjectStore::Watch failed %{public}d", status);
122 }
123 return status;
124 }
125
UnWatch(const std::string & sessionId)126 uint32_t FlatObjectStore::UnWatch(const std::string &sessionId)
127 {
128 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
129 LOG_ERROR("FlatObjectStore::DB has not inited");
130 return ERR_DB_NOT_INIT;
131 }
132 uint32_t status = storageEngine_->UnRegisterObserver(sessionId);
133 if (status != SUCCESS) {
134 LOG_ERROR("FlatObjectStore::Watch failed %{public}d", status);
135 }
136 return status;
137 }
138
Put(const std::string & sessionId,const std::string & key,std::vector<uint8_t> value)139 uint32_t FlatObjectStore::Put(const std::string &sessionId, const std::string &key, std::vector<uint8_t> value)
140 {
141 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
142 LOG_ERROR("FlatObjectStore::DB has not inited");
143 return ERR_DB_NOT_INIT;
144 }
145 return storageEngine_->UpdateItem(sessionId, key, value);
146 }
147
Get(std::string & sessionId,const std::string & key,Bytes & value)148 uint32_t FlatObjectStore::Get(std::string &sessionId, const std::string &key, Bytes &value)
149 {
150 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
151 LOG_ERROR("FlatObjectStore::DB has not inited");
152 return ERR_DB_NOT_INIT;
153 }
154 return storageEngine_->GetItem(sessionId, key, value);
155 }
156
SetStatusNotifier(std::shared_ptr<StatusWatcher> notifier)157 uint32_t FlatObjectStore::SetStatusNotifier(std::shared_ptr<StatusWatcher> notifier)
158 {
159 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
160 LOG_ERROR("FlatObjectStore::DB has not inited");
161 return ERR_DB_NOT_INIT;
162 }
163 return storageEngine_->SetStatusNotifier(notifier);
164 }
165
SyncAllData(const std::string & sessionId,const std::function<void (const std::map<std::string,DistributedDB::DBStatus> &)> & onComplete)166 uint32_t FlatObjectStore::SyncAllData(const std::string &sessionId,
167 const std::function<void(const std::map<std::string, DistributedDB::DBStatus> &)> &onComplete)
168 {
169 if (!storageEngine_->isOpened_ && storageEngine_->Open(bundleName_) != SUCCESS) {
170 LOG_ERROR("FlatObjectStore::DB has not inited");
171 return ERR_DB_NOT_INIT;
172 }
173 std::vector<DeviceInfo> devices = SoftBusAdapter::GetInstance()->GetDeviceList();
174 std::vector<std::string> deviceIds;
175 for (auto item : devices) {
176 deviceIds.push_back(item.deviceId);
177 }
178 return storageEngine_->SyncAllData(sessionId, deviceIds, onComplete);
179 }
180
Save(const std::string & sessionId,const std::string & deviceId)181 uint32_t FlatObjectStore::Save(const std::string &sessionId, const std::string &deviceId)
182 {
183 if (cacheManager_ == nullptr) {
184 LOG_ERROR("FlatObjectStore::cacheManager_ is null");
185 return ERR_NULL_PTR;
186 }
187 std::map<std::string, std::vector<uint8_t>> objectData;
188 uint32_t status = storageEngine_->GetItems(sessionId, objectData);
189 if (status != SUCCESS) {
190 LOG_ERROR("FlatObjectStore::GetItems fail");
191 return status;
192 }
193 return cacheManager_->Save(bundleName_, sessionId, deviceId, objectData);
194 }
195
RevokeSave(const std::string & sessionId)196 uint32_t FlatObjectStore::RevokeSave(const std::string &sessionId)
197 {
198 if (cacheManager_ == nullptr) {
199 LOG_ERROR("FlatObjectStore::cacheManager_ is null");
200 return ERR_NULL_PTR;
201 }
202 return cacheManager_->RevokeSave(bundleName_, sessionId);
203 }
204
CheckRetrieveCache(const std::string & sessionId)205 void FlatObjectStore::CheckRetrieveCache(const std::string &sessionId)
206 {
207 std::lock_guard<std::mutex> lck(mutex_);
208 auto iter = find(retrievedCache_.begin(), retrievedCache_.end(), sessionId);
209 if (iter != retrievedCache_.end()) {
210 storageEngine_->NotifyStatus(*iter, "local", "restored");
211 retrievedCache_.erase(iter);
212 }
213 }
214
FilterData(const std::string & sessionId,std::map<std::string,std::vector<uint8_t>> & data)215 void FlatObjectStore::FilterData(const std::string &sessionId,
216 std::map<std::string, std::vector<uint8_t>> &data)
217 {
218 std::map<std::string, std::vector<uint8_t>> allData {};
219 storageEngine_->GetItems(sessionId, allData);
220 for (const auto &item : allData) {
221 data.erase(item.first);
222 }
223 }
224
CacheManager()225 CacheManager::CacheManager()
226 {
227 }
228
Save(const std::string & bundleName,const std::string & sessionId,const std::string & deviceId,const std::map<std::string,std::vector<uint8_t>> & objectData)229 uint32_t CacheManager::Save(const std::string &bundleName, const std::string &sessionId, const std::string &deviceId,
230 const std::map<std::string, std::vector<uint8_t>> &objectData)
231 {
232 std::unique_lock<std::mutex> lck(mutex_);
233 ConditionLock<int32_t> conditionLock;
234 int32_t status = SaveObject(bundleName, sessionId, deviceId, objectData,
235 [this, &deviceId, &conditionLock](const std::map<std::string, int32_t> &results) {
236 LOG_INFO("CacheManager::task callback");
237 if (results.count(deviceId) != 0) {
238 conditionLock.Notify(results.at(deviceId));
239 } else {
240 conditionLock.Notify(ERR_DB_GET_FAIL);
241 }
242 });
243 if (status != SUCCESS) {
244 LOG_ERROR("SaveObject failed");
245 return status;
246 }
247 LOG_INFO("CacheManager::start wait");
248 status = conditionLock.Wait();
249 LOG_INFO("CacheManager::end wait, %{public}d", status);
250 return status == SUCCESS ? status : ERR_DB_GET_FAIL;
251 }
252
RevokeSave(const std::string & bundleName,const std::string & sessionId)253 uint32_t CacheManager::RevokeSave(const std::string &bundleName, const std::string &sessionId)
254 {
255 std::unique_lock<std::mutex> lck(mutex_);
256 ConditionLock<int32_t> conditionLock;
257 std::function<void(int32_t)> callback = [this, &conditionLock](int32_t result) {
258 LOG_INFO("CacheManager::task callback");
259 conditionLock.Notify(result);
260 };
261 int32_t status = RevokeSaveObject(bundleName, sessionId, callback);
262 if (status != SUCCESS) {
263 LOG_ERROR("RevokeSaveObject failed");
264 return status;
265 }
266 LOG_INFO("CacheManager::start wait");
267 status = conditionLock.Wait();
268 LOG_INFO("CacheManager::end wait, %{public}d", status);
269 return status == SUCCESS ? status : ERR_DB_GET_FAIL;
270 }
271
SaveObject(const std::string & bundleName,const std::string & sessionId,const std::string & deviceId,const std::map<std::string,std::vector<uint8_t>> & objectData,const std::function<void (const std::map<std::string,int32_t> &)> & callback)272 int32_t CacheManager::SaveObject(const std::string &bundleName, const std::string &sessionId,
273 const std::string &deviceId, const std::map<std::string, std::vector<uint8_t>> &objectData,
274 const std::function<void(const std::map<std::string, int32_t> &)> &callback)
275 {
276 sptr<OHOS::DistributedObject::IObjectService> proxy = ClientAdaptor::GetObjectService();
277 if (proxy == nullptr) {
278 LOG_ERROR("proxy is nullptr.");
279 return ERR_PROCESSING;
280 }
281 sptr<IObjectSaveCallback> objectSaveCallback = new (std::nothrow) ObjectSaveCallback(callback);
282 if (objectSaveCallback == nullptr) {
283 LOG_ERROR("CacheManager::SaveObject no memory for ObjectSaveCallback malloc!");
284 return ERR_NULL_PTR;
285 }
286 int32_t status = proxy->ObjectStoreSave(bundleName, sessionId, deviceId, objectData, objectSaveCallback);
287 if (status != SUCCESS) {
288 LOG_ERROR("object save failed code=%d.", static_cast<int>(status));
289 }
290 LOG_INFO("object save successful");
291 return status;
292 }
293
RevokeSaveObject(const std::string & bundleName,const std::string & sessionId,std::function<void (int32_t)> & callback)294 int32_t CacheManager::RevokeSaveObject(
295 const std::string &bundleName, const std::string &sessionId, std::function<void(int32_t)> &callback)
296 {
297 sptr<OHOS::DistributedObject::IObjectService> proxy = ClientAdaptor::GetObjectService();
298 if (proxy == nullptr) {
299 LOG_ERROR("proxy is nullptr.");
300 return ERR_PROCESSING;
301 }
302 sptr<IObjectRevokeSaveCallback> objectRevokeSaveCallback = new (std::nothrow) ObjectRevokeSaveCallback(callback);
303 if (objectRevokeSaveCallback == nullptr) {
304 LOG_ERROR("CacheManager::RevokeSaveObject no memory for ObjectRevokeSaveCallback malloc!");
305 return ERR_NULL_PTR;
306 }
307 int32_t status = proxy->ObjectStoreRevokeSave(bundleName, sessionId, objectRevokeSaveCallback);
308 if (status != SUCCESS) {
309 LOG_ERROR("object revoke save failed code=%d.", static_cast<int>(status));
310 }
311 LOG_INFO("object revoke save successful");
312 return status;
313 }
314
ResumeObject(const std::string & bundleName,const std::string & sessionId,std::function<void (const std::map<std::string,std::vector<uint8_t>> & data)> & callback)315 int32_t CacheManager::ResumeObject(const std::string &bundleName, const std::string &sessionId,
316 std::function<void(const std::map<std::string, std::vector<uint8_t>> &data)> &callback)
317 {
318 sptr<OHOS::DistributedObject::IObjectService> proxy = ClientAdaptor::GetObjectService();
319 if (proxy == nullptr) {
320 LOG_ERROR("proxy is nullptr.");
321 return ERR_NULL_PTR;
322 }
323 sptr<IObjectRetrieveCallback> objectRetrieveCallback = new (std::nothrow)ObjectRetrieveCallback(callback);
324 if (objectRetrieveCallback == nullptr) {
325 LOG_ERROR("CacheManager::ResumeObject no memory for ObjectRetrieveCallback malloc!");
326 return ERR_NULL_PTR;
327 }
328 int32_t status = proxy->ObjectStoreRetrieve(bundleName, sessionId, objectRetrieveCallback);
329 if (status != SUCCESS) {
330 LOG_ERROR("object resume failed code=%d.", static_cast<int>(status));
331 }
332 LOG_INFO("object resume successful");
333 return status;
334 }
335
SubscribeDataChange(const std::string & bundleName,const std::string & sessionId,std::function<void (const std::map<std::string,std::vector<uint8_t>> & data)> & callback)336 int32_t CacheManager::SubscribeDataChange(const std::string &bundleName, const std::string &sessionId,
337 std::function<void(const std::map<std::string, std::vector<uint8_t>> &data)> &callback)
338 {
339 sptr<OHOS::DistributedObject::IObjectService> proxy = ClientAdaptor::GetObjectService();
340 if (proxy == nullptr) {
341 LOG_ERROR("proxy is nullptr.");
342 return ERR_NULL_PTR;
343 }
344 sptr<IObjectChangeCallback> objectRemoteResumeCallback = new (std::nothrow) ObjectChangeCallback(callback);
345 if (objectRemoteResumeCallback == nullptr) {
346 LOG_ERROR("CacheManager::SubscribeDataChange no memory for ObjectChangeCallback malloc!");
347 return ERR_NULL_PTR;
348 }
349 DistributedKv::AppId appId;
350 appId.appId = bundleName;
351 ClientAdaptor::RegisterClientDeathListener(appId, objectRemoteResumeCallback->AsObject());
352 int32_t status = proxy->RegisterDataObserver(bundleName, sessionId, objectRemoteResumeCallback);
353 if (status != SUCCESS) {
354 LOG_ERROR("object remote resume failed code=%d.", static_cast<int>(status));
355 }
356 LOG_INFO("object remote resume successful");
357 return status;
358 }
359
UnregisterDataChange(const std::string & bundleName,const std::string & sessionId)360 int32_t CacheManager::UnregisterDataChange(const std::string &bundleName, const std::string &sessionId)
361 {
362 sptr<OHOS::DistributedObject::IObjectService> proxy = ClientAdaptor::GetObjectService();
363 if (proxy == nullptr) {
364 LOG_ERROR("proxy is nullptr.");
365 return ERR_NULL_PTR;
366 }
367 int32_t status = proxy->UnregisterDataChangeObserver(bundleName, sessionId);
368 if (status != SUCCESS) {
369 LOG_ERROR("object remote resume failed code=%d.", static_cast<int>(status));
370 }
371 LOG_INFO("object unregister data change observer successful");
372 return status;
373 }
374 } // namespace OHOS::ObjectStore
375