1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "db_adapter.h"
17
18 #include "anonymous_string.h"
19 #include "capability_info.h"
20 #include "capability_info_manager.h"
21 #include "capability_utils.h"
22 #include "constants.h"
23 #include "dh_context.h"
24 #include "distributed_hardware_errno.h"
25 #include "distributed_hardware_log.h"
26 #include "event_bus.h"
27
28 namespace OHOS {
29 namespace DistributedHardware {
30 #undef DH_LOG_TAG
31 #define DH_LOG_TAG "DBAdapter"
32
DBAdapter(const std::string & appId,const std::string & storeId,const std::shared_ptr<DistributedKv::KvStoreObserver> & changeListener)33 DBAdapter::DBAdapter(const std::string &appId, const std::string &storeId,
34 const std::shared_ptr<DistributedKv::KvStoreObserver> &changeListener)
35 {
36 this->appId_.appId = appId;
37 this->storeId_.storeId = storeId;
38 this->dataChangeListener_ = changeListener;
39 DHLOGI("DBAdapter Constructor Success, appId: %s, storeId: %s", appId.c_str(), storeId.c_str());
40 }
41
~DBAdapter()42 DBAdapter::~DBAdapter()
43 {
44 DHLOGI("DBAdapter Destruction");
45 }
46
GetKvStorePtr()47 DistributedKv::Status DBAdapter::GetKvStorePtr()
48 {
49 DistributedKv::Options options = {
50 .createIfMissing = true,
51 .encrypt = false,
52 .autoSync = true,
53 .securityLevel = DistributedKv::SecurityLevel::S1,
54 .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION
55 };
56 return kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStoragePtr_);
57 }
58
Init()59 int32_t DBAdapter::Init()
60 {
61 DHLOGI("Init DB, storeId: %s", storeId_.storeId.c_str());
62 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
63 int32_t tryTimes = MAX_INIT_RETRY_TIMES;
64 while (tryTimes > 0) {
65 DistributedKv::Status status = GetKvStorePtr();
66 if (status == DistributedKv::Status::SUCCESS && kvStoragePtr_) {
67 DHLOGI("Init KvStorePtr Success");
68 RegisterManualSyncListener();
69 RegisterChangeListener();
70 RegisterKvStoreDeathListener();
71 return DH_FWK_SUCCESS;
72 }
73 DHLOGD("CheckKvStore, left times: %d", tryTimes);
74 usleep(INIT_RETRY_SLEEP_INTERVAL);
75 tryTimes--;
76 }
77 if (kvStoragePtr_ == nullptr) {
78 DHLOGE("Init KvStorePtr failed");
79 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
80 }
81 return DH_FWK_SUCCESS;
82 }
83
UnInit()84 void DBAdapter::UnInit()
85 {
86 DHLOGI("DBAdapter UnInit");
87 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
88 if (kvStoragePtr_ == nullptr) {
89 DHLOGE("kvStoragePtr_ is null");
90 return;
91 }
92 UnRegisterKvStoreDeathListener();
93 UnRegisterChangeListener();
94 UnRegisterManualSyncListener();
95 kvStoragePtr_.reset();
96 }
97
ReInit()98 int32_t DBAdapter::ReInit()
99 {
100 DHLOGI("ReInit DB, storeId: %s", storeId_.storeId.c_str());
101 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
102 if (kvStoragePtr_ == nullptr) {
103 DHLOGE("kvStoragePtr_ is null");
104 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
105 }
106 UnRegisterManualSyncListener();
107 kvStoragePtr_.reset();
108 DistributedKv::Status status = GetKvStorePtr();
109 if (status != DistributedKv::Status::SUCCESS || !kvStoragePtr_) {
110 DHLOGW("Get kvStoragePtr_ failed, status: %d", status);
111 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
112 }
113 RegisterManualSyncListener();
114 RegisterKvStoreDeathListener();
115 return DH_FWK_SUCCESS;
116 }
117
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)118 void DBAdapter::SyncCompleted(const std::map<std::string, DistributedKv::Status> &results)
119 {
120 DHLOGI("DBAdapter SyncCompleted start");
121 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
122 for (const auto &result : results) {
123 std::string deviceId = result.first;
124 DHLOGI("SyncCompleted, deviceId: %s", GetAnonyString(deviceId).c_str());
125 if (manualSyncCountMap_.count(deviceId) == 0) {
126 DHLOGE("SyncCompleted, error, ManualSyncCount is removed");
127 return;
128 }
129 DHLOGI("ManualSyncCallback::SyncCompleted, retryCount: %d", manualSyncCountMap_[deviceId]);
130 if (result.second == DistributedKv::Status::SUCCESS) {
131 manualSyncCountMap_[deviceId] = 0;
132 } else {
133 manualSyncCountMap_[deviceId]++;
134 if (manualSyncCountMap_[deviceId] >= MANUAL_SYNC_TIMES) {
135 manualSyncCountMap_[deviceId] = 0;
136 } else {
137 auto retryTask = [this, deviceId] {
138 this->ManualSync(deviceId);
139 usleep(MANUAL_SYNC_INTERVAL);
140 };
141 DHContext::GetInstance().GetEventBus()->PostTask(retryTask, "retryTask", 0);
142 }
143 }
144 }
145 }
146
GetDataByKey(const std::string & key,std::string & data)147 int32_t DBAdapter::GetDataByKey(const std::string &key, std::string &data)
148 {
149 DHLOGI("Get data by key: %s", GetAnonyString(key).c_str());
150 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
151 if (kvStoragePtr_ == nullptr) {
152 DHLOGE("kvStoragePtr_ is null");
153 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
154 }
155 DistributedKv::Key kvKey(key);
156 DistributedKv::Value kvValue;
157 DistributedKv::Status status = kvStoragePtr_->Get(kvKey, kvValue);
158 if (status != DistributedKv::Status::SUCCESS) {
159 DHLOGE("Query from db failed, key: %s", GetAnonyString(key).c_str());
160 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
161 }
162 data = kvValue.ToString();
163 return DH_FWK_SUCCESS;
164 }
165
GetDataByKeyPrefix(const std::string & keyPrefix,std::vector<std::string> & values)166 int32_t DBAdapter::GetDataByKeyPrefix(const std::string &keyPrefix, std::vector<std::string> &values)
167 {
168 DHLOGI("Get data by key prefix: %s", GetAnonyString(keyPrefix).c_str());
169 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
170 if (kvStoragePtr_ == nullptr) {
171 DHLOGE("kvStoragePtr_ is null");
172 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
173 }
174
175 // if prefix is empty, get all entries.
176 DistributedKv::Key allEntryKeyPrefix(keyPrefix);
177 std::vector<DistributedKv::Entry> allEntries;
178 DistributedKv::Status status = kvStoragePtr_->GetEntries(allEntryKeyPrefix, allEntries);
179 if (status != DistributedKv::Status::SUCCESS) {
180 DHLOGE("Query data by keyPrefix failed, prefix: %s",
181 GetAnonyString(keyPrefix).c_str());
182 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
183 }
184 for (const auto& item : allEntries) {
185 values.push_back(item.value.ToString());
186 }
187 return DH_FWK_SUCCESS;
188 }
189
PutData(const std::string & key,std::string & value)190 int32_t DBAdapter::PutData(const std::string &key, std::string &value)
191 {
192 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
193 if (kvStoragePtr_ == nullptr) {
194 DHLOGE("kvStoragePtr_ is null");
195 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
196 }
197 DistributedKv::Key kvKey(key);
198 DistributedKv::Value kvValue(value);
199 DistributedKv::Status status = kvStoragePtr_->Put(kvKey, kvValue);
200 if (status == DistributedKv::Status::IPC_ERROR) {
201 DHLOGE("Put kv to db failed, ret: %d", status);
202 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
203 }
204 return DH_FWK_SUCCESS;
205 }
206
PutDataBatch(const std::vector<std::string> & keys,const std::vector<std::string> & values)207 int32_t DBAdapter::PutDataBatch(const std::vector<std::string> &keys, const std::vector<std::string> &values)
208 {
209 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
210 if (kvStoragePtr_ == nullptr) {
211 DHLOGE("kvStoragePtr_ is null");
212 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
213 }
214 if (keys.size() != values.size()) {
215 DHLOGE("Param invalid");
216 return ERR_DH_FWK_PARA_INVALID;
217 }
218 std::vector<DistributedKv::Entry> entries;
219 for (unsigned long i = 0; i < keys.size(); i++) {
220 DistributedKv::Entry entry;
221 entry.key = keys[i];
222 entry.value = values[i];
223 entries.push_back(entry);
224 }
225 DistributedKv::Status status = kvStoragePtr_->PutBatch(entries);
226 if (status != DistributedKv::Status::SUCCESS) {
227 DHLOGE("Put kv batch to db failed, ret: %d", status);
228 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
229 }
230 DHLOGI("Put kv batch to db success");
231 return DH_FWK_SUCCESS;
232 }
233
CreateManualSyncCount(const std::string & deviceId)234 void DBAdapter::CreateManualSyncCount(const std::string &deviceId)
235 {
236 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
237 manualSyncCountMap_[deviceId] = 0;
238 }
239
RemoveManualSyncCount(const std::string & deviceId)240 void DBAdapter::RemoveManualSyncCount(const std::string &deviceId)
241 {
242 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
243 manualSyncCountMap_.erase(deviceId);
244 }
245
ManualSync(const std::string & networkId)246 int32_t DBAdapter::ManualSync(const std::string &networkId)
247 {
248 DHLOGI("Manual sync between networkId: %s", GetAnonyString(networkId).c_str());
249 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
250 if (kvStoragePtr_ == nullptr) {
251 DHLOGE("kvStoragePtr_ is null");
252 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
253 }
254 std::vector<std::string> devList = { networkId };
255 DistributedKv::Status status = kvStoragePtr_->Sync(devList, DistributedKv::SyncMode::PULL);
256 if (status != DistributedKv::Status::SUCCESS) {
257 DHLOGE("ManualSync Data failed, networkId: %s", GetAnonyString(networkId).c_str());
258 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
259 }
260 return DH_FWK_SUCCESS;
261 }
262
SyncDBForRecover()263 void DBAdapter::SyncDBForRecover()
264 {
265 DHLOGI("Sync store id: %s after db recover", storeId_.storeId.c_str());
266 CapabilityInfoEvent recoverEvent(*this, CapabilityInfoEvent::EventType::RECOVER);
267 DHContext::GetInstance().GetEventBus()->PostEvent<CapabilityInfoEvent>(recoverEvent);
268 }
269
RegisterChangeListener()270 int32_t DBAdapter::RegisterChangeListener()
271 {
272 DHLOGI("Register db data change listener");
273 if (kvStoragePtr_ == nullptr) {
274 DHLOGE("kvStoragePtr_ is null");
275 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
276 }
277 DistributedKv::Status status = kvStoragePtr_->SubscribeKvStore(DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE,
278 dataChangeListener_);
279 if (status == DistributedKv::Status::IPC_ERROR) {
280 DHLOGE("Register db data change listener failed, ret: %d", status);
281 return ERR_DH_FWK_RESOURCE_REGISTER_DB_FAILED;
282 }
283 return DH_FWK_SUCCESS;
284 }
285
UnRegisterChangeListener()286 int32_t DBAdapter::UnRegisterChangeListener()
287 {
288 DHLOGI("UnRegister db data change listener");
289 if (kvStoragePtr_ == nullptr) {
290 DHLOGE("kvStoragePtr_ is null");
291 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
292 }
293 DistributedKv::Status status = kvStoragePtr_->UnSubscribeKvStore(
294 DistributedKv::SubscribeType::SUBSCRIBE_TYPE_REMOTE, dataChangeListener_);
295 if (status == DistributedKv::Status::IPC_ERROR) {
296 DHLOGE("UnRegister db data change listener failed, ret: %d", status);
297 return ERR_DH_FWK_RESOURCE_UNREGISTER_DB_FAILED;
298 }
299 return DH_FWK_SUCCESS;
300 }
301
RegisterKvStoreDeathListener()302 void DBAdapter::RegisterKvStoreDeathListener()
303 {
304 DHLOGI("Register kvStore death listener");
305 kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
306 }
307
UnRegisterKvStoreDeathListener()308 void DBAdapter::UnRegisterKvStoreDeathListener()
309 {
310 DHLOGI("UnRegister kvStore death listener");
311 kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
312 }
313
RegisterManualSyncListener()314 void DBAdapter::RegisterManualSyncListener()
315 {
316 DHLOGI("Register manualSyncCallback");
317 if (kvStoragePtr_ == nullptr) {
318 DHLOGE("kvStoragePtr_ is null");
319 return;
320 }
321 kvStoragePtr_->RegisterSyncCallback(shared_from_this());
322 }
323
UnRegisterManualSyncListener()324 void DBAdapter::UnRegisterManualSyncListener()
325 {
326 DHLOGI("UnRegister manualSyncCallback");
327 if (kvStoragePtr_ == nullptr) {
328 DHLOGE("kvStoragePtr_ is null");
329 return;
330 }
331 kvStoragePtr_->UnRegisterSyncCallback();
332 }
333
OnRemoteDied()334 void DBAdapter::OnRemoteDied()
335 {
336 DHLOGI("OnRemoteDied, recover db begin");
337 auto ReInitTask = [this] {
338 int32_t times = 0;
339 while (times < DIED_CHECK_MAX_TIMES) {
340 // init kvStore.
341 if (this->ReInit() == DH_FWK_SUCCESS) {
342 // register data change listener again.
343 this->RegisterChangeListener();
344 this->SyncDBForRecover();
345 DHLOGE("Current times is %d", times);
346 break;
347 }
348 times++;
349 usleep(DIED_CHECK_INTERVAL);
350 }
351 };
352 DHContext::GetInstance().GetEventBus()->PostTask(ReInitTask, "ReInitTask", 0);
353 DHLOGI("OnRemoteDied, recover db end");
354 }
355
DeleteKvStore()356 void DBAdapter::DeleteKvStore()
357 {
358 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
359 DistributedKv::Status status = kvDataMgr_.DeleteKvStore(appId_, storeId_);
360 if (status != DistributedKv::Status::SUCCESS) {
361 DHLOGE("DeleteKvStore error, appId: %s, storeId: %s, status: %d",
362 appId_.appId.c_str(), storeId_.storeId.c_str(), status);
363 return;
364 }
365 DHLOGI("DeleteKvStore success appId: %s", appId_.appId.c_str());
366 }
367
RemoveDeviceData(const std::string & deviceId)368 int32_t DBAdapter::RemoveDeviceData(const std::string &deviceId)
369 {
370 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
371 if (kvStoragePtr_ == nullptr) {
372 DHLOGE("kvStoragePtr_ is null");
373 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
374 }
375 DistributedKv::Status status = kvStoragePtr_->RemoveDeviceData(deviceId);
376 if (status != DistributedKv::Status::SUCCESS) {
377 DHLOGE("Remove device data failed, deviceId: %s", GetAnonyString(deviceId).c_str());
378 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
379 }
380 DHLOGD("Remove device data success, deviceId: %s", GetAnonyString(deviceId).c_str());
381 return DH_FWK_SUCCESS;
382 }
383
RemoveDataByKey(const std::string & key)384 int32_t DBAdapter::RemoveDataByKey(const std::string &key)
385 {
386 std::lock_guard<std::mutex> lock(dbAdapterMutex_);
387 if (kvStoragePtr_ == nullptr) {
388 DHLOGE("kvStoragePtr_ is null");
389 return ERR_DH_FWK_RESOURCE_KV_STORAGE_POINTER_NULL;
390 }
391 DistributedKv::Key kvKey(key);
392 DistributedKv::Status status = kvStoragePtr_->Delete(kvKey);
393 if (status != DistributedKv::Status::SUCCESS) {
394 DHLOGE("Remove data by key failed");
395 return ERR_DH_FWK_RESOURCE_KV_STORAGE_OPERATION_FAIL;
396 }
397 DHLOGD("Remove data by key success");
398 return DH_FWK_SUCCESS;
399 }
400 }
401 }
402