1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "db_adapter.h"
17
18 #include "anonymous_string.h"
19 #include "capability_info.h"
20 #include "capability_info_manager.h"
21 #include "capability_utils.h"
22 #include "constants.h"
23 #include "dh_context.h"
24 #include "distributed_hardware_errno.h"
25 #include "distributed_hardware_log.h"
26 #include "event_bus.h"
27 #include "version_info_event.h"
28
29 namespace OHOS {
30 namespace DistributedHardware {
31 #undef DH_LOG_TAG
32 #define DH_LOG_TAG "DBAdapter"
33
namespace {
// Init(): number of attempts to open the kv store, and the pause between attempts.
// The interval is handed to usleep(), i.e. it is expressed in microseconds (200ms).
constexpr int32_t MAX_INIT_RETRY_TIMES = 20;
constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * 1000;

// SyncCompleted(): failed-sync count at which the per-device counter is reset instead of
// scheduling another retry, and the usleep() pause (microseconds, 100ms) used by the retry task.
constexpr int32_t MANUAL_SYNC_TIMES = 6;
constexpr int32_t MANUAL_SYNC_INTERVAL = 100 * 1000;

// OnRemoteDied(): maximum number of ReInit() attempts and the usleep() pause
// (microseconds, 100ms) between them.
constexpr int32_t DIED_CHECK_MAX_TIMES = 300;
constexpr int32_t DIED_CHECK_INTERVAL = 100 * 1000;

// Prefix of the kv store base directory; the application id is appended in GetKvStorePtr().
const std::string DATABASE_DIR = "/data/service/el1/public/database/";
}
43
DBAdapter(const std::string & appId,const std::string & storeId,const std::shared_ptr<DistributedKv::KvStoreObserver> & changeListener)44 DBAdapter::DBAdapter(const std::string &appId, const std::string &storeId,
45 const std::shared_ptr<DistributedKv::KvStoreObserver> &changeListener)
46 {
47 this->appId_.appId = appId;
48 this->storeId_.storeId = storeId;
49 this->dataChangeListener_ = changeListener;
50 DHLOGI("DBAdapter Constructor Success, appId: %s, storeId: %s", appId.c_str(), storeId.c_str());
51 }
52
~DBAdapter()53 DBAdapter::~DBAdapter()
54 {
55 DHLOGI("DBAdapter Destruction");
56 }
57
GetKvStorePtr()58 DistributedKv::Status DBAdapter::GetKvStorePtr()
59 {
60 DistributedKv::Options options = {
61 .createIfMissing = true,
62 .encrypt = false,
63 .autoSync = true,
64 .securityLevel = DistributedKv::SecurityLevel::S1,
65 .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
66 .area = DistributedKv::EL1,
67 .baseDir = DATABASE_DIR + appId_.appId
68 };
69 return kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStoragePtr_);
70 }
71
Init()72 int32_t DBAdapter::Init()
73 {
74 DHLOGI("Init DB, storeId: %s", storeId_.storeId.c_str());
75 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
76 int32_t tryTimes = MAX_INIT_RETRY_TIMES;
77 while (tryTimes > 0) {
78 DistributedKv::Status status = GetKvStorePtr();
79 if (status == DistributedKv::Status::SUCCESS && kvStoragePtr_) {
80 DHLOGI("Init KvStorePtr Success");
81 RegisterManualSyncListener();
82 RegisterChangeListener();
83 RegisterKvStoreDeathListener();
84 return DH_FWK_SUCCESS;
85 }
86 DHLOGD("CheckKvStore, left times: %d", tryTimes);
87 usleep(INIT_RETRY_SLEEP_INTERVAL);
88 tryTimes--;
89 }
90 if (kvStoragePtr_ == nullptr) {
91 DHLOGE("Init KvStorePtr failed");
92 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
93 }
94 return DH_FWK_SUCCESS;
95 }
96
UnInit()97 void DBAdapter::UnInit()
98 {
99 DHLOGI("DBAdapter UnInit");
100 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
101 if (kvStoragePtr_ == nullptr) {
102 DHLOGE("kvStoragePtr_ is null");
103 return;
104 }
105 UnRegisterKvStoreDeathListener();
106 UnRegisterChangeListener();
107 UnRegisterManualSyncListener();
108 kvStoragePtr_.reset();
109 }
110
ReInit()111 int32_t DBAdapter::ReInit()
112 {
113 DHLOGI("ReInit DB, storeId: %s", storeId_.storeId.c_str());
114 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
115 if (kvStoragePtr_ == nullptr) {
116 DHLOGE("kvStoragePtr_ is null");
117 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
118 }
119 UnRegisterManualSyncListener();
120 kvStoragePtr_.reset();
121 DistributedKv::Status status = GetKvStorePtr();
122 if (status != DistributedKv::Status::SUCCESS || !kvStoragePtr_) {
123 DHLOGW("Get kvStoragePtr_ failed, status: %d", status);
124 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
125 }
126 RegisterManualSyncListener();
127 RegisterKvStoreDeathListener();
128 return DH_FWK_SUCCESS;
129 }
130
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)131 void DBAdapter::SyncCompleted(const std::map<std::string, DistributedKv::Status> &results)
132 {
133 DHLOGI("DBAdapter SyncCompleted start");
134 if (results.size() == 0 || results.size() > MAX_DB_RECORD_SIZE) {
135 DHLOGE("Results size is invalid!");
136 return;
137 }
138 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
139 for (const auto &result : results) {
140 std::string deviceId = result.first;
141 DHLOGI("SyncCompleted, deviceId: %s", GetAnonyString(deviceId).c_str());
142 if (manualSyncCountMap_.count(deviceId) == 0) {
143 DHLOGE("SyncCompleted, error, ManualSyncCount is removed");
144 return;
145 }
146 DHLOGI("ManualSyncCallback::SyncCompleted, retryCount: %d", manualSyncCountMap_[deviceId]);
147 if (result.second == DistributedKv::Status::SUCCESS) {
148 manualSyncCountMap_[deviceId] = 0;
149 } else {
150 manualSyncCountMap_[deviceId]++;
151 if (manualSyncCountMap_[deviceId] >= MANUAL_SYNC_TIMES) {
152 manualSyncCountMap_[deviceId] = 0;
153 } else {
154 auto retryTask = [this, deviceId] {
155 this->ManualSync(deviceId);
156 usleep(MANUAL_SYNC_INTERVAL);
157 };
158 DHContext::GetInstance().GetEventBus()->PostTask(retryTask, "retryTask", 0);
159 }
160 }
161 }
162 }
163
GetDataByKey(const std::string & key,std::string & data)164 int32_t DBAdapter::GetDataByKey(const std::string &key, std::string &data)
165 {
166 DHLOGI("Get data by key: %s", GetAnonyString(key).c_str());
167 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
168 if (kvStoragePtr_ == nullptr) {
169 DHLOGE("kvStoragePtr_ is null");
170 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
171 }
172 DistributedKv::Key kvKey(key);
173 DistributedKv::Value kvValue;
174 DistributedKv::Status status = kvStoragePtr_->Get(kvKey, kvValue);
175 if (status != DistributedKv::Status::SUCCESS) {
176 DHLOGE("Query from db failed, key: %s", GetAnonyString(key).c_str());
177 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
178 }
179 data = kvValue.ToString();
180 return DH_FWK_SUCCESS;
181 }
182
GetDataByKeyPrefix(const std::string & keyPrefix,std::vector<std::string> & values)183 int32_t DBAdapter::GetDataByKeyPrefix(const std::string &keyPrefix, std::vector<std::string> &values)
184 {
185 DHLOGI("Get data by key prefix: %s", GetAnonyString(keyPrefix).c_str());
186 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
187 if (kvStoragePtr_ == nullptr) {
188 DHLOGE("kvStoragePtr_ is null");
189 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
190 }
191
192 // if prefix is empty, get all entries.
193 DistributedKv::Key allEntryKeyPrefix(keyPrefix);
194 std::vector<DistributedKv::Entry> allEntries;
195 DistributedKv::Status status = kvStoragePtr_->GetEntries(allEntryKeyPrefix, allEntries);
196 if (status != DistributedKv::Status::SUCCESS) {
197 DHLOGE("Query data by keyPrefix failed, prefix: %s",
198 GetAnonyString(keyPrefix).c_str());
199 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
200 }
201 if (allEntries.size() == 0 || allEntries.size() > MAX_DB_RECORD_SIZE) {
202 DHLOGE("AllEntries size is invalid!");
203 return ERR_DH_FWK_PARA_INVALID;
204 }
205 for (const auto& item : allEntries) {
206 values.push_back(item.value.ToString());
207 }
208 return DH_FWK_SUCCESS;
209 }
210
PutData(const std::string & key,const std::string & value)211 int32_t DBAdapter::PutData(const std::string &key, const std::string &value)
212 {
213 if (key.empty() || key.size() > MAX_MESSAGE_LEN || value.empty() || value.size() > MAX_MESSAGE_LEN) {
214 DHLOGI("Param is invalid!");
215 return ERR_DH_FWK_PARA_INVALID;
216 }
217 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
218 if (kvStoragePtr_ == nullptr) {
219 DHLOGE("kvStoragePtr_ is null");
220 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
221 }
222 DistributedKv::Key kvKey(key);
223 DistributedKv::Value kvValue(value);
224 DistributedKv::Status status = kvStoragePtr_->Put(kvKey, kvValue);
225 if (status == DistributedKv::Status::IPC_ERROR) {
226 DHLOGE("Put kv to db failed, ret: %d", status);
227 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
228 }
229 return DH_FWK_SUCCESS;
230 }
231
PutDataBatch(const std::vector<std::string> & keys,const std::vector<std::string> & values)232 int32_t DBAdapter::PutDataBatch(const std::vector<std::string> &keys, const std::vector<std::string> &values)
233 {
234 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
235 if (kvStoragePtr_ == nullptr) {
236 DHLOGE("kvStoragePtr_ is null");
237 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
238 }
239 if (keys.size() != values.size() || keys.empty() || values.empty()) {
240 DHLOGE("Param is invalid!");
241 return ERR_DH_FWK_PARA_INVALID;
242 }
243 std::vector<DistributedKv::Entry> entries;
244 for (unsigned long i = 0; i < keys.size(); i++) {
245 DistributedKv::Entry entry;
246 entry.key = keys[i];
247 entry.value = values[i];
248 entries.push_back(entry);
249 }
250 DistributedKv::Status status = kvStoragePtr_->PutBatch(entries);
251 if (status != DistributedKv::Status::SUCCESS) {
252 DHLOGE("Put kv batch to db failed, ret: %d", status);
253 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
254 }
255 DHLOGI("Put kv batch to db success");
256 return DH_FWK_SUCCESS;
257 }
258
CreateManualSyncCount(const std::string & deviceId)259 void DBAdapter::CreateManualSyncCount(const std::string &deviceId)
260 {
261 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
262 manualSyncCountMap_[deviceId] = 0;
263 }
264
RemoveManualSyncCount(const std::string & deviceId)265 void DBAdapter::RemoveManualSyncCount(const std::string &deviceId)
266 {
267 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
268 manualSyncCountMap_.erase(deviceId);
269 }
270
ManualSync(const std::string & networkId)271 int32_t DBAdapter::ManualSync(const std::string &networkId)
272 {
273 DHLOGI("Manual sync between networkId: %s", GetAnonyString(networkId).c_str());
274 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
275 if (kvStoragePtr_ == nullptr) {
276 DHLOGE("kvStoragePtr_ is null");
277 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
278 }
279 std::vector<std::string> devList = { networkId };
280 DistributedKv::Status status = kvStoragePtr_->Sync(devList, DistributedKv::SyncMode::PULL);
281 if (status != DistributedKv::Status::SUCCESS) {
282 DHLOGE("ManualSync Data failed, networkId: %s", GetAnonyString(networkId).c_str());
283 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
284 }
285 return DH_FWK_SUCCESS;
286 }
287
SyncDBForRecover()288 void DBAdapter::SyncDBForRecover()
289 {
290 DHLOGI("Sync store id: %s after db recover", storeId_.storeId.c_str());
291 if (storeId_.storeId == GLOBAL_CAPABILITY_ID) {
292 CapabilityInfoEvent recoverEvent(*this, CapabilityInfoEvent::EventType::RECOVER);
293 DHContext::GetInstance().GetEventBus()->PostEvent<CapabilityInfoEvent>(recoverEvent);
294 }
295
296 if (storeId_.storeId == GLOBAL_VERSION_ID) {
297 VersionInfoEvent recoverEvent(*this, VersionInfoEvent::EventType::RECOVER);
298 DHContext::GetInstance().GetEventBus()->PostEvent<VersionInfoEvent>(recoverEvent);
299 }
300 }
301
RegisterChangeListener()302 int32_t DBAdapter::RegisterChangeListener()
303 {
304 DHLOGI("Register db data change listener");
305 if (kvStoragePtr_ == nullptr) {
306 DHLOGE("kvStoragePtr_ is null");
307 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
308 }
309 DistributedKv::Status status = kvStoragePtr_->SubscribeKvStore(DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE,
310 dataChangeListener_);
311 if (status == DistributedKv::Status::IPC_ERROR) {
312 DHLOGE("Register db data change listener failed, ret: %d", status);
313 return ERR_DH_FWK_RESOURCE_REGISTER_DB_FAILED;
314 }
315 return DH_FWK_SUCCESS;
316 }
317
UnRegisterChangeListener()318 int32_t DBAdapter::UnRegisterChangeListener()
319 {
320 DHLOGI("UnRegister db data change listener");
321 if (kvStoragePtr_ == nullptr) {
322 DHLOGE("kvStoragePtr_ is null");
323 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
324 }
325 DistributedKv::Status status = kvStoragePtr_->UnSubscribeKvStore(
326 DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE, dataChangeListener_);
327 if (status == DistributedKv::Status::IPC_ERROR) {
328 DHLOGE("UnRegister db data change listener failed, ret: %d", status);
329 return ERR_DH_FWK_RESOURCE_UNREGISTER_DB_FAILED;
330 }
331 return DH_FWK_SUCCESS;
332 }
333
RegisterKvStoreDeathListener()334 void DBAdapter::RegisterKvStoreDeathListener()
335 {
336 DHLOGI("Register kvStore death listener");
337 kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
338 }
339
UnRegisterKvStoreDeathListener()340 void DBAdapter::UnRegisterKvStoreDeathListener()
341 {
342 DHLOGI("UnRegister kvStore death listener");
343 kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
344 }
345
RegisterManualSyncListener()346 void DBAdapter::RegisterManualSyncListener()
347 {
348 DHLOGI("Register manualSyncCallback");
349 if (kvStoragePtr_ == nullptr) {
350 DHLOGE("kvStoragePtr_ is null");
351 return;
352 }
353 kvStoragePtr_->RegisterSyncCallback(shared_from_this());
354 }
355
UnRegisterManualSyncListener()356 void DBAdapter::UnRegisterManualSyncListener()
357 {
358 DHLOGI("UnRegister manualSyncCallback");
359 if (kvStoragePtr_ == nullptr) {
360 DHLOGE("kvStoragePtr_ is null");
361 return;
362 }
363 kvStoragePtr_->UnRegisterSyncCallback();
364 }
365
OnRemoteDied()366 void DBAdapter::OnRemoteDied()
367 {
368 DHLOGI("OnRemoteDied, recover db begin");
369 auto reInitTask = [this] {
370 int32_t times = 0;
371 while (times < DIED_CHECK_MAX_TIMES) {
372 // init kvStore.
373 if (this->ReInit() == DH_FWK_SUCCESS) {
374 // register data change listener again.
375 this->RegisterChangeListener();
376 this->SyncDBForRecover();
377 DHLOGE("Current times is %d", times);
378 break;
379 }
380 times++;
381 usleep(DIED_CHECK_INTERVAL);
382 }
383 };
384 DHContext::GetInstance().GetEventBus()->PostTask(reInitTask, "reInitTask", 0);
385 DHLOGI("OnRemoteDied, recover db end");
386 }
387
DeleteKvStore()388 void DBAdapter::DeleteKvStore()
389 {
390 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
391 DistributedKv::Status status = kvDataMgr_.DeleteKvStore(appId_, storeId_);
392 if (status != DistributedKv::Status::SUCCESS) {
393 DHLOGE("DeleteKvStore error, appId: %s, storeId: %s, status: %d",
394 appId_.appId.c_str(), storeId_.storeId.c_str(), status);
395 return;
396 }
397 DHLOGI("DeleteKvStore success appId: %s", appId_.appId.c_str());
398 }
399
RemoveDeviceData(const std::string & deviceId)400 int32_t DBAdapter::RemoveDeviceData(const std::string &deviceId)
401 {
402 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
403 if (kvStoragePtr_ == nullptr) {
404 DHLOGE("kvStoragePtr_ is null");
405 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
406 }
407 DistributedKv::Status status = kvStoragePtr_->RemoveDeviceData(deviceId);
408 if (status != DistributedKv::Status::SUCCESS) {
409 DHLOGE("Remove device data failed, deviceId: %s", GetAnonyString(deviceId).c_str());
410 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
411 }
412 DHLOGD("Remove device data success, deviceId: %s", GetAnonyString(deviceId).c_str());
413 return DH_FWK_SUCCESS;
414 }
415
RemoveDataByKey(const std::string & key)416 int32_t DBAdapter::RemoveDataByKey(const std::string &key)
417 {
418 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
419 if (kvStoragePtr_ == nullptr) {
420 DHLOGE("kvStoragePtr_ is null");
421 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
422 }
423 DistributedKv::Key kvKey(key);
424 DistributedKv::Status status = kvStoragePtr_->Delete(kvKey);
425 if (status != DistributedKv::Status::SUCCESS) {
426 DHLOGE("Remove data by key failed");
427 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
428 }
429 DHLOGD("Remove data by key success");
430 return DH_FWK_SUCCESS;
431 }
432 } // namespace DistributedHardware
433 } // namespace OHOS
434