• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "kv_adapter.h"
16 
17 #include <cinttypes>
18 #include <mutex>
19 #include <unistd.h>
20 
21 #include "cJSON.h"
22 #include "datetime_ex.h"
23 #include "string_ex.h"
24 
25 #include "data_query.h"
26 #include "dm_anonymous.h"
27 #include "dm_error_type.h"
28 #include "dm_log.h"
29 #include "ffrt.h"
30 
31 namespace OHOS {
32 namespace DistributedHardware {
33 using namespace OHOS::DistributedKv;
34 namespace {
35     const std::string APP_ID = "distributed_device_manager_service";
36     const std::string STORE_ID = "dm_kv_store";
37     const std::string DATABASE_DIR = "/data/service/el1/public/database/distributed_device_manager_service";
38     const std::string KV_REINIT_THREAD = "reinit_kv_store";
39     constexpr uint32_t MAX_BATCH_SIZE = 128;
40     constexpr int32_t MAX_STRING_LEN = 4096;
41     constexpr int32_t MAX_INIT_RETRY_TIMES = 20;
42     constexpr int32_t INIT_RETRY_SLEEP_INTERVAL = 200 * 1000; // 200ms
43     constexpr uint32_t DM_OSTYPE_PREFIX_LEN = 16;
44     const char* PEER_UDID = "peer_udid";
45     const char* PEER_OSTYPE = "peer_ostype";
46     const char* TIME_STAMP = "time_stamp";
47 }
48 
Init()49 int32_t KVAdapter::Init()
50 {
51     LOGI("Init local DB, dataType: %{public}d", static_cast<int32_t>(dataType_));
52     if (isInited_.load()) {
53         LOGI("Local DB already inited.");
54         return DM_OK;
55     }
56     this->appId_.appId = APP_ID;
57     this->storeId_.storeId = STORE_ID;
58     std::lock_guard<std::mutex> lock(kvAdapterMutex_);
59     int32_t tryTimes = MAX_INIT_RETRY_TIMES;
60     while (tryTimes > 0) {
61         DistributedKv::Status status = GetLocalKvStorePtr();
62         if (status == DistributedKv::Status::SUCCESS && kvStorePtr_) {
63             LOGI("Init KvStorePtr Success");
64             RegisterKvStoreDeathListener();
65             isInited_.store(true);
66             return DM_OK;
67         }
68         LOGE("CheckKvStore, left times: %{public}d, status: %{public}d", tryTimes, status);
69         if (status == DistributedKv::Status::STORE_META_CHANGED ||
70             status == DistributedKv::Status::SECURITY_LEVEL_ERROR ||
71             status == DistributedKv::Status::DATA_CORRUPTED) {
72             LOGE("init db error, remove and rebuild it");
73             DeleteKvStore();
74         }
75         usleep(INIT_RETRY_SLEEP_INTERVAL);
76         tryTimes--;
77     }
78     CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_INIT_FAILED);
79     isInited_.store(true);
80     return DM_OK;
81 }
82 
UnInit()83 void KVAdapter::UnInit()
84 {
85     LOGI("KVAdapter Uninted");
86     if (isInited_.load()) {
87         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
88         CHECK_NULL_VOID(kvStorePtr_);
89         UnregisterKvStoreDeathListener();
90         kvStorePtr_.reset();
91         isInited_.store(false);
92     }
93 }
94 
ReInit()95 int32_t KVAdapter::ReInit()
96 {
97     LOGI("KVAdapter ReInit");
98     UnInit();
99     return Init();
100 }
101 
Put(const std::string & key,const std::string & value)102 int32_t KVAdapter::Put(const std::string &key, const std::string &value)
103 {
104     if (key.empty() || key.size() > MAX_STRING_LEN || value.empty() || value.size() > MAX_STRING_LEN) {
105         LOGE("Param is invalid!");
106         return ERR_DM_FAILED;
107     }
108     DistributedKv::Status status;
109     {
110         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
111         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
112 
113         DistributedKv::Key kvKey(key);
114         DistributedKv::Value kvValue(value);
115         status = kvStorePtr_->Put(kvKey, kvValue);
116     }
117     if (status != DistributedKv::Status::SUCCESS) {
118         LOGE("Put kv to db failed, ret: %{public}d", status);
119         return ERR_DM_FAILED;
120     }
121     return DM_OK;
122 }
123 
Get(const std::string & key,std::string & value)124 int32_t KVAdapter::Get(const std::string &key, std::string &value)
125 {
126     LOGI("Get data by key: %{public}s", GetAnonyString(key).c_str());
127     DistributedKv::Key kvKey(key);
128     DistributedKv::Value kvValue;
129     DistributedKv::Status status;
130     {
131         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
132         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
133         status = kvStorePtr_->Get(kvKey, kvValue);
134     }
135     if (status != DistributedKv::Status::SUCCESS) {
136         LOGE("Get data from kv failed, key: %{public}s", GetAnonyString(key).c_str());
137         return ERR_DM_FAILED;
138     }
139     value = kvValue.ToString();
140     return DM_OK;
141 }
142 
OnRemoteDied()143 void KVAdapter::OnRemoteDied()
144 {
145     LOGI("OnRemoteDied, recover db begin");
146     auto reInitTask = [this]() {
147         LOGI("ReInit, storeId:%{public}s", storeId_.storeId.c_str());
148         ReInit();
149     };
150     ffrt::submit(reInitTask);
151 }
152 
GetLocalKvStorePtr()153 DistributedKv::Status KVAdapter::GetLocalKvStorePtr()
154 {
155     DistributedKv::Options options = {
156         .createIfMissing = true,
157         .encrypt = false,
158         .autoSync = false,
159         .securityLevel = DistributedKv::SecurityLevel::S1,
160         .area = DistributedKv::EL1,
161         .kvStoreType = DistributedKv::KvStoreType::SINGLE_VERSION,
162         .baseDir = DATABASE_DIR
163     };
164     DistributedKv::Status status = kvDataMgr_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
165     return status;
166 }
167 
RegisterKvStoreDeathListener()168 void KVAdapter::RegisterKvStoreDeathListener()
169 {
170     LOGI("Register syncCompleted listener");
171     kvDataMgr_.RegisterKvStoreServiceDeathRecipient(shared_from_this());
172 }
173 
UnregisterKvStoreDeathListener()174 void KVAdapter::UnregisterKvStoreDeathListener()
175 {
176     LOGI("UnRegister death listener");
177     kvDataMgr_.UnRegisterKvStoreServiceDeathRecipient(shared_from_this());
178 }
179 
DeleteKvStore()180 int32_t KVAdapter::DeleteKvStore()
181 {
182     LOGI("Delete KvStore!");
183     kvDataMgr_.CloseKvStore(appId_, storeId_);
184     kvDataMgr_.DeleteKvStore(appId_, storeId_, DATABASE_DIR);
185     return DM_OK;
186 }
187 
DeleteByAppId(const std::string & appId,const std::string & prefix)188 int32_t KVAdapter::DeleteByAppId(const std::string &appId, const std::string &prefix)
189 {
190     if (appId.empty()) {
191         LOGE("appId is empty");
192         return ERR_DM_FAILED;
193     }
194     std::vector<DistributedKv::Entry> localEntries;
195     {
196         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
197         if (kvStorePtr_ == nullptr) {
198             LOGE("kvStoragePtr_ is null");
199             return ERR_DM_POINT_NULL;
200         }
201         if (kvStorePtr_->GetEntries(prefix + appId, localEntries) != DistributedKv::Status::SUCCESS) {
202             LOGE("Get entrys from DB failed.");
203             return ERR_DM_FAILED;
204         }
205     }
206     std::vector<std::string> delKeys;
207     for (const auto &entry : localEntries) {
208         delKeys.emplace_back(entry.key.ToString());
209         DmKVValue kvValue;
210         ConvertJsonToDmKVValue(entry.value.ToString(), kvValue);
211         delKeys.emplace_back(prefix + kvValue.anoyDeviceId);
212     }
213     return DeleteBatch(delKeys);
214 }
215 
DeleteBatch(const std::vector<std::string> & keys)216 int32_t KVAdapter::DeleteBatch(const std::vector<std::string> &keys)
217 {
218     if (keys.empty()) {
219         LOGE("keys size(%{public}zu) is invalid!", keys.size());
220         return ERR_DM_FAILED;
221     }
222     uint32_t keysSize = static_cast<uint32_t>(keys.size());
223     std::vector<std::vector<DistributedKv::Key>> delKeyBatches;
224     for (uint32_t i = 0; i < keysSize; i += MAX_BATCH_SIZE) {
225         uint32_t end = (i + MAX_BATCH_SIZE) > keysSize ? keysSize : (i + MAX_BATCH_SIZE);
226         auto batch = std::vector<std::string>(keys.begin() + i, keys.begin() + end);
227         std::vector<DistributedKv::Key> delKeys;
228         for (auto item : batch) {
229             DistributedKv::Key key(item);
230             delKeys.emplace_back(key);
231         }
232         delKeyBatches.emplace_back(delKeys);
233     }
234 
235     {
236         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
237         if (kvStorePtr_ == nullptr) {
238             LOGE("kvStorePtr is nullptr!");
239             return ERR_DM_POINT_NULL;
240         }
241         for (auto delKeys : delKeyBatches) {
242             DistributedKv::Status status = kvStorePtr_->DeleteBatch(delKeys);
243             if (status != DistributedKv::Status::SUCCESS) {
244                 LOGE("DeleteBatch failed!");
245                 return ERR_DM_FAILED;
246             }
247         }
248     }
249     return DM_OK;
250 }
251 
Delete(const std::string & key)252 int32_t KVAdapter::Delete(const std::string& key)
253 {
254     DistributedKv::Status status;
255     {
256         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
257         if (kvStorePtr_ == nullptr) {
258             LOGE("kvStorePtr is nullptr!");
259             return ERR_DM_POINT_NULL;
260         }
261         DistributedKv::Key kvKey(key);
262         status = kvStorePtr_->Delete(kvKey);
263     }
264     if (status != DistributedKv::Status::SUCCESS) {
265         LOGE("Delete kv by key failed!");
266         return ERR_DM_FAILED;
267     }
268     return DM_OK;
269 }
270 
GetAllOstypeData(const std::string & key,std::vector<std::string> & values)271 int32_t KVAdapter::GetAllOstypeData(const std::string &key, std::vector<std::string> &values)
272 {
273     if (key.empty()) {
274         LOGE("key is empty");
275         return ERR_DM_FAILED;
276     }
277     std::vector<DistributedKv::Entry> localEntries;
278     {
279         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
280         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
281         if (kvStorePtr_->GetEntries(key, localEntries) != DistributedKv::Status::SUCCESS) {
282             LOGE("Get entrys from DB failed.");
283             return ERR_DM_FAILED;
284         }
285     }
286     values.clear();
287     for (const auto &entry : localEntries) {
288         JsonObject osTyoeJson(entry.value.ToString());
289         if (osTyoeJson.IsDiscarded() || !IsInt32(osTyoeJson, PEER_OSTYPE) || !IsInt64(osTyoeJson, TIME_STAMP)) {
290             LOGE("entry parse error.");
291             continue;
292         }
293         if (entry.key.ToString().size() < DM_OSTYPE_PREFIX_LEN) {
294             LOGE("entry value invalid.");
295             continue;
296         }
297         JsonObject jsonObj;
298         jsonObj[PEER_UDID] = entry.key.ToString().substr(DM_OSTYPE_PREFIX_LEN);
299         jsonObj[PEER_OSTYPE] = osTyoeJson[PEER_OSTYPE].Get<int32_t>();
300         jsonObj[TIME_STAMP] = osTyoeJson[TIME_STAMP].Get<int64_t>();
301         values.push_back(jsonObj.Dump());
302     }
303     return DM_OK;
304 }
305 
GetOstypeCountByPrefix(const std::string & prefix,int32_t & count)306 int32_t KVAdapter::GetOstypeCountByPrefix(const std::string &prefix, int32_t &count)
307 {
308     LOGI("prefix %{public}s.", prefix.c_str());
309     if (prefix.empty()) {
310         LOGE("prefix is empty.");
311         return ERR_DM_FAILED;
312     }
313     {
314         std::lock_guard<std::mutex> lock(kvAdapterMutex_);
315         CHECK_NULL_RETURN(kvStorePtr_, ERR_DM_POINT_NULL);
316         DataQuery prefixQuery;
317         prefixQuery.KeyPrefix(prefix);
318         if (kvStorePtr_->GetCount(prefixQuery, count) != DistributedKv::Status::SUCCESS) {
319             LOGE("GetCount failed.");
320             return ERR_DM_FAILED;
321         }
322     }
323     return DM_OK;
324 }
325 } // namespace DistributedHardware
326 } // namespace OHOS
327