1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_data_storage.h"
17
18 #include <thread>
19 #include <unistd.h>
20 #include "datetime_ex.h"
21 #include "dtbschedmgr_device_info_storage.h"
22 #include "dtbschedmgr_log.h"
23 #include "ipc_object_proxy.h"
24 #include "ipc_skeleton.h"
25 #include "iservice_registry.h"
26 #include "mission/distributed_sched_mission_manager.h"
27 #include "system_ability_definition.h"
28
29 using namespace std;
30 using namespace OHOS::DistributedKv;
31
32 namespace OHOS {
33 namespace DistributedSchedule {
namespace {
// Log tag for this translation unit (presumably picked up by the HILOG* macros - confirm).
const std::string TAG = "DistributedDataStorage";
// Application id stored into appId_ by the constructor.
const std::string APP_ID = "DistributedSchedule";
// Store id stored into storeId_ by the constructor.
const std::string STORE_ID = "SnapshotInfoDataStorage";
// Directory handed to the KV data manager when opening and deleting the store.
const std::string KVDB_PATH = "/data/service/el1/public/database/DistributedSchedule";
// Maximum number of polls for the KV data service in WaitKvDataService().
constexpr int32_t RETRY_TIMES_WAIT_KV_DATA = 30;
// Maximum number of GetKvStore() attempts made by TryGetKvStore().
constexpr int32_t RETRY_TIMES_GET_KVSTORE = 5;
}
42
DistributedDataStorage()43 DistributedDataStorage::DistributedDataStorage()
44 {
45 appId_.appId = APP_ID;
46 storeId_.storeId = STORE_ID;
47 }
48
Init()49 bool DistributedDataStorage::Init()
50 {
51 HILOGD("begin.");
52 if (kvStoreDeathRecipient_ == nullptr) {
53 kvStoreDeathRecipient_ = sptr<IRemoteObject::DeathRecipient>(new KvStoreDeathRecipient());
54 }
55 if (dmsDataStorageHandler_ == nullptr) {
56 shared_ptr<AppExecFwk::EventRunner> runner = AppExecFwk::EventRunner::Create("dmsDataStorageHandler");
57 dmsDataStorageHandler_ = make_shared<AppExecFwk::EventHandler>(runner);
58 }
59 int32_t ret = InitKvDataService();
60 if (!ret) {
61 HILOGE("InitKvDataService failed!");
62 return false;
63 }
64 return true;
65 }
66
InitKvDataService()67 bool DistributedDataStorage::InitKvDataService()
68 {
69 auto waitTask = [this]() {
70 if (!WaitKvDataService()) {
71 HILOGE("get kvDataService failed!");
72 return;
73 }
74 InitDistributedDataStorage();
75 distributedDataChangeListener_ = make_unique<DistributedDataChangeListener>();
76 SubscribeDistributedDataStorage();
77 };
78 if (!dmsDataStorageHandler_->PostTask(waitTask)) {
79 HILOGE("post task failed!");
80 return false;
81 }
82 return true;
83 }
84
WaitKvDataService()85 bool DistributedDataStorage::WaitKvDataService()
86 {
87 auto samgrProxy = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
88 if (samgrProxy == nullptr) {
89 HILOGE("get samgrProxy failed!");
90 return false;
91 }
92 int32_t retryTimes = RETRY_TIMES_WAIT_KV_DATA;
93 do {
94 auto kvDataSvr = samgrProxy->CheckSystemAbility(DISTRIBUTED_KV_DATA_SERVICE_ABILITY_ID);
95 if (kvDataSvr != nullptr) {
96 IPCObjectProxy* proxy = reinterpret_cast<IPCObjectProxy*>(kvDataSvr.GetRefPtr());
97 if (proxy != nullptr && !proxy->IsObjectDead()) {
98 HILOGI("get service success!");
99 proxy->AddDeathRecipient(kvStoreDeathRecipient_);
100 return true;
101 }
102 }
103 HILOGD("waiting for service...");
104 this_thread::sleep_for(1s);
105 if (--retryTimes <= 0) {
106 HILOGE("waiting service timeout(30)s.");
107 return false;
108 }
109 } while (true);
110 return false;
111 }
112
InitDistributedDataStorage()113 void DistributedDataStorage::InitDistributedDataStorage()
114 {
115 int64_t begin = GetTickCount();
116 unique_lock<shared_mutex> writeLock(initLock_);
117 bool result = TryGetKvStore();
118 int64_t end = GetTickCount();
119 HILOGI("TryGetKvStore %{public}s, spend %{public}" PRId64 " ms", result ? "success" : "failed", end - begin);
120 }
121
TryGetKvStore()122 bool DistributedDataStorage::TryGetKvStore()
123 {
124 int32_t retryTimes = 0;
125 while (retryTimes < RETRY_TIMES_GET_KVSTORE) {
126 if (GetKvStore() == Status::SUCCESS && kvStorePtr_ != nullptr) {
127 return true;
128 }
129 HILOGD("retry get kvstore...");
130 this_thread::sleep_for(500ms);
131 retryTimes++;
132 }
133 if (kvStorePtr_ == nullptr) {
134 return false;
135 }
136 return true;
137 }
138
GetKvStore()139 Status DistributedDataStorage::GetKvStore()
140 {
141 Options options = {
142 .createIfMissing = true,
143 .encrypt = false,
144 .autoSync = true,
145 .securityLevel = DistributedKv::SecurityLevel::S2,
146 .kvStoreType = KvStoreType::SINGLE_VERSION,
147 .area = 1,
148 .baseDir = KVDB_PATH
149 };
150 Status status = dataManager_.GetSingleKvStore(options, appId_, storeId_, kvStorePtr_);
151 if (status != Status::SUCCESS) {
152 HILOGE("GetSingleKvStore failed, status = %{public}d.", status);
153 }
154 HILOGI("GetSingleKvStore success!");
155 return status;
156 }
157
SubscribeDistributedDataStorage()158 void DistributedDataStorage::SubscribeDistributedDataStorage()
159 {
160 int64_t begin = GetTickCount();
161 shared_lock<shared_mutex> readLock(initLock_);
162 if (kvStorePtr_ == nullptr) {
163 HILOGW("kvStorePtr is null!");
164 return;
165 }
166 SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
167 if (distributedDataChangeListener_ != nullptr) {
168 HILOGD("SubscribeKvStore start.");
169 Status status = kvStorePtr_->SubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
170 HILOGD("[PerformanceTest] SubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
171 if (status != Status::SUCCESS) {
172 HILOGE("SubscribeKvStore failed! status = %{public}d.", status);
173 return;
174 }
175 }
176 }
177
NotifyRemoteDied(const wptr<IRemoteObject> & remote)178 void DistributedDataStorage::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
179 {
180 HILOGD("begin.");
181 if (kvStoreDeathRecipient_ != nullptr) {
182 remote->RemoveDeathRecipient(kvStoreDeathRecipient_);
183 }
184 }
185
Stop()186 bool DistributedDataStorage::Stop()
187 {
188 HILOGD("begin.");
189 dmsDataStorageHandler_ = nullptr;
190 bool ret = UninitDistributedDataStorage();
191 if (!ret) {
192 HILOGE("UninitDistributedDataStorage failed!");
193 return false;
194 }
195 HILOGD("Stop success!");
196 return true;
197 }
198
UninitDistributedDataStorage()199 bool DistributedDataStorage::UninitDistributedDataStorage()
200 {
201 int64_t begin = GetTickCount();
202 Status status;
203 if (distributedDataChangeListener_ != nullptr && kvStorePtr_ != nullptr) {
204 SubscribeType subscribeType = SubscribeType::SUBSCRIBE_TYPE_REMOTE;
205 status = kvStorePtr_->UnSubscribeKvStore(subscribeType, move(distributedDataChangeListener_));
206 HILOGI("[PerformanceTest] UnSubscribeKvStore spend %{public}" PRId64 " ms", GetTickCount() - begin);
207 if (status != Status::SUCCESS) {
208 HILOGE("UnSubscribeKvStore failed! status = %{public}d.", status);
209 return false;
210 }
211 distributedDataChangeListener_ = nullptr;
212 }
213 if (kvStorePtr_ != nullptr) {
214 status = dataManager_.CloseKvStore(appId_, storeId_);
215 if (status != Status::SUCCESS) {
216 HILOGE("CloseKvStore failed! status = %{public}d.", status);
217 return false;
218 }
219 kvStorePtr_ = nullptr;
220 }
221 status = dataManager_.DeleteKvStore(appId_, storeId_, KVDB_PATH);
222 if (status != Status::SUCCESS) {
223 HILOGE("DeleteKvStore failed! status = %{public}d.", status);
224 return false;
225 }
226 return true;
227 }
228
Insert(const string & networkId,int32_t missionId,const uint8_t * byteStream,size_t len)229 bool DistributedDataStorage::Insert(const string& networkId, int32_t missionId,
230 const uint8_t* byteStream, size_t len)
231 {
232 if (networkId.empty()) {
233 HILOGW("networkId is empty!");
234 return false;
235 }
236 if (missionId < 0) {
237 HILOGW("missionId is invalid!");
238 return false;
239 }
240 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
241 if (uuid.empty()) {
242 HILOGW("uuid is empty!");
243 return false;
244 }
245 {
246 unique_lock<shared_mutex> writeLock(initLock_);
247 bool ret = InsertInnerLocked(uuid, missionId, byteStream, len);
248 if (!ret) {
249 HILOGE("Insert uuid = %{public}s + missionId = %{public}d failed!",
250 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
251 return false;
252 }
253 }
254 HILOGI("Insert uuid = %{public}s + missionId = %{public}d successful!",
255 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
256 return true;
257 }
258
InsertInnerLocked(const string & uuid,int32_t missionId,const uint8_t * byteStream,size_t len)259 bool DistributedDataStorage::InsertInnerLocked(const string& uuid, int32_t missionId,
260 const uint8_t* byteStream, size_t len)
261 {
262 HILOGD("called.");
263 int64_t begin = GetTickCount();
264 if (kvStorePtr_ == nullptr) {
265 HILOGW("kvStorePtr is null!");
266 return false;
267 }
268 Key key;
269 Value value;
270 GenerateKey(uuid, missionId, key);
271 GenerateValue(byteStream, len, value);
272 auto status = kvStorePtr_->Put(key, value);
273 HILOGI("[PerformanceTest] Put Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
274 if (status != Status::SUCCESS) {
275 HILOGE("kvStorePtr Put failed! status = %{public}d.", status);
276 return false;
277 }
278 return true;
279 }
280
Delete(const string & networkId,int32_t missionId)281 bool DistributedDataStorage::Delete(const string& networkId, int32_t missionId)
282 {
283 if (networkId.empty()) {
284 HILOGW("networkId is empty!");
285 return false;
286 }
287 if (missionId < 0) {
288 HILOGW("missionId is invalid!");
289 return false;
290 }
291 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
292 if (uuid.empty()) {
293 HILOGW("uuid is empty!");
294 return false;
295 }
296 {
297 unique_lock<shared_mutex> writeLock(initLock_);
298 bool ret = DeleteInnerLocked(uuid, missionId);
299 if (!ret) {
300 HILOGE("Delete uuid = %{public}s + missionId = %{public}d failed!",
301 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
302 return false;
303 }
304 }
305 HILOGI("Delete uuid = %{public}s + missionId = %{public}d successful!",
306 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
307 return true;
308 }
309
DeleteInnerLocked(const string & uuid,int32_t missionId)310 bool DistributedDataStorage::DeleteInnerLocked(const string& uuid, int32_t missionId)
311 {
312 HILOGD("called.");
313 int64_t begin = GetTickCount();
314 if (kvStorePtr_ == nullptr) {
315 HILOGW("kvStorePtr is null!");
316 return false;
317 }
318 Key key;
319 GenerateKey(uuid, missionId, key);
320 auto status = kvStorePtr_->Delete(key);
321 HILOGI("[PerformanceTest] Delete Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
322 if (status != Status::SUCCESS) {
323 HILOGE("kvStorePtr Delete failed! status = %{public}d.", status);
324 return false;
325 }
326 return true;
327 }
328
FuzzyDelete(const string & networkId)329 bool DistributedDataStorage::FuzzyDelete(const string& networkId)
330 {
331 if (networkId.empty()) {
332 HILOGW("networkId is empty!");
333 return false;
334 }
335 {
336 unique_lock<shared_mutex> writeLock(initLock_);
337 bool ret = FuzzyDeleteInnerLocked(networkId);
338 if (!ret) {
339 HILOGW("FuzzyDelete networkId = %{public}s failed!",
340 DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
341 return false;
342 }
343 }
344 HILOGI("FuzzyDelete networkId = %{public}s successful!", DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
345 return true;
346 }
347
FuzzyDeleteInnerLocked(const string & networkId)348 bool DistributedDataStorage::FuzzyDeleteInnerLocked(const string& networkId)
349 {
350 HILOGD("called.");
351 int64_t begin = GetTickCount();
352 if (kvStorePtr_ == nullptr) {
353 HILOGW("kvStorePtr is null!");
354 return false;
355 }
356 auto status = kvStorePtr_->RemoveDeviceData(networkId);
357 HILOGI("[PerformanceTest] RemoveDeviceData Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
358 if (status != Status::SUCCESS) {
359 HILOGE("kvStorePtr RemoveDeviceData failed! status = %{public}d.", status);
360 return false;
361 }
362 return true;
363 }
364
Query(const string & networkId,int32_t missionId,Value & value) const365 bool DistributedDataStorage::Query(const string& networkId, int32_t missionId, Value& value) const
366 {
367 if (networkId.empty()) {
368 HILOGW("networkId is empty!");
369 return false;
370 }
371 if (missionId < 0) {
372 HILOGW("missionId is invalid!");
373 return false;
374 }
375 string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
376 if (uuid.empty()) {
377 HILOGW("uuid is empty!");
378 return false;
379 }
380 {
381 shared_lock<shared_mutex> readLock(initLock_);
382 bool ret = QueryInnerLocked(uuid, missionId, value);
383 if (!ret) {
384 HILOGE("Query uuid = %{public}s + missionId = %{public}d failed!",
385 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
386 return false;
387 }
388 }
389 HILOGI("Query uuid = %{public}s + missionId = %{public}d successful!",
390 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
391 return true;
392 }
393
QueryInnerLocked(const string & uuid,int32_t missionId,Value & value) const394 bool DistributedDataStorage::QueryInnerLocked(const string& uuid, int32_t missionId, Value& value) const
395 {
396 HILOGD("called.");
397 int64_t begin = GetTickCount();
398 if (kvStorePtr_ == nullptr) {
399 HILOGW("kvStorePtr is null!");
400 return false;
401 }
402 Key key;
403 GenerateKey(uuid, missionId, key);
404 auto status = kvStorePtr_->Get(key, value);
405 HILOGI("[PerformanceTest] Get Snapshot spend %{public}" PRId64 " ms", GetTickCount() - begin);
406 if (status != Status::SUCCESS) {
407 HILOGE("kvStorePtr Get failed! status = %{public}d.", status);
408 return false;
409 }
410 return true;
411 }
412
GenerateKey(const string & uuid,int32_t missionId,Key & key)413 void DistributedDataStorage::GenerateKey(const string& uuid, int32_t missionId, Key& key)
414 {
415 string keyString;
416 keyString.append(uuid).append("_").append(to_string(missionId));
417 key = keyString;
418 }
419
GenerateValue(const uint8_t * byteStream,size_t len,Value & value)420 void DistributedDataStorage::GenerateValue(const uint8_t* byteStream, size_t len, Value& value)
421 {
422 Value valueString((char *)byteStream, len);
423 value = valueString;
424 }
425 } // DistributedSchedule
426 } // namespace OHOS