• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "db_adapter.h"
17 
18 #include "anonymous_string.h"
19 #include "capability_info.h"
20 #include "capability_info_manager.h"
21 #include "capability_utils.h"
22 #include "constants.h"
23 #include "dh_context.h"
24 #include "distributed_hardware_errno.h"
25 #include "distributed_hardware_log.h"
26 #include "event_bus.h"
27 
28 namespace OHOS {
29 namespace DistributedHardware {
30 #undef DH_LOG_TAG
31 #define DH_LOG_TAG "DBAdapter"
32 
DBAdapter(const std::string & appId,const std::string & storeId,const std::shared_ptr<DistributedKv::KvStoreObserver> & changeListener)33 DBAdapter::DBAdapter(const std::string &appId, const std::string &storeId,
34                      const std::shared_ptr<DistributedKv::KvStoreObserver> &changeListener)
35 {
36     this->appId_.appId = appId;
37     this->storeId_.storeId = storeId;
38     this->dataChangeListener_ = changeListener;
39     DHLOGI("DBAdapter Constructor Success, appId: %s, storeId: %s", appId.c_str(), storeId.c_str());
40 }
41 
~DBAdapter()42 DBAdapter::~DBAdapter()
43 {
44     DHLOGI("DBAdapter Destruction");
45 }
46 
GetKvStorePtr()47 DistributedKv::Status DBAdapter::GetKvStorePtr()
48 {
49     DistributedKv::Options options = {
50         .createIfMissing = true,
51         .encrypt = false,
52         .autoSync = true,
53         .securityLevel = DistributedKv::SecurityLevel::S1,
54         .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION
55     };
56     return kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStoragePtr_);
57 }
58 
Init()59 int32_t DBAdapter::Init()
60 {
61     DHLOGI("Init DB, storeId: %s", storeId_.storeId.c_str());
62     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
63     int32_t tryTimes = MAX_INIT_RETRY_TIMES;
64     while (tryTimes > 0) {
65         DistributedKv::Status status = GetKvStorePtr();
66         if (status == DistributedKv::Status::SUCCESS && kvStoragePtr_) {
67             DHLOGI("Init KvStorePtr Success");
68             RegisterManualSyncListener();
69             RegisterChangeListener();
70             RegisterKvStoreDeathListener();
71             return DH_FWK_SUCCESS;
72         }
73         DHLOGD("CheckKvStore, left times: %d", tryTimes);
74         usleep(INIT_RETRY_SLEEP_INTERVAL);
75         tryTimes--;
76     }
77     if (kvStoragePtr_ == nullptr) {
78         DHLOGE("Init KvStorePtr failed");
79         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
80     }
81     return DH_FWK_SUCCESS;
82 }
83 
UnInit()84 void DBAdapter::UnInit()
85 {
86     DHLOGI("DBAdapter UnInit");
87     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
88     if (kvStoragePtr_ == nullptr) {
89         DHLOGE("kvStoragePtr_ is null");
90         return;
91     }
92     UnRegisterKvStoreDeathListener();
93     UnRegisterChangeListener();
94     UnRegisterManualSyncListener();
95     kvStoragePtr_.reset();
96 }
97 
ReInit()98 int32_t DBAdapter::ReInit()
99 {
100     DHLOGI("ReInit DB, storeId: %s", storeId_.storeId.c_str());
101     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
102     if (kvStoragePtr_ == nullptr) {
103         DHLOGE("kvStoragePtr_ is null");
104         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
105     }
106     UnRegisterManualSyncListener();
107     kvStoragePtr_.reset();
108     DistributedKv::Status status = GetKvStorePtr();
109     if (status != DistributedKv::Status::SUCCESS || !kvStoragePtr_) {
110         DHLOGW("Get kvStoragePtr_ failed, status: %d", status);
111         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
112     }
113     RegisterManualSyncListener();
114     RegisterKvStoreDeathListener();
115     return DH_FWK_SUCCESS;
116 }
117 
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)118 void DBAdapter::SyncCompleted(const std::map<std::string, DistributedKv::Status> &results)
119 {
120     DHLOGI("DBAdapter SyncCompleted start");
121     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
122     for (const auto &result : results) {
123         std::string deviceId = result.first;
124         DHLOGI("SyncCompleted, deviceId: %s", GetAnonyString(deviceId).c_str());
125         if (manualSyncCountMap_.count(deviceId) == 0) {
126             DHLOGE("SyncCompleted, error, ManualSyncCount is removed");
127             return;
128         }
129         DHLOGI("ManualSyncCallback::SyncCompleted, retryCount: %d", manualSyncCountMap_[deviceId]);
130         if (result.second == DistributedKv::Status::SUCCESS) {
131             manualSyncCountMap_[deviceId] = 0;
132         } else {
133             manualSyncCountMap_[deviceId]++;
134             if (manualSyncCountMap_[deviceId] >= MANUAL_SYNC_TIMES) {
135                 manualSyncCountMap_[deviceId] = 0;
136             } else {
137                 auto retryTask = [this, deviceId] {
138                     this->ManualSync(deviceId);
139                     usleep(MANUAL_SYNC_INTERVAL);
140                 };
141                 DHContext::GetInstance().GetEventBus()->PostTask(retryTask, "retryTask", 0);
142             }
143         }
144     }
145 }
146 
GetDataByKey(const std::string & key,std::string & data)147 int32_t DBAdapter::GetDataByKey(const std::string &key, std::string &data)
148 {
149     DHLOGI("Get data by key: %s", GetAnonyString(key).c_str());
150     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
151     if (kvStoragePtr_ == nullptr) {
152         DHLOGE("kvStoragePtr_ is null");
153         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
154     }
155     DistributedKv::Key kvKey(key);
156     DistributedKv::Value kvValue;
157     DistributedKv::Status status = kvStoragePtr_->Get(kvKey, kvValue);
158     if (status != DistributedKv::Status::SUCCESS) {
159         DHLOGE("Query from db failed, key: %s", GetAnonyString(key).c_str());
160         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
161     }
162     data = kvValue.ToString();
163     return DH_FWK_SUCCESS;
164 }
165 
GetDataByKeyPrefix(const std::string & keyPrefix,std::vector<std::string> & values)166 int32_t DBAdapter::GetDataByKeyPrefix(const std::string &keyPrefix, std::vector<std::string> &values)
167 {
168     DHLOGI("Get data by key prefix: %s", GetAnonyString(keyPrefix).c_str());
169     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
170     if (kvStoragePtr_ == nullptr) {
171         DHLOGE("kvStoragePtr_ is null");
172         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
173     }
174 
175     // if prefix is empty, get all entries.
176     DistributedKv::Key allEntryKeyPrefix(keyPrefix);
177     std::vector<DistributedKv::Entry> allEntries;
178     DistributedKv::Status status = kvStoragePtr_->GetEntries(allEntryKeyPrefix, allEntries);
179     if (status != DistributedKv::Status::SUCCESS) {
180         DHLOGE("Query data by keyPrefix failed, prefix: %s",
181             GetAnonyString(keyPrefix).c_str());
182         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
183     }
184     for (const auto& item : allEntries) {
185         values.push_back(item.value.ToString());
186     }
187     return DH_FWK_SUCCESS;
188 }
189 
PutData(const std::string & key,std::string & value)190 int32_t DBAdapter::PutData(const std::string &key, std::string &value)
191 {
192     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
193     if (kvStoragePtr_ == nullptr) {
194         DHLOGE("kvStoragePtr_ is null");
195         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
196     }
197     DistributedKv::Key kvKey(key);
198     DistributedKv::Value kvValue(value);
199     DistributedKv::Status status = kvStoragePtr_->Put(kvKey, kvValue);
200     if (status == DistributedKv::Status::IPC_ERROR) {
201         DHLOGE("Put kv to db failed, ret: %d", status);
202         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
203     }
204     return DH_FWK_SUCCESS;
205 }
206 
PutDataBatch(const std::vector<std::string> & keys,const std::vector<std::string> & values)207 int32_t DBAdapter::PutDataBatch(const std::vector<std::string> &keys, const std::vector<std::string> &values)
208 {
209     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
210     if (kvStoragePtr_ == nullptr) {
211         DHLOGE("kvStoragePtr_ is null");
212         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
213     }
214     if (keys.size() != values.size()) {
215         DHLOGE("Param invalid");
216         return ERR_DH_FWK_PARA_INVALID;
217     }
218     std::vector<DistributedKv::Entry> entries;
219     for (unsigned long i = 0; i < keys.size(); i++) {
220         DistributedKv::Entry entry;
221         entry.key = keys[i];
222         entry.value = values[i];
223         entries.push_back(entry);
224     }
225     DistributedKv::Status status = kvStoragePtr_->PutBatch(entries);
226     if (status != DistributedKv::Status::SUCCESS) {
227         DHLOGE("Put kv batch to db failed, ret: %d", status);
228         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
229     }
230     DHLOGI("Put kv batch to db success");
231     return DH_FWK_SUCCESS;
232 }
233 
CreateManualSyncCount(const std::string & deviceId)234 void DBAdapter::CreateManualSyncCount(const std::string &deviceId)
235 {
236     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
237     manualSyncCountMap_[deviceId] = 0;
238 }
239 
RemoveManualSyncCount(const std::string & deviceId)240 void DBAdapter::RemoveManualSyncCount(const std::string &deviceId)
241 {
242     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
243     manualSyncCountMap_.erase(deviceId);
244 }
245 
ManualSync(const std::string & networkId)246 int32_t DBAdapter::ManualSync(const std::string &networkId)
247 {
248     DHLOGI("Manual sync between networkId: %s", GetAnonyString(networkId).c_str());
249     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
250     if (kvStoragePtr_ == nullptr) {
251         DHLOGE("kvStoragePtr_ is null");
252         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
253     }
254     std::vector<std::string> devList = { networkId };
255     DistributedKv::Status status = kvStoragePtr_->Sync(devList, DistributedKv::SyncMode::PULL);
256     if (status != DistributedKv::Status::SUCCESS) {
257         DHLOGE("ManualSync Data failed, networkId: %s", GetAnonyString(networkId).c_str());
258         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
259     }
260     return DH_FWK_SUCCESS;
261 }
262 
SyncDBForRecover()263 void DBAdapter::SyncDBForRecover()
264 {
265     DHLOGI("Sync store id: %s after db recover", storeId_.storeId.c_str());
266     CapabilityInfoEvent recoverEvent(*this, CapabilityInfoEvent::EventType::RECOVER);
267     DHContext::GetInstance().GetEventBus()->PostEvent<CapabilityInfoEvent>(recoverEvent);
268 }
269 
RegisterChangeListener()270 int32_t DBAdapter::RegisterChangeListener()
271 {
272     DHLOGI("Register db data change listener");
273     if (kvStoragePtr_ == nullptr) {
274         DHLOGE("kvStoragePtr_ is null");
275         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
276     }
277     DistributedKv::Status status = kvStoragePtr_->SubscribeKvStore(DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE,
278         dataChangeListener_);
279     if (status == DistributedKv::Status::IPC_ERROR) {
280         DHLOGE("Register db data change listener failed, ret: %d", status);
281         return ERR_DH_FWK_RESOURCE_REGISTER_DB_FAILED;
282     }
283     return DH_FWK_SUCCESS;
284 }
285 
UnRegisterChangeListener()286 int32_t DBAdapter::UnRegisterChangeListener()
287 {
288     DHLOGI("UnRegister db data change listener");
289     if (kvStoragePtr_ == nullptr) {
290         DHLOGE("kvStoragePtr_ is null");
291         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
292     }
293     DistributedKv::Status status = kvStoragePtr_->UnSubscribeKvStore(
294         DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE, dataChangeListener_);
295     if (status == DistributedKv::Status::IPC_ERROR) {
296         DHLOGE("UnRegister db data change listener failed, ret: %d", status);
297         return ERR_DH_FWK_RESOURCE_UNREGISTER_DB_FAILED;
298     }
299     return DH_FWK_SUCCESS;
300 }
301 
RegisterKvStoreDeathListener()302 void DBAdapter::RegisterKvStoreDeathListener()
303 {
304     DHLOGI("Register kvStore death listener");
305     kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
306 }
307 
UnRegisterKvStoreDeathListener()308 void DBAdapter::UnRegisterKvStoreDeathListener()
309 {
310     DHLOGI("UnRegister kvStore death listener");
311     kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
312 }
313 
RegisterManualSyncListener()314 void DBAdapter::RegisterManualSyncListener()
315 {
316     DHLOGI("Register manualSyncCallback");
317     if (kvStoragePtr_ == nullptr) {
318         DHLOGE("kvStoragePtr_ is null");
319         return;
320     }
321     kvStoragePtr_->RegisterSyncCallback(shared_from_this());
322 }
323 
UnRegisterManualSyncListener()324 void DBAdapter::UnRegisterManualSyncListener()
325 {
326     DHLOGI("UnRegister manualSyncCallback");
327     if (kvStoragePtr_ == nullptr) {
328         DHLOGE("kvStoragePtr_ is null");
329         return;
330     }
331     kvStoragePtr_->UnRegisterSyncCallback();
332 }
333 
OnRemoteDied()334 void DBAdapter::OnRemoteDied()
335 {
336     DHLOGI("OnRemoteDied, recover db begin");
337     auto ReInitTask = [this] {
338         int32_t times = 0;
339         while (times < DIED_CHECK_MAX_TIMES) {
340             // init kvStore.
341             if (this->ReInit() == DH_FWK_SUCCESS) {
342                 // register data change listener again.
343                 this->RegisterChangeListener();
344                 this->SyncDBForRecover();
345                 DHLOGE("Current times is %d", times);
346                 break;
347             }
348             times++;
349             usleep(DIED_CHECK_INTERVAL);
350         }
351     };
352     DHContext::GetInstance().GetEventBus()->PostTask(ReInitTask, "ReInitTask", 0);
353     DHLOGI("OnRemoteDied, recover db end");
354 }
355 
DeleteKvStore()356 void DBAdapter::DeleteKvStore()
357 {
358     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
359     DistributedKv::Status status = kvDataMgr_.DeleteKvStore(appId_, storeId_);
360     if (status != DistributedKv::Status::SUCCESS) {
361         DHLOGE("DeleteKvStore error, appId: %s, storeId: %s, status: %d",
362             appId_.appId.c_str(), storeId_.storeId.c_str(), status);
363         return;
364     }
365     DHLOGI("DeleteKvStore success appId: %s", appId_.appId.c_str());
366 }
367 
RemoveDeviceData(const std::string & deviceId)368 int32_t DBAdapter::RemoveDeviceData(const std::string &deviceId)
369 {
370     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
371     if (kvStoragePtr_ == nullptr) {
372         DHLOGE("kvStoragePtr_ is null");
373         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
374     }
375     DistributedKv::Status status = kvStoragePtr_->RemoveDeviceData(deviceId);
376     if (status != DistributedKv::Status::SUCCESS) {
377         DHLOGE("Remove device data failed, deviceId: %s", GetAnonyString(deviceId).c_str());
378         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
379     }
380     DHLOGD("Remove device data success, deviceId: %s", GetAnonyString(deviceId).c_str());
381     return DH_FWK_SUCCESS;
382 }
383 
RemoveDataByKey(const std::string & key)384 int32_t DBAdapter::RemoveDataByKey(const std::string &key)
385 {
386     std::lock_guard<std::mutex> lock(dbAdapterMutex_);
387     if (kvStoragePtr_ == nullptr) {
388         DHLOGE("kvStoragePtr_ is null");
389         return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
390     }
391     DistributedKv::Key kvKey(key);
392     DistributedKv::Status status = kvStoragePtr_->Delete(kvKey);
393     if (status != DistributedKv::Status::SUCCESS) {
394         DHLOGE("Remove data by key failed");
395         return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
396     }
397     DHLOGD("Remove data by key success");
398     return DH_FWK_SUCCESS;
399 }
400 }
401 }
402