1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "kv_adapter.h"
16
17 #include <cinttypes>
18 #include <mutex>
19 #include <unistd.h>
20
21 #include "cJSON.h"
22 #include "datetime_ex.h"
23 #include "string_ex.h"
24
25 #include "data_query.h"
26 #include "dm_anonymous.h"
27 #include "dm_error_type.h"
28 #include "dm_log.h"
29 #include "ffrt.h"
30
31 namespace OHOS {
32 namespace DistributedHardware {
33 using namespace OHOS::DistributedKv;
namespace {
// Identity of the KV store and the directory it lives in (used as Options::baseDir when the
// store is opened and as the path argument when it is deleted).
const std::string APP_ID = "distributed_device_manager_service";
const std::string STORE_ID = "dm_kv_store";
const std::string DATABASE_DIR = "/data/service/el1/public/database/distributed_device_manager_service";
// Not referenced in the visible part of this file.
const std::string KV_REINIT_THREAD = "reinit_kv_store";

// Init() retry policy: number of attempts and the usleep() interval between them.
constexpr int32_t MAX_INIT_RETRY_TIMES = 20;
constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * 1000; // 200ms

// Size limits.
constexpr uint32_t MAX_BATCH_SIZE = 128;       // max number of keys per DeleteBatch call on the store
constexpr int32_t MAX_STRING_LEN = 4096;       // max accepted key/value length in Put()
constexpr uint32_t DM_OSTYPE_PREFIX_LEN = 16;  // leading key characters stripped in GetAllOstypeData()

// JSON field names used by GetAllOstypeData().
const char* PEER_UDID = "peer_udid";
const char* PEER_OSTYPE = "peer_ostype";
const char* TIME_STAMP = "time_stamp";
}
48
Init()49 int32_t KVAdapter::Init()
50 {
51 LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_));
52 if (isInited_.load()) {
53 LOGI("Local DB already inited.");
54 return DM_OK;
55 }
56 this->appId_.appId = APP_ID;
57 this->storeId_.storeId = STORE_ID;
58 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
59 int32_t tryTimes = MAX_INIT_RETRY_TIMES;
60 while (tryTimes > 0) {
61 DistributedKv::Status status = GetLocalKvStorePtr();
62 if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) {
63 LOGI("Init KvStorePtr Success");
64 RegisterKvStoreDeathListener();
65 isInited_.store(true);
66 return DM_OK;
67 }
68 LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status);
69 if (status == DistributedKv::Status::STORE_META_CHANGED ||
70 status == DistributedKv::Status::SECURITY_LEVEL_ERROR ||
71 status == DistributedKv::Status::DATA_CORRUPTED) {
72 LOGE("init db error, remove and rebuild it");
73 DeleteKvStore();
74 }
75 usleep(INIT_RETRY_SLEEP_INTERVAL);
76 tryTimes--;
77 }
78 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED);
79 isInited_.store(true);
80 return DM_OK;
81 }
82
UnInit()83 void KVAdapter::UnInit()
84 {
85 LOGI("KVAdapter Uninted");
86 if (isInited_.load()) {
87 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
88 CHECK_NULL_VOID(kvStorePtr_);
89 UnregisterKvStoreDeathListener();
90 kvStorePtr_.reset();
91 isInited_.store(false);
92 }
93 }
94
ReInit()95 int32_t KVAdapter::ReInit()
96 {
97 LOGI("KVAdapter ReInit");
98 UnInit();
99 return Init();
100 }
101
Put(const std::string & key,const std::string & value)102 int32_t KVAdapter::Put(const std::string &key, const std::string &value)
103 {
104 if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) {
105 LOGE("Param is invalid!");
106 return ERR_DM_FAILED;
107 }
108 DistributedKv::Status status;
109 {
110 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
111 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
112
113 DistributedKv::Key kvKey(key);
114 DistributedKv::Value kvValue(value);
115 status = kvStorePtr_->Put(kvKey, kvValue);
116 }
117 if (status != DistributedKv::Status::SUCCESS) {
118 LOGE("Put kv to db failed, ret: %{public}d", status);
119 return ERR_DM_FAILED;
120 }
121 return DM_OK;
122 }
123
Get(const std::string & key,std::string & value)124 int32_t KVAdapter::Get(const std::string &key, std::string &value)
125 {
126 LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str());
127 DistributedKv::Key kvKey(key);
128 DistributedKv::Value kvValue;
129 DistributedKv::Status status;
130 {
131 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
132 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
133 status = kvStorePtr_->Get(kvKey, kvValue);
134 }
135 if (status != DistributedKv::Status::SUCCESS) {
136 LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str());
137 return ERR_DM_FAILED;
138 }
139 value = kvValue.ToString();
140 return DM_OK;
141 }
142
OnRemoteDied()143 void KVAdapter::OnRemoteDied()
144 {
145 LOGI("OnRemoteDied, recover db begin");
146 auto reInitTask = [this]() {
147 LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str());
148 ReInit();
149 };
150 ffrt::submit(reInitTask);
151 }
152
GetLocalKvStorePtr()153 DistributedKv::Status KVAdapter::GetLocalKvStorePtr()
154 {
155 DistributedKv::Options options = {
156 .createIfMissing = true,
157 .encrypt = false,
158 .autoSync = false,
159 .securityLevel = DistributedKv::SecurityLevel::S1,
160 .area = DistributedKv::EL1,
161 .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
162 .baseDir = DATABASE_DIR
163 };
164 DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
165 return status;
166 }
167
RegisterKvStoreDeathListener()168 void KVAdapter::RegisterKvStoreDeathListener()
169 {
170 LOGI("Register syncCompleted listener");
171 kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
172 }
173
UnregisterKvStoreDeathListener()174 void KVAdapter::UnregisterKvStoreDeathListener()
175 {
176 LOGI("UnRegister death listener");
177 kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
178 }
179
DeleteKvStore()180 int32_t KVAdapter::DeleteKvStore()
181 {
182 LOGI("Delete KvStore!");
183 kvDataMgr_.CloseKvStore(appId_, storeId_);
184 kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR);
185 return DM_OK;
186 }
187
DeleteByAppId(const std::string & appId,const std::string & prefix)188 int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix)
189 {
190 if (appId.empty()) {
191 LOGE("appId is empty");
192 return ERR_DM_FAILED;
193 }
194 std::vector<DistributedKv::Entry> localEntries;
195 {
196 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
197 if (kvStorePtr_ == nullptr) {
198 LOGE("kvStoragePtr_ is null");
199 return ERR_DM_POINT_NULL;
200 }
201 if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) {
202 LOGE("Get entrys from DB failed.");
203 return ERR_DM_FAILED;
204 }
205 }
206 std::vector<std::string> delKeys;
207 for (const auto &entry : localEntries) {
208 delKeys.emplace_back(entry.key.ToString());
209 DmKVValue kvValue;
210 ConvertJsonToDmKVValue(entry.value.ToString(), kvValue);
211 delKeys.emplace_back(prefix + kvValue.anoyDeviceId);
212 }
213 return DeleteBatch(delKeys);
214 }
215
DeleteBatch(const std::vector<std::string> & keys)216 int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys)
217 {
218 if (keys.empty()) {
219 LOGE("keys size(%{public}zu) is invalid!", keys.size());
220 return ERR_DM_FAILED;
221 }
222 uint32_t keysSize = static_cast<uint32_t>(keys.size());
223 std::vector<std::vector<DistributedKv::Key>> delKeyBatches;
224 for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) {
225 uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE);
226 auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end);
227 std::vector<DistributedKv::Key> delKeys;
228 for (auto item : batch) {
229 DistributedKv::Key key(item);
230 delKeys.emplace_back(key);
231 }
232 delKeyBatches.emplace_back(delKeys);
233 }
234
235 {
236 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
237 if (kvStorePtr_ == nullptr) {
238 LOGE("kvStorePtr is nullptr!");
239 return ERR_DM_POINT_NULL;
240 }
241 for (auto delKeys : delKeyBatches) {
242 DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys);
243 if (status != DistributedKv::Status::SUCCESS) {
244 LOGE("DeleteBatch failed!");
245 return ERR_DM_FAILED;
246 }
247 }
248 }
249 return DM_OK;
250 }
251
Delete(const std::string & key)252 int32_t KVAdapter::Delete(const std::string& key)
253 {
254 DistributedKv::Status status;
255 {
256 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
257 if (kvStorePtr_ == nullptr) {
258 LOGE("kvStorePtr is nullptr!");
259 return ERR_DM_POINT_NULL;
260 }
261 DistributedKv::Key kvKey(key);
262 status = kvStorePtr_->Delete(kvKey);
263 }
264 if (status != DistributedKv::Status::SUCCESS) {
265 LOGE("Delete kv by key failed!");
266 return ERR_DM_FAILED;
267 }
268 return DM_OK;
269 }
270
GetAllOstypeData(const std::string & key,std::vector<std::string> & values)271 int32_t KVAdapter::GetAllOstypeData(const std::string &key, std::vector<std::string> &values)
272 {
273 if (key.empty()) {
274 LOGE("key is empty");
275 return ERR_DM_FAILED;
276 }
277 std::vector<DistributedKv::Entry> localEntries;
278 {
279 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
280 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
281 if (kvStorePtr_->GetEntries(key, localEntries) != DistributedKv::Status::SUCCESS) {
282 LOGE("Get entrys from DB failed.");
283 return ERR_DM_FAILED;
284 }
285 }
286 values.clear();
287 for (const auto &entry : localEntries) {
288 JsonObject osTyoeJson(entry.value.ToString());
289 if (osTyoeJson.IsDiscarded() || !IsInt32(osTyoeJson, PEER_OSTYPE) || !IsInt64(osTyoeJson, TIME_STAMP)) {
290 LOGE("entry parse error.");
291 continue;
292 }
293 if (entry.key.ToString().size() < DM_OSTYPE_PREFIX_LEN) {
294 LOGE("entry value invalid.");
295 continue;
296 }
297 JsonObject jsonObj;
298 jsonObj[PEER_UDID] = entry.key.ToString().substr(DM_OSTYPE_PREFIX_LEN);
299 jsonObj[PEER_OSTYPE] = osTyoeJson[PEER_OSTYPE].Get<int32_t>();
300 jsonObj[TIME_STAMP] = osTyoeJson[TIME_STAMP].Get<int64_t>();
301 values.push_back(jsonObj.Dump());
302 }
303 return DM_OK;
304 }
305
GetOstypeCountByPrefix(const std::string & prefix,int32_t & count)306 int32_t KVAdapter::GetOstypeCountByPrefix(const std::string &prefix, int32_t &count)
307 {
308 LOGI("prefix %{public}s.", prefix.c_str());
309 if (prefix.empty()) {
310 LOGE("prefix is empty.");
311 return ERR_DM_FAILED;
312 }
313 {
314 std::lock_guard<std::mutex> lock(kvAdapterMutex_);
315 CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
316 DataQuery prefixQuery;
317 prefixQuery.KeyPrefix(prefix);
318 if (kvStorePtr_->GetCount(prefixQuery, count) != DistributedKv::Status::SUCCESS) {
319 LOGE("GetCount failed.");
320 return ERR_DM_FAILED;
321 }
322 }
323 return DM_OK;
324 }
325 } // namespace DistributedHardware
326 } // namespace OHOS
327