• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "db_adapter.h"
17 
18 #include "anonymous_string.h"
19 #include "capability_info.h"
20 #include "capability_info_manager.h"
21 #include "capability_utils.h"
22 #include "constants.h"
23 #include "dh_context.h"
24 #include "distributed_hardware_errno.h"
25 #include "distributed_hardware_log.h"
26 #include "event_bus.h"
27 #include "version_info_event.h"
28 
29 namespace OHOS {
30 namespace DistributedHardware {
31 #undef DH_LOG_TAG
32 #define DH_LOG_TAG "DBAdapter"
33 
namespace {
    // Conversion factor for the intervals below, which are handed to usleep() (microseconds).
    constexpr int32_t USEC_PER_MSEC = 1000;

    // Init(): number of attempts to open the kv store and the pause after a failed attempt.
    constexpr int32_t MAX_INIT_RETRY_TIMES = 20;
    constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * USEC_PER_MSEC; // 200ms

    // SyncCompleted(): retry budget for a failed manual sync and the pause used by the retry task.
    constexpr int32_t MANUAL_SYNC_TIMES = 6;
    constexpr int32_t MANUAL_SYNC_INTERVAL = 100 * USEC_PER_MSEC; // 100ms

    // OnRemoteDied(): number of ReInit() attempts and the pause between them.
    constexpr int32_t DIED_CHECK_MAX_TIMES = 300;
    constexpr int32_t DIED_CHECK_INTERVAL = 100 * USEC_PER_MSEC; // 100ms

    // Prefix of the store base directory; the app id is appended in GetKvStorePtr().
    const std::string DATABASE_DIR = "/data/service/el1/public/database/";
}
43 
DBAdapter(const std::string & appId,const std::string & storeId,const std::shared_ptr<DistributedKv::KvStoreObserver> & changeListener)44 DBAdapter::DBAdapter(const std::string &appId, const std::string &storeId,
45                      const std::shared_ptr<DistributedKv::KvStoreObserver> &changeListener)
46 {
47     this->appId_.appId = appId;
48     this->storeId_.storeId = storeId;
49     this->dataChangeListener_ = changeListener;
50     DHLOGI("DBAdapter Constructor Success, appId: %s, storeId: %s", appId.c_str(), storeId.c_str());
51 }
52 
~DBAdapter()53 DBAdapter::~DBAdapter()
54 {
55     DHLOGI("DBAdapter Destruction");
56 }
57 
GetKvStorePtr()58 DistributedKv::Status DBAdapter::GetKvStorePtr()
59 {
60     DistributedKv::Options options = {
61         .createIfMissing = true,
62         .encrypt = false,
63         .autoSync = true,
64         .securityLevel = DistributedKv::SecurityLevel::S1,
65         .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
66         .area = DistributedKv::EL1,
67         .baseDir = DATABASE_DIR + appId_.appId
68     };
69     return kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStoragePtr_);
70 }
71 
Init()72 int32_t DBAdapter::Init()
73 {
74     DHLOGI("Init DB, storeId: %s", storeId_.storeId.c_str());
75     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
76     int32_t tryTimes = MAX_INIT_RETRY_TIMES;
77     while (tryTimes > 0) {
78         DistributedKv::Status status = GetKvStorePtr();
79         if (status == DistributedKv::Status::SUCCESS && kvStoragePtr_) {
80             DHLOGI("Init KvStorePtr Success");
81             RegisterManualSyncListener();
82             RegisterChangeListener();
83             RegisterKvStoreDeathListener();
84             return DH_FWK_SUCCESS;
85         }
86         DHLOGD("CheckKvStore, left times: %d", tryTimes);
87         usleep(INIT_RETRY_SLEEP_INTERVAL);
88         tryTimes--;
89     }
90     if (kvStoragePtr_ == nullptr) {
91         DHLOGE("Init KvStorePtr failed");
92         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
93     }
94     return DH_FWK_SUCCESS;
95 }
96 
UnInit()97 void DBAdapter::UnInit()
98 {
99     DHLOGI("DBAdapter UnInit");
100     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
101     if (kvStoragePtr_ == nullptr) {
102         DHLOGE("kvStoragePtr_ is null");
103         return;
104     }
105     UnRegisterKvStoreDeathListener();
106     UnRegisterChangeListener();
107     UnRegisterManualSyncListener();
108     kvStoragePtr_.reset();
109 }
110 
ReInit()111 int32_t DBAdapter::ReInit()
112 {
113     DHLOGI("ReInit DB, storeId: %s", storeId_.storeId.c_str());
114     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
115     if (kvStoragePtr_ == nullptr) {
116         DHLOGE("kvStoragePtr_ is null");
117         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
118     }
119     UnRegisterManualSyncListener();
120     kvStoragePtr_.reset();
121     DistributedKv::Status status = GetKvStorePtr();
122     if (status != DistributedKv::Status::SUCCESS || !kvStoragePtr_) {
123         DHLOGW("Get kvStoragePtr_ failed, status: %d", status);
124         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
125     }
126     RegisterManualSyncListener();
127     RegisterKvStoreDeathListener();
128     return DH_FWK_SUCCESS;
129 }
130 
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)131 void DBAdapter::SyncCompleted(const std::map<std::string, DistributedKv::Status> &results)
132 {
133     DHLOGI("DBAdapter SyncCompleted start");
134     if (results.size() == 0 || results.size() > MAX_DB_RECORD_SIZE) {
135         DHLOGE("Results size is invalid!");
136         return;
137     }
138     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
139     for (const auto &result : results) {
140         std::string deviceId = result.first;
141         DHLOGI("SyncCompleted, deviceId: %s", GetAnonyString(deviceId).c_str());
142         if (manualSyncCountMap_.count(deviceId) == 0) {
143             DHLOGE("SyncCompleted, error, ManualSyncCount is removed");
144             return;
145         }
146         DHLOGI("ManualSyncCallback::SyncCompleted, retryCount: %d", manualSyncCountMap_[deviceId]);
147         if (result.second == DistributedKv::Status::SUCCESS) {
148             manualSyncCountMap_[deviceId] = 0;
149         } else {
150             manualSyncCountMap_[deviceId]++;
151             if (manualSyncCountMap_[deviceId] >= MANUAL_SYNC_TIMES) {
152                 manualSyncCountMap_[deviceId] = 0;
153             } else {
154                 auto retryTask = [this, deviceId] {
155                     this->ManualSync(deviceId);
156                     usleep(MANUAL_SYNC_INTERVAL);
157                 };
158                 DHContext::GetInstance().GetEventBus()->PostTask(retryTask, "retryTask", 0);
159             }
160         }
161     }
162 }
163 
GetDataByKey(const std::string & key,std::string & data)164 int32_t DBAdapter::GetDataByKey(const std::string &key, std::string &data)
165 {
166     DHLOGI("Get data by key: %s", GetAnonyString(key).c_str());
167     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
168     if (kvStoragePtr_ == nullptr) {
169         DHLOGE("kvStoragePtr_ is null");
170         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
171     }
172     DistributedKv::Key kvKey(key);
173     DistributedKv::Value kvValue;
174     DistributedKv::Status status = kvStoragePtr_->Get(kvKey, kvValue);
175     if (status != DistributedKv::Status::SUCCESS) {
176         DHLOGE("Query from db failed, key: %s", GetAnonyString(key).c_str());
177         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
178     }
179     data = kvValue.ToString();
180     return DH_FWK_SUCCESS;
181 }
182 
GetDataByKeyPrefix(const std::string & keyPrefix,std::vector<std::string> & values)183 int32_t DBAdapter::GetDataByKeyPrefix(const std::string &keyPrefix, std::vector<std::string> &values)
184 {
185     DHLOGI("Get data by key prefix: %s", GetAnonyString(keyPrefix).c_str());
186     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
187     if (kvStoragePtr_ == nullptr) {
188         DHLOGE("kvStoragePtr_ is null");
189         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
190     }
191 
192     // if prefix is empty, get all entries.
193     DistributedKv::Key allEntryKeyPrefix(keyPrefix);
194     std::vector<DistributedKv::Entry> allEntries;
195     DistributedKv::Status status = kvStoragePtr_->GetEntries(allEntryKeyPrefix, allEntries);
196     if (status != DistributedKv::Status::SUCCESS) {
197         DHLOGE("Query data by keyPrefix failed, prefix: %s",
198             GetAnonyString(keyPrefix).c_str());
199         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
200     }
201     if (allEntries.size() == 0 || allEntries.size() > MAX_DB_RECORD_SIZE) {
202         DHLOGE("AllEntries size is invalid!");
203         return ERR_DH_FWK_PARA_INVALID;
204     }
205     for (const auto& item : allEntries) {
206         values.push_back(item.value.ToString());
207     }
208     return DH_FWK_SUCCESS;
209 }
210 
PutData(const std::string & key,const std::string & value)211 int32_t DBAdapter::PutData(const std::string &key, const std::string &value)
212 {
213     if (key.empty() || key.size() > MAX_MESSAGE_LEN || value.empty() || value.size() > MAX_MESSAGE_LEN) {
214         DHLOGI("Param is invalid!");
215         return ERR_DH_FWK_PARA_INVALID;
216     }
217     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
218     if (kvStoragePtr_ == nullptr) {
219         DHLOGE("kvStoragePtr_ is null");
220         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
221     }
222     DistributedKv::Key kvKey(key);
223     DistributedKv::Value kvValue(value);
224     DistributedKv::Status status = kvStoragePtr_->Put(kvKey, kvValue);
225     if (status == DistributedKv::Status::IPC_ERROR) {
226         DHLOGE("Put kv to db failed, ret: %d", status);
227         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
228     }
229     return DH_FWK_SUCCESS;
230 }
231 
PutDataBatch(const std::vector<std::string> & keys,const std::vector<std::string> & values)232 int32_t DBAdapter::PutDataBatch(const std::vector<std::string> &keys, const std::vector<std::string> &values)
233 {
234     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
235     if (kvStoragePtr_ == nullptr) {
236         DHLOGE("kvStoragePtr_ is null");
237         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
238     }
239     if (keys.size() != values.size() || keys.empty() || values.empty()) {
240         DHLOGE("Param is invalid!");
241         return ERR_DH_FWK_PARA_INVALID;
242     }
243     std::vector<DistributedKv::Entry> entries;
244     for (unsigned long i = 0; i < keys.size(); i++) {
245         DistributedKv::Entry entry;
246         entry.key = keys[i];
247         entry.value = values[i];
248         entries.push_back(entry);
249     }
250     DistributedKv::Status status = kvStoragePtr_->PutBatch(entries);
251     if (status != DistributedKv::Status::SUCCESS) {
252         DHLOGE("Put kv batch to db failed, ret: %d", status);
253         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
254     }
255     DHLOGI("Put kv batch to db success");
256     return DH_FWK_SUCCESS;
257 }
258 
CreateManualSyncCount(const std::string & deviceId)259 void DBAdapter::CreateManualSyncCount(const std::string &deviceId)
260 {
261     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
262     manualSyncCountMap_[deviceId] = 0;
263 }
264 
RemoveManualSyncCount(const std::string & deviceId)265 void DBAdapter::RemoveManualSyncCount(const std::string &deviceId)
266 {
267     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
268     manualSyncCountMap_.erase(deviceId);
269 }
270 
ManualSync(const std::string & networkId)271 int32_t DBAdapter::ManualSync(const std::string &networkId)
272 {
273     DHLOGI("Manual sync between networkId: %s", GetAnonyString(networkId).c_str());
274     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
275     if (kvStoragePtr_ == nullptr) {
276         DHLOGE("kvStoragePtr_ is null");
277         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
278     }
279     std::vector<std::string> devList = { networkId };
280     DistributedKv::Status status = kvStoragePtr_->Sync(devList, DistributedKv::SyncMode::PULL);
281     if (status != DistributedKv::Status::SUCCESS) {
282         DHLOGE("ManualSync Data failed, networkId: %s", GetAnonyString(networkId).c_str());
283         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
284     }
285     return DH_FWK_SUCCESS;
286 }
287 
SyncDBForRecover()288 void DBAdapter::SyncDBForRecover()
289 {
290     DHLOGI("Sync store id: %s after db recover", storeId_.storeId.c_str());
291     if (storeId_.storeId == GLOBAL_CAPABILITY_ID) {
292         CapabilityInfoEvent recoverEvent(*this, CapabilityInfoEvent::EventType::RECOVER);
293         DHContext::GetInstance().GetEventBus()->PostEvent<CapabilityInfoEvent>(recoverEvent);
294     }
295 
296     if (storeId_.storeId == GLOBAL_VERSION_ID) {
297         VersionInfoEvent recoverEvent(*this, VersionInfoEvent::EventType::RECOVER);
298         DHContext::GetInstance().GetEventBus()->PostEvent<VersionInfoEvent>(recoverEvent);
299     }
300 }
301 
RegisterChangeListener()302 int32_t DBAdapter::RegisterChangeListener()
303 {
304     DHLOGI("Register db data change listener");
305     if (kvStoragePtr_ == nullptr) {
306         DHLOGE("kvStoragePtr_ is null");
307         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
308     }
309     DistributedKv::Status status = kvStoragePtr_->SubscribeKvStore(DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE,
310         dataChangeListener_);
311     if (status == DistributedKv::Status::IPC_ERROR) {
312         DHLOGE("Register db data change listener failed, ret: %d", status);
313         return ERR_DH_FWK_RESOURCE_REGISTER_DB_FAILED;
314     }
315     return DH_FWK_SUCCESS;
316 }
317 
UnRegisterChangeListener()318 int32_t DBAdapter::UnRegisterChangeListener()
319 {
320     DHLOGI("UnRegister db data change listener");
321     if (kvStoragePtr_ == nullptr) {
322         DHLOGE("kvStoragePtr_ is null");
323         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
324     }
325     DistributedKv::Status status = kvStoragePtr_->UnSubscribeKvStore(
326         DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE, dataChangeListener_);
327     if (status == DistributedKv::Status::IPC_ERROR) {
328         DHLOGE("UnRegister db data change listener failed, ret: %d", status);
329         return ERR_DH_FWK_RESOURCE_UNREGISTER_DB_FAILED;
330     }
331     return DH_FWK_SUCCESS;
332 }
333 
RegisterKvStoreDeathListener()334 void DBAdapter::RegisterKvStoreDeathListener()
335 {
336     DHLOGI("Register kvStore death listener");
337     kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
338 }
339 
UnRegisterKvStoreDeathListener()340 void DBAdapter::UnRegisterKvStoreDeathListener()
341 {
342     DHLOGI("UnRegister kvStore death listener");
343     kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
344 }
345 
RegisterManualSyncListener()346 void DBAdapter::RegisterManualSyncListener()
347 {
348     DHLOGI("Register manualSyncCallback");
349     if (kvStoragePtr_ == nullptr) {
350         DHLOGE("kvStoragePtr_ is null");
351         return;
352     }
353     kvStoragePtr_->RegisterSyncCallback(shared_from_this());
354 }
355 
UnRegisterManualSyncListener()356 void DBAdapter::UnRegisterManualSyncListener()
357 {
358     DHLOGI("UnRegister manualSyncCallback");
359     if (kvStoragePtr_ == nullptr) {
360         DHLOGE("kvStoragePtr_ is null");
361         return;
362     }
363     kvStoragePtr_->UnRegisterSyncCallback();
364 }
365 
OnRemoteDied()366 void DBAdapter::OnRemoteDied()
367 {
368     DHLOGI("OnRemoteDied, recover db begin");
369     auto reInitTask = [this] {
370         int32_t times = 0;
371         while (times < DIED_CHECK_MAX_TIMES) {
372             // init kvStore.
373             if (this->ReInit() == DH_FWK_SUCCESS) {
374                 // register data change listener again.
375                 this->RegisterChangeListener();
376                 this->SyncDBForRecover();
377                 DHLOGE("Current times is %d", times);
378                 break;
379             }
380             times++;
381             usleep(DIED_CHECK_INTERVAL);
382         }
383     };
384     DHContext::GetInstance().GetEventBus()->PostTask(reInitTask, "reInitTask", 0);
385     DHLOGI("OnRemoteDied, recover db end");
386 }
387 
DeleteKvStore()388 void DBAdapter::DeleteKvStore()
389 {
390     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
391     DistributedKv::Status status = kvDataMgr_.DeleteKvStore(appId_, storeId_);
392     if (status != DistributedKv::Status::SUCCESS) {
393         DHLOGE("DeleteKvStore error, appId: %s, storeId: %s, status: %d",
394             appId_.appId.c_str(), storeId_.storeId.c_str(), status);
395         return;
396     }
397     DHLOGI("DeleteKvStore success appId: %s", appId_.appId.c_str());
398 }
399 
RemoveDeviceData(const std::string & deviceId)400 int32_t DBAdapter::RemoveDeviceData(const std::string &deviceId)
401 {
402     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
403     if (kvStoragePtr_ == nullptr) {
404         DHLOGE("kvStoragePtr_ is null");
405         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
406     }
407     DistributedKv::Status status = kvStoragePtr_->RemoveDeviceData(deviceId);
408     if (status != DistributedKv::Status::SUCCESS) {
409         DHLOGE("Remove device data failed, deviceId: %s", GetAnonyString(deviceId).c_str());
410         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
411     }
412     DHLOGD("Remove device data success, deviceId: %s", GetAnonyString(deviceId).c_str());
413     return DH_FWK_SUCCESS;
414 }
415 
RemoveDataByKey(const std::string & key)416 int32_t DBAdapter::RemoveDataByKey(const std::string &key)
417 {
418     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
419     if (kvStoragePtr_ == nullptr) {
420         DHLOGE("kvStoragePtr_ is null");
421         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
422     }
423     DistributedKv::Key kvKey(key);
424     DistributedKv::Status status = kvStoragePtr_->Delete(kvKey);
425     if (status != DistributedKv::Status::SUCCESS) {
426         DHLOGE("Remove data by key failed");
427         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
428     }
429     DHLOGD("Remove data by key success");
430     return DH_FWK_SUCCESS;
431 }
432 } // namespace DistributedHardware
433 } // namespace OHOS
434