1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "SingleKvStoreImpl"
17
18 #include "single_kvstore_impl.h"
19 #include <fstream>
20 #include "account_delegate.h"
21 #include "backup_handler.h"
22 #include "checker/checker_manager.h"
23 #include "constant.h"
24 #include "dds_trace.h"
25 #include "device_kvstore_impl.h"
26 #include "auth/auth_delegate.h"
27 #include "kvstore_data_service.h"
28 #include "kvstore_utils.h"
29 #include "ipc_skeleton.h"
30 #include "log_print.h"
31 #include "permission_validator.h"
32 #include "query_helper.h"
33 #include "reporter.h"
34 #include "upgrade_manager.h"
35
36 namespace OHOS::DistributedKv {
37 using namespace OHOS::DistributedData;
TaskIsBackground(pid_t pid)38 static bool TaskIsBackground(pid_t pid)
39 {
40 std::ifstream ifs("/proc/" + std::to_string(pid) + "/cgroup", std::ios::in);
41 ZLOGD("pid %d open %d", pid, ifs.good());
42 if (!ifs.good()) {
43 return false;
44 }
45
46 while (!ifs.eof()) {
47 const int cgroupLineLen = 256; // enough
48 char buffer[cgroupLineLen] = { 0 };
49 ifs.getline(buffer, sizeof(buffer));
50 std::string line = buffer;
51
52 size_t pos = line.find("background");
53 if (pos != std::string::npos) {
54 ifs.close();
55 return true;
56 }
57 }
58 ifs.close();
59 return false;
60 }
61
~SingleKvStoreImpl()62 SingleKvStoreImpl::~SingleKvStoreImpl()
63 {
64 RemoveAllSyncOperation();
65 ZLOGI("destructor");
66 }
67
SingleKvStoreImpl(const Options & options,const std::string & userId,const std::string & bundleName,const std::string & storeId,const std::string & appId,const std::string & directory,DistributedDB::KvStoreNbDelegate * delegate)68 SingleKvStoreImpl::SingleKvStoreImpl(const Options &options, const std::string &userId, const std::string &bundleName,
69 const std::string &storeId, const std::string &appId, const std::string &directory,
70 DistributedDB::KvStoreNbDelegate *delegate)
71 : options_(options), deviceAccountId_(userId), bundleName_(bundleName), storeId_(storeId), appId_(appId),
72 storePath_(Constant::Concatenate({ directory, storeId })), kvStoreNbDelegate_(delegate), observerMapMutex_(),
73 observerMap_(), storeResultSetMutex_(), storeResultSetMap_(), openCount_(1),
74 flowCtrl_(BURST_CAPACITY, SUSTAINED_CAPACITY)
75 {
76 }
77
Put(const Key & key,const Value & value)78 Status SingleKvStoreImpl::Put(const Key &key, const Value &value)
79 {
80 if (!flowCtrl_.IsTokenEnough()) {
81 ZLOGE("flow control denied");
82 return Status::EXCEED_MAX_ACCESS_RATE;
83 }
84 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
85
86 auto trimmedKey = Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
87 // Restrict key and value size to interface specification.
88 if (trimmedKey.size() == 0 || trimmedKey.size() > Constant::MAX_KEY_LENGTH ||
89 value.Size() > Constant::MAX_VALUE_LENGTH) {
90 ZLOGW("invalid_argument.");
91 return Status::INVALID_ARGUMENT;
92 }
93 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
94 if (kvStoreNbDelegate_ == nullptr) {
95 ZLOGE("kvstore is not open");
96 return Status::ILLEGAL_STATE;
97 }
98 DistributedDB::Key tmpKey = trimmedKey;
99 DistributedDB::Value tmpValue = value.Data();
100 DistributedDB::DBStatus status;
101 {
102 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
103 status = kvStoreNbDelegate_->Put(tmpKey, tmpValue);
104 }
105 if (status == DistributedDB::DBStatus::OK) {
106 ZLOGD("succeed.");
107 return Status::SUCCESS;
108 }
109 ZLOGW("failed status: %d.", static_cast<int>(status));
110
111 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
112 ZLOGI("Put failed, distributeddb need recover.");
113 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
114 }
115
116 return ConvertDbStatus(status);
117 }
118
ConvertDbStatus(DistributedDB::DBStatus status)119 Status SingleKvStoreImpl::ConvertDbStatus(DistributedDB::DBStatus status)
120 {
121 switch (status) {
122 case DistributedDB::DBStatus::BUSY: // fallthrough
123 case DistributedDB::DBStatus::DB_ERROR:
124 return Status::DB_ERROR;
125 case DistributedDB::DBStatus::OK:
126 return Status::SUCCESS;
127 case DistributedDB::DBStatus::INVALID_ARGS:
128 return Status::INVALID_ARGUMENT;
129 case DistributedDB::DBStatus::NOT_FOUND:
130 return Status::KEY_NOT_FOUND;
131 case DistributedDB::DBStatus::INVALID_VALUE_FIELDS:
132 return Status::INVALID_VALUE_FIELDS;
133 case DistributedDB::DBStatus::INVALID_FIELD_TYPE:
134 return Status::INVALID_FIELD_TYPE;
135 case DistributedDB::DBStatus::CONSTRAIN_VIOLATION:
136 return Status::CONSTRAIN_VIOLATION;
137 case DistributedDB::DBStatus::INVALID_FORMAT:
138 return Status::INVALID_FORMAT;
139 case DistributedDB::DBStatus::INVALID_QUERY_FORMAT:
140 return Status::INVALID_QUERY_FORMAT;
141 case DistributedDB::DBStatus::INVALID_QUERY_FIELD:
142 return Status::INVALID_QUERY_FIELD;
143 case DistributedDB::DBStatus::NOT_SUPPORT:
144 return Status::NOT_SUPPORT;
145 case DistributedDB::DBStatus::TIME_OUT:
146 return Status::TIME_OUT;
147 case DistributedDB::DBStatus::OVER_MAX_LIMITS:
148 return Status::OVER_MAX_SUBSCRIBE_LIMITS;
149 case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
150 case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
151 return Status::SECURITY_LEVEL_ERROR;
152 default:
153 break;
154 }
155 return Status::ERROR;
156 }
157
Delete(const Key & key)158 Status SingleKvStoreImpl::Delete(const Key &key)
159 {
160 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
161 if (!flowCtrl_.IsTokenEnough()) {
162 ZLOGE("flow control denied");
163 return Status::EXCEED_MAX_ACCESS_RATE;
164 }
165 auto trimmedKey = DistributedKv::Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
166 if (trimmedKey.size() == 0 || trimmedKey.size() > Constant::MAX_KEY_LENGTH) {
167 ZLOGW("invalid argument.");
168 return Status::INVALID_ARGUMENT;
169 }
170 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
171 if (kvStoreNbDelegate_ == nullptr) {
172 ZLOGE("kvstore is not open");
173 return Status::ILLEGAL_STATE;
174 }
175 DistributedDB::Key tmpKey = trimmedKey;
176 DistributedDB::DBStatus status;
177 {
178 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
179 status = kvStoreNbDelegate_->Delete(tmpKey);
180 }
181 if (status == DistributedDB::DBStatus::OK) {
182 ZLOGD("succeed.");
183 return Status::SUCCESS;
184 }
185 ZLOGW("failed status: %d.", static_cast<int>(status));
186 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
187 ZLOGI("Delete failed, distributeddb need recover.");
188 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
189 }
190 return ConvertDbStatus(status);
191 }
192
Get(const Key & key,Value & value)193 Status SingleKvStoreImpl::Get(const Key &key, Value &value)
194 {
195 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
196 if (!flowCtrl_.IsTokenEnough()) {
197 ZLOGE("flow control denied");
198 return Status::EXCEED_MAX_ACCESS_RATE;
199 }
200 auto trimmedKey = DistributedKv::Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
201 if (trimmedKey.empty() || trimmedKey.size() > DistributedKv::Constant::MAX_KEY_LENGTH) {
202 return Status::INVALID_ARGUMENT;
203 }
204 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
205 if (kvStoreNbDelegate_ == nullptr) {
206 ZLOGE("kvstore is not open");
207 return Status::ILLEGAL_STATE;
208 }
209 DistributedDB::Key tmpKey = trimmedKey;
210 DistributedDB::Value tmpValue;
211 DistributedDB::DBStatus status;
212 {
213 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
214 status = kvStoreNbDelegate_->Get(tmpKey, tmpValue);
215 }
216 ZLOGD("status: %d.", static_cast<int>(status));
217 if (status == DistributedDB::DBStatus::OK) {
218 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
219 // Value don't have other write method.
220 Value tmpValueForCopy(tmpValue);
221 value = tmpValueForCopy;
222 return Status::SUCCESS;
223 }
224 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
225 ZLOGI("Get failed, distributeddb need recover.");
226 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
227 }
228 return ConvertDbStatus(status);
229 }
230
SubscribeKvStore(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)231 Status SingleKvStoreImpl::SubscribeKvStore(const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
232 {
233 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
234 ZLOGD("start.");
235 if (!flowCtrl_.IsTokenEnough()) {
236 ZLOGE("flow control denied");
237 return Status::EXCEED_MAX_ACCESS_RATE;
238 }
239 if (observer == nullptr) {
240 return Status::INVALID_ARGUMENT;
241 }
242 std::shared_lock<std::shared_mutex> sharedLock(storeNbDelegateMutex_);
243 if (kvStoreNbDelegate_ == nullptr) {
244 ZLOGE("kvstore is not open");
245 return Status::ILLEGAL_STATE;
246 }
247 KvStoreObserverImpl *nbObserver = CreateObserver(subscribeType, observer);
248 if (nbObserver == nullptr) {
249 ZLOGW("new KvStoreObserverNbImpl failed");
250 return Status::ERROR;
251 }
252
253 std::lock_guard<std::mutex> lock(observerMapMutex_);
254 IRemoteObject *objectPtr = observer->AsObject().GetRefPtr();
255 bool alreadySubscribed = (observerMap_.find(objectPtr) != observerMap_.end());
256 if (alreadySubscribed) {
257 delete nbObserver;
258 return Status::STORE_ALREADY_SUBSCRIBE;
259 }
260 int dbObserverMode = ConvertToDbObserverMode(subscribeType);
261 DistributedDB::Key emptyKey;
262 DistributedDB::DBStatus dbStatus;
263 {
264 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
265 dbStatus = kvStoreNbDelegate_->RegisterObserver(emptyKey, dbObserverMode, nbObserver);
266 }
267 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
268 if (dbStatus == DistributedDB::DBStatus::OK) {
269 observerMap_.insert(std::pair<IRemoteObject *, KvStoreObserverImpl *>(objectPtr, nbObserver));
270 return Status::SUCCESS;
271 }
272
273 delete nbObserver;
274 if (dbStatus == DistributedDB::DBStatus::INVALID_ARGS) {
275 return Status::INVALID_ARGUMENT;
276 }
277 if (dbStatus == DistributedDB::DBStatus::DB_ERROR) {
278 return Status::DB_ERROR;
279 }
280 return Status::ERROR;
281 }
282
CreateObserver(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)283 KvStoreObserverImpl *SingleKvStoreImpl::CreateObserver(
284 const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
285 {
286 return new (std::nothrow) KvStoreObserverImpl(subscribeType, observer);
287 }
288 // Convert KvStore subscribe type to DistributeDB observer mode.
ConvertToDbObserverMode(const SubscribeType subscribeType) const289 int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType) const
290 {
291 int dbObserverMode;
292 if (subscribeType == SubscribeType::SUBSCRIBE_TYPE_LOCAL) {
293 dbObserverMode = DistributedDB::OBSERVER_CHANGES_NATIVE;
294 } else if (subscribeType == SubscribeType::SUBSCRIBE_TYPE_REMOTE) {
295 dbObserverMode = DistributedDB::OBSERVER_CHANGES_FOREIGN;
296 } else {
297 dbObserverMode = DistributedDB::OBSERVER_CHANGES_FOREIGN | DistributedDB::OBSERVER_CHANGES_NATIVE;
298 }
299 return dbObserverMode;
300 }
301
302 // Convert KvStore sync mode to DistributeDB sync mode.
ConvertToDbSyncMode(SyncMode syncMode) const303 DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const
304 {
305 DistributedDB::SyncMode dbSyncMode;
306 if (syncMode == SyncMode::PUSH) {
307 dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY;
308 } else if (syncMode == SyncMode::PULL) {
309 dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY;
310 } else {
311 dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
312 }
313 return dbSyncMode;
314 }
315
UnSubscribeKvStore(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)316 Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
317 {
318 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
319 if (!flowCtrl_.IsTokenEnough()) {
320 ZLOGE("flow control denied");
321 return Status::EXCEED_MAX_ACCESS_RATE;
322 }
323 if (observer == nullptr) {
324 ZLOGW("observer invalid.");
325 return Status::INVALID_ARGUMENT;
326 }
327 std::shared_lock<std::shared_mutex> sharedLock(storeNbDelegateMutex_);
328 std::lock_guard<std::mutex> lock(observerMapMutex_);
329 if (kvStoreNbDelegate_ == nullptr) {
330 ZLOGE("kvstore is not open");
331 return Status::ILLEGAL_STATE;
332 }
333 IRemoteObject *objectPtr = observer->AsObject().GetRefPtr();
334 auto nbObserver = observerMap_.find(objectPtr);
335 if (nbObserver == observerMap_.end()) {
336 ZLOGW("No existing observer to unsubscribe. Return success.");
337 return Status::SUCCESS;
338 }
339 DistributedDB::DBStatus dbStatus;
340 {
341 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
342 dbStatus = kvStoreNbDelegate_->UnRegisterObserver(nbObserver->second);
343 }
344 if (dbStatus == DistributedDB::DBStatus::OK) {
345 delete nbObserver->second;
346 observerMap_.erase(objectPtr);
347 }
348 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
349 if (dbStatus == DistributedDB::DBStatus::OK) {
350 return Status::SUCCESS;
351 }
352
353 ZLOGW("failed code=%d.", static_cast<int>(dbStatus));
354 if (dbStatus == DistributedDB::DBStatus::NOT_FOUND) {
355 return Status::STORE_NOT_SUBSCRIBE;
356 }
357 if (dbStatus == DistributedDB::DBStatus::INVALID_ARGS) {
358 return Status::INVALID_ARGUMENT;
359 }
360 return Status::ERROR;
361 }
362
GetEntries(const Key & prefixKey,std::vector<Entry> & entries)363 Status SingleKvStoreImpl::GetEntries(const Key &prefixKey, std::vector<Entry> &entries)
364 {
365 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
366 if (!flowCtrl_.IsTokenEnough()) {
367 ZLOGE("flow control denied");
368 return Status::EXCEED_MAX_ACCESS_RATE;
369 }
370 auto trimmedPrefix = Constant::TrimCopy<std::vector<uint8_t>>(prefixKey.Data());
371 if (trimmedPrefix.size() > Constant::MAX_KEY_LENGTH) {
372 return Status::INVALID_ARGUMENT;
373 }
374 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
375 if (kvStoreNbDelegate_ == nullptr) {
376 ZLOGE("kvstore is not open");
377 return Status::ILLEGAL_STATE;
378 }
379 DistributedDB::Key tmpKeyPrefix = trimmedPrefix;
380 std::vector<DistributedDB::Entry> dbEntries;
381 DistributedDB::DBStatus status;
382 {
383 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
384 status = kvStoreNbDelegate_->GetEntries(tmpKeyPrefix, dbEntries);
385 }
386 ZLOGI("result DBStatus: %d", static_cast<int>(status));
387 if (status == DistributedDB::DBStatus::OK) {
388 entries.reserve(dbEntries.size());
389 ZLOGD("vector size: %zu status: %d.", dbEntries.size(), static_cast<int>(status));
390 for (auto const &dbEntry : dbEntries) {
391 Key tmpKey(dbEntry.key);
392 Value tmpValue(dbEntry.value);
393 Entry entry;
394 entry.key = tmpKey;
395 entry.value = tmpValue;
396 entries.push_back(entry);
397 }
398 return Status::SUCCESS;
399 }
400 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
401 ZLOGE("GetEntries failed, distributeddb need recover.");
402 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
403 }
404 if (status == DistributedDB::DBStatus::BUSY || status == DistributedDB::DBStatus::DB_ERROR) {
405 return Status::DB_ERROR;
406 }
407 if (status == DistributedDB::DBStatus::NOT_FOUND) {
408 ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list.");
409 return Status::SUCCESS;
410 }
411 if (status == DistributedDB::DBStatus::INVALID_ARGS) {
412 return Status::INVALID_ARGUMENT;
413 }
414 if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
415 status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
416 return Status::SECURITY_LEVEL_ERROR;
417 }
418 return Status::ERROR;
419 }
420
GetEntriesWithQuery(const std::string & query,std::vector<Entry> & entries)421 Status SingleKvStoreImpl::GetEntriesWithQuery(const std::string &query, std::vector<Entry> &entries)
422 {
423 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
424 if (!flowCtrl_.IsTokenEnough()) {
425 ZLOGE("flow control denied");
426 return Status::EXCEED_MAX_ACCESS_RATE;
427 }
428 ZLOGI("begin");
429 bool isSuccess = false;
430 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
431 if (!isSuccess) {
432 ZLOGE("StringToDbQuery failed.");
433 return Status::INVALID_ARGUMENT;
434 } else {
435 ZLOGD("StringToDbQuery success.");
436 }
437 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
438 if (kvStoreNbDelegate_ == nullptr) {
439 ZLOGE("kvstore is not open");
440 return Status::ILLEGAL_STATE;
441 }
442 std::vector<DistributedDB::Entry> dbEntries;
443 DistributedDB::DBStatus status;
444 {
445 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
446 status = kvStoreNbDelegate_->GetEntries(dbQuery, dbEntries);
447 }
448 ZLOGI("result DBStatus: %d", static_cast<int>(status));
449 if (status == DistributedDB::DBStatus::OK) {
450 entries.reserve(dbEntries.size());
451 ZLOGD("vector size: %zu status: %d.", dbEntries.size(), static_cast<int>(status));
452 for (auto const &dbEntry : dbEntries) {
453 Key tmpKey(dbEntry.key);
454 Value tmpValue(dbEntry.value);
455 Entry entry;
456 entry.key = tmpKey;
457 entry.value = tmpValue;
458 entries.push_back(entry);
459 }
460 return Status::SUCCESS;
461 }
462 if (status == DistributedDB::DBStatus::NOT_FOUND) {
463 ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list.");
464 return Status::SUCCESS;
465 }
466 return ConvertDbStatus(status);
467 }
468
GetResultSet(const Key & prefixKey,std::function<void (Status,sptr<IKvStoreResultSet>)> callback)469 void SingleKvStoreImpl::GetResultSet(const Key &prefixKey,
470 std::function<void(Status, sptr<IKvStoreResultSet>)> callback)
471 {
472 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
473 if (!flowCtrl_.IsTokenEnough()) {
474 ZLOGE("flow control denied");
475 callback(Status::EXCEED_MAX_ACCESS_RATE, nullptr);
476 return;
477 }
478 auto trimmedPrefix = Constant::TrimCopy<std::vector<uint8_t>>(prefixKey.Data());
479 if (trimmedPrefix.size() > Constant::MAX_KEY_LENGTH) {
480 callback(Status::INVALID_ARGUMENT, nullptr);
481 return;
482 }
483
484 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
485 if (kvStoreNbDelegate_ == nullptr) {
486 ZLOGE("kvstore is not open");
487 callback(Status::ILLEGAL_STATE, nullptr);
488 return;
489 }
490 DistributedDB::Key tmpKeyPrefix = trimmedPrefix;
491 DistributedDB::KvStoreResultSet *dbResultSet = nullptr;
492 DistributedDB::DBStatus status;
493 {
494 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
495 status = kvStoreNbDelegate_->GetEntries(tmpKeyPrefix, dbResultSet);
496 }
497 ZLOGI("result DBStatus: %d", static_cast<int>(status));
498 if (status == DistributedDB::DBStatus::OK) {
499 std::lock_guard<std::mutex> lg(storeResultSetMutex_);
500 sptr<KvStoreResultSetImpl> storeResultSet = CreateResultSet(dbResultSet, tmpKeyPrefix);
501 callback(Status::SUCCESS, storeResultSet);
502 storeResultSetMap_.emplace(storeResultSet.GetRefPtr(), storeResultSet);
503 return;
504 }
505 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
506 bool success = Import(bundleName_);
507 callback(success ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED, nullptr);
508 return;
509 }
510 callback(ConvertDbStatus(status), nullptr);
511 }
512
GetResultSetWithQuery(const std::string & query,std::function<void (Status,sptr<IKvStoreResultSet>)> callback)513 void SingleKvStoreImpl::GetResultSetWithQuery(const std::string &query,
514 std::function<void(Status, sptr<IKvStoreResultSet>)> callback)
515 {
516 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
517 if (!flowCtrl_.IsTokenEnough()) {
518 ZLOGE("flow control denied");
519 callback(Status::EXCEED_MAX_ACCESS_RATE, nullptr);
520 return;
521 }
522 bool isSuccess = false;
523 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
524 if (!isSuccess) {
525 ZLOGE("StringToDbQuery failed.");
526 return;
527 } else {
528 ZLOGD("StringToDbQuery success.");
529 }
530 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
531 if (kvStoreNbDelegate_ == nullptr) {
532 ZLOGE("kvstore is not open");
533 callback(Status::ILLEGAL_STATE, nullptr);
534 return;
535 }
536 DistributedDB::KvStoreResultSet *dbResultSet = nullptr;
537 DistributedDB::DBStatus status;
538 {
539 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
540 status = kvStoreNbDelegate_->GetEntries(dbQuery, dbResultSet);
541 }
542 ZLOGI("result DBStatus: %d", static_cast<int>(status));
543 if (status == DistributedDB::DBStatus::OK) {
544 std::lock_guard<std::mutex> lg(storeResultSetMutex_);
545 sptr<KvStoreResultSetImpl> storeResultSet = CreateResultSet(dbResultSet, {});
546 callback(Status::SUCCESS, storeResultSet);
547 storeResultSetMap_.emplace(storeResultSet.GetRefPtr(), storeResultSet);
548 return;
549 }
550 switch (status) {
551 case DistributedDB::DBStatus::BUSY:
552 case DistributedDB::DBStatus::DB_ERROR: {
553 callback(Status::DB_ERROR, nullptr);
554 break;
555 }
556 case DistributedDB::DBStatus::INVALID_ARGS: {
557 callback(Status::INVALID_ARGUMENT, nullptr);
558 break;
559 }
560 case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: {
561 callback(Status::INVALID_QUERY_FORMAT, nullptr);
562 break;
563 }
564 case DistributedDB::DBStatus::INVALID_QUERY_FIELD: {
565 callback(Status::INVALID_QUERY_FIELD, nullptr);
566 break;
567 }
568 case DistributedDB::DBStatus::NOT_SUPPORT: {
569 callback(Status::NOT_SUPPORT, nullptr);
570 break;
571 }
572 case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
573 case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
574 callback(Status::SECURITY_LEVEL_ERROR, nullptr);
575 break;
576 default: {
577 callback(Status::ERROR, nullptr);
578 break;
579 }
580 }
581 }
582
CreateResultSet(DistributedDB::KvStoreResultSet * resultSet,const DistributedDB::Key & prix)583 KvStoreResultSetImpl *SingleKvStoreImpl::CreateResultSet(
584 DistributedDB::KvStoreResultSet *resultSet, const DistributedDB::Key &prix)
585 {
586 return new (std::nothrow) KvStoreResultSetImpl(resultSet, prix);
587 }
588
GetCountWithQuery(const std::string & query,int & result)589 Status SingleKvStoreImpl::GetCountWithQuery(const std::string &query, int &result)
590 {
591 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
592 if (!flowCtrl_.IsTokenEnough()) {
593 ZLOGE("flow control denied");
594 return Status::EXCEED_MAX_ACCESS_RATE;
595 }
596 ZLOGI("begin");
597 bool isSuccess = false;
598 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
599 if (!isSuccess) {
600 ZLOGE("StringToDbQuery failed.");
601 return Status::INVALID_ARGUMENT;
602 } else {
603 ZLOGD("StringToDbQuery success.");
604 }
605 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
606 if (kvStoreNbDelegate_ == nullptr) {
607 ZLOGE("kvstore is not open");
608 return Status::ILLEGAL_STATE;
609 }
610 DistributedDB::DBStatus status;
611 {
612 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
613 status = kvStoreNbDelegate_->GetCount(dbQuery, result);
614 }
615 ZLOGI("result DBStatus: %d", static_cast<int>(status));
616 switch (status) {
617 case DistributedDB::DBStatus::OK: {
618 return Status::SUCCESS;
619 }
620 case DistributedDB::DBStatus::BUSY:
621 case DistributedDB::DBStatus::DB_ERROR: {
622 return Status::DB_ERROR;
623 }
624 case DistributedDB::DBStatus::INVALID_ARGS: {
625 return Status::INVALID_ARGUMENT;
626 }
627 case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: {
628 return Status::INVALID_QUERY_FORMAT;
629 }
630 case DistributedDB::DBStatus::INVALID_QUERY_FIELD: {
631 return Status::INVALID_QUERY_FIELD;
632 }
633 case DistributedDB::DBStatus::NOT_SUPPORT: {
634 return Status::NOT_SUPPORT;
635 }
636 case DistributedDB::DBStatus::NOT_FOUND: {
637 ZLOGE("DB return NOT_FOUND, no matching result. Return success with count 0.");
638 result = 0;
639 return Status::SUCCESS;
640 }
641 case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
642 case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
643 return Status::SECURITY_LEVEL_ERROR;
644 default: {
645 return Status::ERROR;
646 }
647 }
648 }
649
CloseResultSet(sptr<IKvStoreResultSet> resultSet)650 Status SingleKvStoreImpl::CloseResultSet(sptr<IKvStoreResultSet> resultSet)
651 {
652 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
653 if (resultSet == nullptr) {
654 return Status::INVALID_ARGUMENT;
655 }
656 if (!flowCtrl_.IsTokenEnough()) {
657 ZLOGE("flow control denied");
658 return Status::EXCEED_MAX_ACCESS_RATE;
659 }
660 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
661 std::lock_guard<std::mutex> lg(storeResultSetMutex_);
662 Status status;
663 auto it = storeResultSetMap_.find(resultSet.GetRefPtr());
664 if (it == storeResultSetMap_.end()) {
665 ZLOGE("ResultSet not found in this store.");
666 return Status::INVALID_ARGUMENT;
667 }
668 {
669 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
670 status = it->second->CloseResultSet(kvStoreNbDelegate_);
671 }
672 if (status == Status::SUCCESS) {
673 storeResultSetMap_.erase(it);
674 } else {
675 ZLOGE("CloseResultSet failed.");
676 }
677 return status;
678 }
679
RemoveDeviceData(const std::string & device)680 Status SingleKvStoreImpl::RemoveDeviceData(const std::string &device)
681 {
682 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
683 if (!flowCtrl_.IsTokenEnough()) {
684 ZLOGE("flow control denied");
685 return Status::EXCEED_MAX_ACCESS_RATE;
686 }
687 // map UUID to UDID
688 std::string deviceUDID = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(device);
689 if (deviceUDID.empty()) {
690 ZLOGE("can't get nodeid");
691 return Status::ERROR;
692 }
693 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
694 if (kvStoreNbDelegate_ == nullptr) {
695 ZLOGE("kvstore is not open");
696 return Status::ILLEGAL_STATE;
697 }
698 DistributedDB::DBStatus status;
699 {
700 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
701 status = kvStoreNbDelegate_->RemoveDeviceData(deviceUDID);
702 }
703 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
704 ZLOGE("RemoveDeviceData failed, distributeddb need recover.");
705 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
706 }
707 if (status == DistributedDB::DBStatus::OK) {
708 return Status::SUCCESS;
709 }
710 if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
711 status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
712 return Status::SECURITY_LEVEL_ERROR;
713 }
714 return Status::ERROR;
715 }
716
Sync(const std::vector<std::string> & deviceIds,SyncMode mode,uint32_t allowedDelayMs,uint64_t sequenceId)717 Status SingleKvStoreImpl::Sync(const std::vector<std::string> &deviceIds, SyncMode mode,
718 uint32_t allowedDelayMs, uint64_t sequenceId)
719 {
720 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
721 ZLOGD("start.");
722 if (!flowCtrl_.IsTokenEnough()) {
723 ZLOGE("flow control denied");
724 return Status::EXCEED_MAX_ACCESS_RATE;
725 }
726 uint32_t delayMs = GetSyncDelayTime(allowedDelayMs);
727 {
728 std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
729 if ((waitingSyncCount_ > 0) &&
730 (lastSyncDeviceIds_ == deviceIds) && (lastSyncMode_ == mode) && (lastSyncDelayMs_ == delayMs)) {
731 return Status::SUCCESS;
732 }
733 lastSyncDeviceIds_ = deviceIds;
734 lastSyncMode_ = mode;
735 lastSyncDelayMs_ = delayMs;
736 }
737 return AddSync(deviceIds, mode, delayMs, sequenceId);
738 }
739
Sync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,uint64_t sequenceId)740 Status SingleKvStoreImpl::Sync(const std::vector<std::string> &deviceIds, SyncMode mode,
741 const std::string &query, uint64_t sequenceId)
742 {
743 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
744 ZLOGD("start.");
745 if (!flowCtrl_.IsTokenEnough()) {
746 ZLOGE("flow control denied");
747 return Status::EXCEED_MAX_ACCESS_RATE;
748 }
749 uint32_t delayMs = GetSyncDelayTime(0);
750 return AddSync(deviceIds, mode, query, delayMs, sequenceId);
751 }
752
AddSync(const std::vector<std::string> & deviceIds,SyncMode mode,uint32_t delayMs,uint64_t sequenceId)753 Status SingleKvStoreImpl::AddSync(const std::vector<std::string> &deviceIds, SyncMode mode, uint32_t delayMs,
754 uint64_t sequenceId)
755 {
756 ZLOGD("start.");
757 waitingSyncCount_++;
758 return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
759 std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1, sequenceId),
760 std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
761 }
762
AddSync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,uint32_t delayMs,uint64_t sequenceId)763 Status SingleKvStoreImpl::AddSync(const std::vector<std::string> &deviceIds, SyncMode mode,
764 const std::string &query, uint32_t delayMs, uint64_t sequenceId)
765 {
766 ZLOGD("start.");
767 waitingSyncCount_++;
768 return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
769 std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1, sequenceId),
770 std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query, sequenceId));
771 }
772
GetSyncDelayTime(uint32_t allowedDelayMs) const773 uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const
774 {
775 uint32_t delayMs = allowedDelayMs;
776 if (delayMs == 0) {
777 bool isBackground = TaskIsBackground(IPCSkeleton::GetCallingPid());
778 if (isBackground) {
779 // delay schedule
780 delayMs = defaultSyncDelayMs_ ? defaultSyncDelayMs_ : KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS;
781 }
782 } else {
783 if (delayMs < KvStoreSyncManager::SYNC_MIN_DELAY_MS) {
784 delayMs = KvStoreSyncManager::SYNC_MIN_DELAY_MS;
785 }
786 if (delayMs > KvStoreSyncManager::SYNC_MAX_DELAY_MS) {
787 delayMs = KvStoreSyncManager::SYNC_MAX_DELAY_MS;
788 }
789 }
790 return delayMs;
791 }
792
RemoveAllSyncOperation()793 Status SingleKvStoreImpl::RemoveAllSyncOperation()
794 {
795 return KvStoreSyncManager::GetInstance()->RemoveSyncOperation(reinterpret_cast<uintptr_t>(this));
796 }
797
DoSyncComplete(const std::map<std::string,DistributedDB::DBStatus> & devicesSyncResult,const std::string & query,uint64_t sequenceId)798 void SingleKvStoreImpl::DoSyncComplete(const std::map<std::string, DistributedDB::DBStatus> &devicesSyncResult,
799 const std::string &query, uint64_t sequenceId)
800 {
801 DdsTrace trace(std::string("DdsTrace " LOG_TAG "::") + std::string(__FUNCTION__));
802 std::map<std::string, Status> resultMap;
803 for (auto device : devicesSyncResult) {
804 resultMap[device.first] = ConvertDbStatus(device.second);
805 }
806 syncRetries_ = 0;
807 ZLOGD("callback.");
808 if (syncCallback_ != nullptr) {
809 syncCallback_->SyncCompleted(resultMap, sequenceId);
810 }
811 }
812
DoQuerySync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd,uint64_t sequenceId)813 Status SingleKvStoreImpl::DoQuerySync(const std::vector<std::string> &deviceIds, SyncMode mode,
814 const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId)
815 {
816 ZLOGD("start.");
817 std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
818 if (deviceUuids.empty()) {
819 ZLOGE("not found deviceIds.");
820 return Status::ERROR;
821 }
822 DistributedDB::SyncMode dbMode;
823 if (mode == SyncMode::PUSH) {
824 dbMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY;
825 } else if (mode == SyncMode::PULL) {
826 dbMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY;
827 } else {
828 dbMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
829 }
830 bool isSuccess = false;
831 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
832 if (!isSuccess) {
833 ZLOGE("StringToDbQuery failed.");
834 return Status::INVALID_ARGUMENT;
835 }
836 ZLOGD("StringToDbQuery success.");
837 DistributedDB::DBStatus status;
838 {
839 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
840 if (kvStoreNbDelegate_ == nullptr) {
841 ZLOGE("kvstore is not open");
842 return Status::ILLEGAL_STATE;
843 }
844 waitingSyncCount_--;
845 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
846 status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd, dbQuery, false);
847 ZLOGD("end: %d", static_cast<int>(status));
848 }
849 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
850 if (status == DistributedDB::DBStatus::BUSY) {
851 if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) {
852 syncRetries_++;
853 auto addStatus = AddSync(deviceIds, mode, query, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId);
854 if (addStatus == Status::SUCCESS) {
855 return addStatus;
856 }
857 }
858 }
859 return ConvertDbStatus(status);
860 }
861
DoSync(const std::vector<std::string> & deviceIds,SyncMode mode,const KvStoreSyncManager::SyncEnd & syncEnd,uint64_t sequenceId)862 Status SingleKvStoreImpl::DoSync(const std::vector<std::string> &deviceIds, SyncMode mode,
863 const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId)
864 {
865 ZLOGD("start.");
866 std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
867 if (deviceUuids.empty()) {
868 ZLOGE("not found deviceIds.");
869 return Status::ERROR;
870 }
871 DistributedDB::SyncMode dbMode = ConvertToDbSyncMode(mode);
872 DistributedDB::DBStatus status;
873 {
874 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
875 if (kvStoreNbDelegate_ == nullptr) {
876 ZLOGE("kvstore is not open");
877 return Status::ILLEGAL_STATE;
878 }
879 waitingSyncCount_--;
880 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
881 status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd);
882 ZLOGD("end: %d", static_cast<uint32_t>(status));
883 }
884 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
885 if (status == DistributedDB::DBStatus::BUSY) {
886 if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) {
887 syncRetries_++;
888 auto addStatus = AddSync(deviceUuids, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId);
889 if (addStatus == Status::SUCCESS) {
890 return addStatus;
891 }
892 }
893 }
894 return ConvertDbStatus(status);
895 }
896
MapNodeIdToUuids(const std::vector<std::string> & deviceIds)897 std::vector<std::string> SingleKvStoreImpl::MapNodeIdToUuids(const std::vector<std::string> &deviceIds)
898 {
899 std::vector<std::string> deviceUuids;
900 for (auto const &nodeId : deviceIds) {
901 std::string uuid = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(nodeId);
902 if (!uuid.empty()) {
903 deviceUuids.push_back(uuid);
904 }
905 }
906 return deviceUuids;
907 }
908
DoSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd)909 Status SingleKvStoreImpl::DoSubscribe(const std::vector<std::string> &deviceIds,
910 const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd)
911 {
912 ZLOGD("start.");
913 std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
914 if (deviceUuids.empty()) {
915 ZLOGE("not found deviceIds.");
916 return Status::ERROR;
917 }
918 bool isSuccess = false;
919 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
920 if (!isSuccess) {
921 ZLOGE("StringToDbQuery failed.");
922 return Status::INVALID_ARGUMENT;
923 }
924 ZLOGD("StringToDbQuery success.");
925 DistributedDB::DBStatus status;
926 {
927 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
928 if (kvStoreNbDelegate_ == nullptr) {
929 ZLOGE("kvstore is not open");
930 return Status::ILLEGAL_STATE;
931 }
932 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
933 status = kvStoreNbDelegate_->SubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false);
934 ZLOGD("end: %d", static_cast<uint32_t>(status));
935 }
936 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
937 return ConvertDbStatus(status);
938 }
939
DoUnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd)940 Status SingleKvStoreImpl::DoUnSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
941 const KvStoreSyncManager::SyncEnd &syncEnd)
942 {
943 ZLOGD("start.");
944 std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
945 if (deviceUuids.empty()) {
946 ZLOGE("not found deviceIds.");
947 return Status::ERROR;
948 }
949 bool isSuccess = false;
950 DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
951 if (!isSuccess) {
952 ZLOGE("StringToDbQuery failed.");
953 return Status::INVALID_ARGUMENT;
954 }
955 ZLOGD("StringToDbQuery success.");
956 DistributedDB::DBStatus status;
957 {
958 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
959 if (kvStoreNbDelegate_ == nullptr) {
960 ZLOGE("kvstore is not open");
961 return Status::ILLEGAL_STATE;
962 }
963 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
964 status = kvStoreNbDelegate_->UnSubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false);
965 ZLOGD("end: %d", static_cast<uint32_t>(status));
966 }
967 return ConvertDbStatus(status);
968 }
969
AddSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint32_t delayMs,uint64_t sequenceId)970 Status SingleKvStoreImpl::AddSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
971 uint32_t delayMs, uint64_t sequenceId)
972 {
973 ZLOGD("start.");
974 return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
975 std::bind(&SingleKvStoreImpl::DoSubscribe, this, deviceIds, query, std::placeholders::_1),
976 std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
977 }
978
AddUnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint32_t delayMs,uint64_t sequenceId)979 Status SingleKvStoreImpl::AddUnSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
980 uint32_t delayMs, uint64_t sequenceId)
981 {
982 ZLOGD("start.");
983 return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
984 std::bind(&SingleKvStoreImpl::DoUnSubscribe, this, deviceIds, query, std::placeholders::_1),
985 std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
986 }
987
Subscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint64_t sequenceId)988 Status SingleKvStoreImpl::Subscribe(const std::vector<std::string> &deviceIds,
989 const std::string &query, uint64_t sequenceId)
990 {
991 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
992 ZLOGD("start.");
993 if (!flowCtrl_.IsTokenEnough()) {
994 ZLOGE("flow control denied");
995 return Status::EXCEED_MAX_ACCESS_RATE;
996 }
997 uint32_t delayMs = GetSyncDelayTime(0);
998 return AddSubscribe(deviceIds, query, delayMs, sequenceId);
999 }
1000
UnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint64_t sequenceId)1001 Status SingleKvStoreImpl::UnSubscribe(const std::vector<std::string> &deviceIds,
1002 const std::string &query, uint64_t sequenceId)
1003 {
1004 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1005 ZLOGD("start.");
1006 if (!flowCtrl_.IsTokenEnough()) {
1007 ZLOGE("flow control denied");
1008 return Status::EXCEED_MAX_ACCESS_RATE;
1009 }
1010 uint32_t delayMs = GetSyncDelayTime(0);
1011 return AddUnSubscribe(deviceIds, query, delayMs, sequenceId);
1012 }
1013
Close(DistributedDB::KvStoreDelegateManager * kvStoreDelegateManager)1014 InnerStatus SingleKvStoreImpl::Close(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager)
1015 {
1016 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1017
1018 ZLOGW("start Close");
1019 if (openCount_ > 1) {
1020 openCount_--;
1021 return InnerStatus::DECREASE_REFCOUNT;
1022 }
1023 Status status = ForceClose(kvStoreDelegateManager);
1024 if (status == Status::SUCCESS) {
1025 return InnerStatus::SUCCESS;
1026 }
1027 return InnerStatus::ERROR;
1028 }
1029
ForceClose(DistributedDB::KvStoreDelegateManager * kvStoreDelegateManager)1030 Status SingleKvStoreImpl::ForceClose(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager)
1031 {
1032 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1033
1034 ZLOGI("start, current openCount is %d.", openCount_);
1035 std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1036 if (kvStoreNbDelegate_ == nullptr || kvStoreDelegateManager == nullptr) {
1037 ZLOGW("get nullptr");
1038 return Status::INVALID_ARGUMENT;
1039 }
1040 RemoveAllSyncOperation();
1041 ZLOGI("start to clean observer");
1042 std::lock_guard<std::mutex> observerMapLockGuard(observerMapMutex_);
1043 for (auto observer = observerMap_.begin(); observer != observerMap_.end();) {
1044 DistributedDB::DBStatus dbStatus = kvStoreNbDelegate_->UnRegisterObserver(observer->second);
1045 if (dbStatus == DistributedDB::DBStatus::OK) {
1046 delete observer->second;
1047 observer = observerMap_.erase(observer);
1048 } else {
1049 ZLOGW("UnSubscribeKvStore failed during ForceClose, status %d.", dbStatus);
1050 return Status::ERROR;
1051 }
1052 }
1053 ZLOGI("start to clean resultset");
1054 std::lock_guard<std::mutex> lg(storeResultSetMutex_);
1055 for (auto resultSetPair = storeResultSetMap_.begin(); resultSetPair != storeResultSetMap_.end();) {
1056 Status status = (resultSetPair->second)->CloseResultSet(kvStoreNbDelegate_);
1057 if (status != Status::SUCCESS) {
1058 ZLOGW("CloseResultSet failed during ForceClose, errCode %d", status);
1059 return status;
1060 }
1061 resultSetPair = storeResultSetMap_.erase(resultSetPair);
1062 }
1063 DistributedDB::DBStatus status = kvStoreDelegateManager->CloseKvStore(kvStoreNbDelegate_);
1064 if (status == DistributedDB::DBStatus::OK) {
1065 kvStoreNbDelegate_ = nullptr;
1066 ZLOGI("end.");
1067 return Status::SUCCESS;
1068 }
1069 ZLOGI("failed with error code %d.", status);
1070 return Status::ERROR;
1071 }
1072
MigrateKvStore(const std::string & harmonyAccountId,const std::string & kvStoreDataDir,DistributedDB::KvStoreDelegateManager * oldDelegateMgr,DistributedDB::KvStoreDelegateManager * & newDelegateMgr)1073 Status SingleKvStoreImpl::MigrateKvStore(const std::string &harmonyAccountId,
1074 const std::string &kvStoreDataDir,
1075 DistributedDB::KvStoreDelegateManager *oldDelegateMgr,
1076 DistributedDB::KvStoreDelegateManager *&newDelegateMgr)
1077 {
1078 ZLOGI("begin.");
1079 std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1080 if (oldDelegateMgr == nullptr) {
1081 ZLOGW("kvStore delegate manager is nullptr.");
1082 return Status::INVALID_ARGUMENT;
1083 }
1084
1085 ZLOGI("create new KvStore.");
1086 std::vector<uint8_t> secretKey; // expected get secret key from meta kvstore successful when encrypt flag is true.
1087 std::unique_ptr<std::vector<uint8_t>, void(*)(std::vector<uint8_t>*)> cleanGuard(
1088 &secretKey, [](std::vector<uint8_t> *ptr) { ptr->assign(ptr->size(), 0); });
1089 bool outdated = false; // ignore outdated flag during rebuild kvstore.
1090 auto metaSecretKey = KvStoreMetaManager::GetMetaKey(deviceAccountId_, "default", bundleName_, storeId_,
1091 "SINGLE_KEY");
1092 if (options_.encrypt) {
1093 KvStoreMetaManager::GetInstance().GetSecretKeyFromMeta(metaSecretKey, secretKey, outdated);
1094 if (secretKey.empty()) {
1095 ZLOGE("Get secret key from meta kvstore failed.");
1096 return Status::CRYPT_ERROR;
1097 }
1098 }
1099
1100 DistributedDB::DBStatus dbStatus;
1101 DistributedDB::KvStoreNbDelegate::Option dbOption;
1102 Status status = KvStoreAppManager::InitNbDbOption(options_, secretKey, dbOption);
1103 if (status != Status::SUCCESS) {
1104 ZLOGE("InitNbDbOption failed.");
1105 return status;
1106 }
1107
1108 if (newDelegateMgr == nullptr) {
1109 if (appId_.empty()) {
1110 ZLOGE("Get appId by bundle name failed.");
1111 return Status::MIGRATION_KVSTORE_FAILED;
1112 }
1113 newDelegateMgr = new (std::nothrow) DistributedDB::KvStoreDelegateManager(appId_, harmonyAccountId);
1114 if (newDelegateMgr == nullptr) {
1115 ZLOGE("new KvStoreDelegateManager failed.");
1116 return Status::MIGRATION_KVSTORE_FAILED;
1117 }
1118 DistributedDB::KvStoreConfig kvStoreConfig;
1119 kvStoreConfig.dataDir = kvStoreDataDir;
1120 newDelegateMgr->SetKvStoreConfig(kvStoreConfig);
1121 }
1122 DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate = nullptr; // new KvStoreNbDelegate get from distributed DB.
1123 newDelegateMgr->GetKvStore(
1124 storeId_, dbOption,
1125 [&](DistributedDB::DBStatus status, DistributedDB::KvStoreNbDelegate *delegate) {
1126 kvStoreNbDelegate = delegate;
1127 dbStatus = status;
1128 });
1129 if (kvStoreNbDelegate == nullptr) {
1130 ZLOGE("storeDelegate is nullptr, dbStatusTmp: %d", static_cast<int>(dbStatus));
1131 return Status::DB_ERROR;
1132 }
1133
1134 if (options_.autoSync) {
1135 bool autoSync = true;
1136 auto data = static_cast<DistributedDB::PragmaData>(&autoSync);
1137 auto pragmaStatus = kvStoreNbDelegate->Pragma(DistributedDB::PragmaCmd::AUTO_SYNC, data);
1138 if (pragmaStatus != DistributedDB::DBStatus::OK) {
1139 ZLOGE("pragmaStatus: %d", static_cast<int>(pragmaStatus));
1140 }
1141 }
1142
1143 status = RebuildKvStoreObserver(kvStoreNbDelegate);
1144 if (status != Status::SUCCESS) {
1145 ZLOGI("rebuild KvStore observer failed, errCode %d.", static_cast<int>(status));
1146 // skip this failed, continue to do other rebuild process.
1147 }
1148
1149 status = RebuildKvStoreResultSet();
1150 if (status != Status::SUCCESS) {
1151 ZLOGI("rebuild KvStore resultset failed, errCode %d.", static_cast<int>(status));
1152 // skip this failed, continue to do close kvstore process.
1153 }
1154
1155 ZLOGI("close old KvStore.");
1156 dbStatus = oldDelegateMgr->CloseKvStore(kvStoreNbDelegate_);
1157 if (dbStatus != DistributedDB::DBStatus::OK) {
1158 ZLOGI("rebuild KvStore failed during close KvStore, errCode %d.", static_cast<int>(status));
1159 newDelegateMgr->CloseKvStore(kvStoreNbDelegate);
1160 return Status::DB_ERROR;
1161 }
1162
1163 ZLOGI("update kvstore delegate.");
1164 kvStoreNbDelegate_ = kvStoreNbDelegate;
1165 return Status::SUCCESS;
1166 }
1167
RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate * kvStoreNbDelegate)1168 Status SingleKvStoreImpl::RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate)
1169 {
1170 ZLOGI("rebuild observer.");
1171 if (kvStoreNbDelegate_ == nullptr || kvStoreNbDelegate == nullptr) {
1172 ZLOGI("RebuildKvStoreObserver illlegal.");
1173 return Status::ILLEGAL_STATE;
1174 }
1175 std::lock_guard<std::mutex> observerMapLockGuard(observerMapMutex_);
1176 Status status = Status::SUCCESS;
1177 DistributedDB::DBStatus dbStatus;
1178 DistributedDB::Key emptyKey;
1179 for (const auto &observerPair : observerMap_) {
1180 dbStatus = kvStoreNbDelegate_->UnRegisterObserver(observerPair.second);
1181 if (dbStatus != DistributedDB::OK) {
1182 status = Status::DB_ERROR;
1183 ZLOGW("rebuild observer failed during UnRegisterObserver, status %d.", static_cast<int>(dbStatus));
1184 continue;
1185 }
1186 dbStatus = kvStoreNbDelegate->RegisterObserver(emptyKey,
1187 static_cast<unsigned int>(ConvertToDbObserverMode(observerPair.second->GetSubscribeType())),
1188 observerPair.second);
1189 if (dbStatus != DistributedDB::OK) {
1190 status = Status::DB_ERROR;
1191 ZLOGW("rebuild observer failed during RegisterObserver, status %d.", static_cast<int>(dbStatus));
1192 continue;
1193 }
1194 }
1195 return status;
1196 }
1197
RebuildKvStoreResultSet()1198 Status SingleKvStoreImpl::RebuildKvStoreResultSet()
1199 {
1200 if (kvStoreNbDelegate_ == nullptr) {
1201 return Status::INVALID_ARGUMENT;
1202 }
1203 ZLOGI("rebuild resultset");
1204 std::lock_guard<std::mutex> lg(storeResultSetMutex_);
1205 Status retStatus = Status::SUCCESS;
1206 for (const auto &resultSetPair : storeResultSetMap_) {
1207 Status status = (resultSetPair.second)->MigrateKvStore(kvStoreNbDelegate_);
1208 if (status != Status::SUCCESS) {
1209 retStatus = status;
1210 ZLOGW("rebuild resultset failed, errCode %d", static_cast<int>(status));
1211 continue;
1212 }
1213 }
1214 return retStatus;
1215 }
1216
ReKey(const std::vector<uint8_t> & key)1217 Status SingleKvStoreImpl::ReKey(const std::vector<uint8_t> &key)
1218 {
1219 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1220
1221 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1222 if (kvStoreNbDelegate_ == nullptr) {
1223 ZLOGE("delegate is null.");
1224 return Status::DB_ERROR;
1225 }
1226 DistributedDB::CipherPassword password;
1227 auto status = password.SetValue(key.data(), key.size());
1228 if (status != DistributedDB::CipherPassword::ErrorCode::OK) {
1229 ZLOGE("Failed to set the passwd.");
1230 return Status::DB_ERROR;
1231 }
1232 DistributedDB::DBStatus dbStatus;
1233 {
1234 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1235 dbStatus = kvStoreNbDelegate_->Rekey(password);
1236 }
1237 if (dbStatus == DistributedDB::DBStatus::OK) {
1238 return Status::SUCCESS;
1239 }
1240 return Status::ERROR;
1241 }
1242
RegisterSyncCallback(sptr<IKvStoreSyncCallback> callback)1243 Status SingleKvStoreImpl::RegisterSyncCallback(sptr<IKvStoreSyncCallback> callback)
1244 {
1245 syncCallback_ = std::move(callback);
1246 return Status::SUCCESS;
1247 }
1248
UnRegisterSyncCallback()1249 Status SingleKvStoreImpl::UnRegisterSyncCallback()
1250 {
1251 syncCallback_ = nullptr;
1252 return Status::SUCCESS;
1253 }
1254
PutBatch(const std::vector<Entry> & entries)1255 Status SingleKvStoreImpl::PutBatch(const std::vector<Entry> &entries)
1256 {
1257 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1258
1259 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1260 if (kvStoreNbDelegate_ == nullptr) {
1261 ZLOGE("delegate is null.");
1262 return Status::DB_ERROR;
1263 }
1264 if (!flowCtrl_.IsTokenEnough()) {
1265 ZLOGE("flow control denied");
1266 return Status::EXCEED_MAX_ACCESS_RATE;
1267 }
1268
1269 // temporary transform.
1270 std::vector<DistributedDB::Entry> dbEntries;
1271 for (auto &entry : entries) {
1272 DistributedDB::Entry dbEntry;
1273
1274 std::vector<uint8_t> keyData = Constant::TrimCopy<std::vector<uint8_t>>(entry.key.Data());
1275 if (keyData.size() == 0 || keyData.size() > Constant::MAX_KEY_LENGTH) {
1276 ZLOGE("invalid key.");
1277 return Status::INVALID_ARGUMENT;
1278 }
1279
1280 dbEntry.key = keyData;
1281 dbEntry.value = entry.value.Data();
1282 dbEntries.push_back(dbEntry);
1283 }
1284 DistributedDB::DBStatus status;
1285 {
1286 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1287 status = kvStoreNbDelegate_->PutBatch(dbEntries);
1288 }
1289 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1290 ZLOGE("PutBatch failed, distributeddb need recover.");
1291 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1292 }
1293
1294 if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
1295 status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
1296 ZLOGE("delegate PutBatch failed.");
1297 return Status::SECURITY_LEVEL_ERROR;
1298 }
1299
1300 if (status != DistributedDB::DBStatus::OK) {
1301 ZLOGE("delegate PutBatch failed.");
1302 return Status::DB_ERROR;
1303 }
1304
1305 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1306 return Status::SUCCESS;
1307 }
1308
DeleteBatch(const std::vector<Key> & keys)1309 Status SingleKvStoreImpl::DeleteBatch(const std::vector<Key> &keys)
1310 {
1311 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1312
1313 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1314 if (kvStoreNbDelegate_ == nullptr) {
1315 ZLOGE("delegate is null.");
1316 return Status::DB_ERROR;
1317 }
1318 if (!flowCtrl_.IsTokenEnough()) {
1319 ZLOGE("flow control denied");
1320 return Status::EXCEED_MAX_ACCESS_RATE;
1321 }
1322
1323 // temporary transform.
1324 std::vector<DistributedDB::Key> dbKeys;
1325 for (auto &key : keys) {
1326 std::vector<uint8_t> keyData = Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
1327 if (keyData.size() == 0 || keyData.size() > Constant::MAX_KEY_LENGTH) {
1328 ZLOGE("invalid key.");
1329 return Status::INVALID_ARGUMENT;
1330 }
1331
1332 DistributedDB::Key keyTmp = keyData;
1333 dbKeys.push_back(keyTmp);
1334 }
1335 DistributedDB::DBStatus status;
1336 {
1337 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1338 status = kvStoreNbDelegate_->DeleteBatch(dbKeys);
1339 }
1340 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1341 ZLOGE("DeleteBatch failed, distributeddb need recover.");
1342 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1343 }
1344
1345 if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
1346 status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
1347 ZLOGE("delegate DeleteBatch failed.");
1348 return Status::SECURITY_LEVEL_ERROR;
1349 }
1350
1351 if (status != DistributedDB::DBStatus::OK) {
1352 ZLOGE("delegate DeleteBatch failed.");
1353 return Status::DB_ERROR;
1354 }
1355
1356 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1357 return Status::SUCCESS;
1358 }
1359
StartTransaction()1360 Status SingleKvStoreImpl::StartTransaction()
1361 {
1362 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1363
1364 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1365 if (kvStoreNbDelegate_ == nullptr) {
1366 ZLOGE("delegate is null.");
1367 return Status::DB_ERROR;
1368 }
1369 if (!flowCtrl_.IsTokenEnough()) {
1370 ZLOGE("flow control denied");
1371 return Status::EXCEED_MAX_ACCESS_RATE;
1372 }
1373 DistributedDB::DBStatus status;
1374 {
1375 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1376 status = kvStoreNbDelegate_->StartTransaction();
1377 }
1378 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1379 ZLOGE("StartTransaction failed, distributeddb need recover.");
1380 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1381 }
1382 if (status != DistributedDB::DBStatus::OK) {
1383 ZLOGE("delegate return error.");
1384 return Status::DB_ERROR;
1385 }
1386 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1387 return Status::SUCCESS;
1388 }
1389
Commit()1390 Status SingleKvStoreImpl::Commit()
1391 {
1392 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1393
1394 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1395 if (kvStoreNbDelegate_ == nullptr) {
1396 ZLOGE("delegate is null.");
1397 return Status::DB_ERROR;
1398 }
1399 if (!flowCtrl_.IsTokenEnough()) {
1400 ZLOGE("flow control denied");
1401 return Status::EXCEED_MAX_ACCESS_RATE;
1402 }
1403 DistributedDB::DBStatus status;
1404 {
1405 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1406 status = kvStoreNbDelegate_->Commit();
1407 }
1408 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1409 ZLOGE("Commit failed, distributeddb need recover.");
1410 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1411 }
1412 if (status != DistributedDB::DBStatus::OK) {
1413 ZLOGE("delegate return error.");
1414 return Status::DB_ERROR;
1415 }
1416
1417 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1418 return Status::SUCCESS;
1419 }
1420
Rollback()1421 Status SingleKvStoreImpl::Rollback()
1422 {
1423 DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1424
1425 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1426 if (kvStoreNbDelegate_ == nullptr) {
1427 ZLOGE("delegate is null.");
1428 return Status::DB_ERROR;
1429 }
1430 if (!flowCtrl_.IsTokenEnough()) {
1431 ZLOGE("flow control denied");
1432 return Status::EXCEED_MAX_ACCESS_RATE;
1433 }
1434 DistributedDB::DBStatus status;
1435 {
1436 DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1437 status = kvStoreNbDelegate_->Rollback();
1438 }
1439 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1440 ZLOGE("Rollback failed, distributeddb need recover.");
1441 return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1442 }
1443 if (status != DistributedDB::DBStatus::OK) {
1444 ZLOGE("delegate return error.");
1445 return Status::DB_ERROR;
1446 }
1447
1448 Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1449 return Status::SUCCESS;
1450 }
1451
Control(KvControlCmd cmd,const KvParam & inputParam,sptr<KvParam> & output)1452 Status SingleKvStoreImpl::Control(KvControlCmd cmd, const KvParam &inputParam, sptr<KvParam> &output)
1453 {
1454 output = nullptr;
1455 switch (cmd) {
1456 case KvControlCmd::SET_SYNC_PARAM: {
1457 if (inputParam.Size() != sizeof(uint32_t)) {
1458 return Status::IPC_ERROR;
1459 }
1460 uint32_t allowedDelayMs = TransferByteArrayToType<uint32_t>(inputParam.Data());
1461 ZLOGE("SET_SYNC_PARAM in %{public}d ms", allowedDelayMs);
1462 if (allowedDelayMs > 0 && allowedDelayMs < KvStoreSyncManager::SYNC_MIN_DELAY_MS) {
1463 return Status::INVALID_ARGUMENT;
1464 }
1465 if (allowedDelayMs > KvStoreSyncManager::SYNC_MAX_DELAY_MS) {
1466 return Status::INVALID_ARGUMENT;
1467 }
1468 defaultSyncDelayMs_ = allowedDelayMs;
1469 ZLOGE("SET_SYNC_PARAM save %{public}d ms", defaultSyncDelayMs_);
1470 return Status::SUCCESS;
1471 }
1472 case KvControlCmd::GET_SYNC_PARAM: {
1473 output = new KvParam(TransferTypeToByteArray<uint32_t>(defaultSyncDelayMs_));
1474 ZLOGE("GET_SYNC_PARAM read %{public}d ms", defaultSyncDelayMs_);
1475 return Status::SUCCESS;
1476 }
1477 default: {
1478 ZLOGE("control invalid command.");
1479 return Status::ERROR;
1480 }
1481 }
1482 }
1483
IncreaseOpenCount()1484 void SingleKvStoreImpl::IncreaseOpenCount()
1485 {
1486 openCount_++;
1487 }
1488
Import(const std::string & bundleName) const1489 bool SingleKvStoreImpl::Import(const std::string &bundleName) const
1490 {
1491 ZLOGI("Single KvStoreImpl Import start");
1492 const std::string harmonyAccountId = AccountDelegate::GetInstance()->GetCurrentAccountId();
1493 auto sKey = KvStoreMetaManager::GetMetaKey(deviceAccountId_, harmonyAccountId, bundleName, storeId_, "SINGLE_KEY");
1494 std::vector<uint8_t> secretKey;
1495 bool outdated = false;
1496 KvStoreMetaManager::GetInstance().GetSecretKeyFromMeta(sKey, secretKey, outdated);
1497 MetaData metaData{0};
1498 metaData.kvStoreMetaData.deviceAccountId = deviceAccountId_;
1499 metaData.kvStoreMetaData.userId = harmonyAccountId;
1500 metaData.kvStoreMetaData.bundleName = bundleName;
1501 metaData.kvStoreMetaData.appId = appId_;
1502 metaData.kvStoreMetaData.storeId = storeId_;
1503 metaData.kvStoreMetaData.securityLevel = options_.securityLevel;
1504 metaData.secretKeyMetaData.secretKey = secretKey;
1505 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1506 return std::make_unique<BackupHandler>()->SingleKvStoreRecover(metaData, kvStoreNbDelegate_);
1507 }
1508
SetCapabilityEnabled(bool enabled)1509 Status SingleKvStoreImpl::SetCapabilityEnabled(bool enabled)
1510 {
1511 ZLOGD("begin.");
1512 if (!flowCtrl_.IsTokenEnough()) {
1513 ZLOGE("flow control denied");
1514 return Status::EXCEED_MAX_ACCESS_RATE;
1515 }
1516
1517 std::string key;
1518 std::string devId = DeviceKvStoreImpl::GetLocalDeviceId();
1519 if (devId.empty()) {
1520 ZLOGE("get device id empty.");
1521 return Status::ERROR;
1522 }
1523
1524 StrategyMeta params = {devId, deviceAccountId_, Constant::DEFAULT_GROUP_ID, bundleName_, storeId_};
1525 KvStoreMetaManager::GetInstance().GetStrategyMetaKey(params, key);
1526 if (key.empty()) {
1527 ZLOGE("get key empty.");
1528 return Status::ERROR;
1529 }
1530 ZLOGD("end.");
1531 return KvStoreMetaManager::GetInstance().SaveStrategyMetaEnable(key, enabled);
1532 }
1533
SetCapabilityRange(const std::vector<std::string> & localLabels,const std::vector<std::string> & remoteSupportLabels)1534 Status SingleKvStoreImpl::SetCapabilityRange(const std::vector<std::string> &localLabels,
1535 const std::vector<std::string> &remoteSupportLabels)
1536 {
1537 if (!flowCtrl_.IsTokenEnough()) {
1538 ZLOGE("flow control denied");
1539 return Status::EXCEED_MAX_ACCESS_RATE;
1540 }
1541
1542 std::string key;
1543 std::string devId = DeviceKvStoreImpl::GetLocalDeviceId();
1544 if (devId.empty()) {
1545 ZLOGE("get device id empty.");
1546 return Status::ERROR;
1547 }
1548
1549 StrategyMeta params = {devId, deviceAccountId_, Constant::DEFAULT_GROUP_ID, bundleName_, storeId_};
1550 KvStoreMetaManager::GetInstance().GetStrategyMetaKey(params, key);
1551 if (key.empty()) {
1552 ZLOGE("get key empty.");
1553 return Status::ERROR;
1554 }
1555
1556 return KvStoreMetaManager::GetInstance().SaveStrategyMetaLabels(key, localLabels, remoteSupportLabels);
1557 }
1558
GetSecurityLevel(SecurityLevel & securityLevel)1559 Status SingleKvStoreImpl::GetSecurityLevel(SecurityLevel &securityLevel)
1560 {
1561 std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1562 if (kvStoreNbDelegate_ == nullptr) {
1563 return Status::STORE_NOT_OPEN;
1564 }
1565
1566 DistributedDB::SecurityOption option;
1567 auto status = kvStoreNbDelegate_->GetSecurityOption(option);
1568 if (status == DistributedDB::DBStatus::NOT_SUPPORT) {
1569 return Status::NOT_SUPPORT;
1570 }
1571
1572 if (status != DistributedDB::DBStatus::OK) {
1573 return Status::DB_ERROR;
1574 }
1575
1576 switch (option.securityLabel) {
1577 case DistributedDB::NOT_SET:
1578 case DistributedDB::S0:
1579 case DistributedDB::S1:
1580 case DistributedDB::S2:
1581 securityLevel = static_cast<SecurityLevel>(option.securityLabel);
1582 break;
1583 case DistributedDB::S3:
1584 securityLevel = option.securityFlag ? S3 : S3_EX;
1585 break;
1586 case DistributedDB::S4:
1587 securityLevel = S4;
1588 break;
1589 default:
1590 break;
1591 }
1592 return Status::SUCCESS;
1593 }
1594
OnDump(int fd) const1595 void SingleKvStoreImpl::OnDump(int fd) const
1596 {
1597 const std::string prefix(12, ' ');
1598 dprintf(fd, "%s------------------------------------------------------\n", prefix.c_str());
1599 dprintf(fd, "%sStoreID : %s\n", prefix.c_str(), storeId_.c_str());
1600 dprintf(fd, "%sStorePath : %s\n", prefix.c_str(), storePath_.c_str());
1601
1602 dprintf(fd, "%sOptions :\n", prefix.c_str());
1603 dprintf(fd, "%s backup : %d\n", prefix.c_str(), static_cast<int>(options_.backup));
1604 dprintf(fd, "%s encrypt : %d\n", prefix.c_str(), static_cast<int>(options_.encrypt));
1605 dprintf(fd, "%s autoSync : %d\n", prefix.c_str(), static_cast<int>(options_.autoSync));
1606 dprintf(fd, "%s persistant : %d\n", prefix.c_str(), static_cast<int>(options_.persistant));
1607 dprintf(fd, "%s kvStoreType : %d\n", prefix.c_str(), static_cast<int>(options_.kvStoreType));
1608 dprintf(fd, "%s createIfMissing : %d\n", prefix.c_str(), static_cast<int>(options_.createIfMissing));
1609 dprintf(fd, "%s schema : %s\n", prefix.c_str(), options_.schema.c_str());
1610 }
GetStoreId()1611 std::string SingleKvStoreImpl::GetStoreId()
1612 {
1613 return storeId_;
1614 }
1615
SetCompatibleIdentify(const std::string & changedDevice)1616 void SingleKvStoreImpl::SetCompatibleIdentify(const std::string &changedDevice)
1617 {
1618 bool flag = false;
1619 auto capability = UpgradeManager::GetInstance().GetCapability(changedDevice, flag);
1620 if (!flag || capability.version >= CapMetaData::CURRENT_VERSION) {
1621 ZLOGE("get peer capability %{public}d, or not older version", flag);
1622 return;
1623 }
1624
1625 auto peerUserId = 0; // peer user id reversed here
1626 auto groupType =
1627 AuthDelegate::GetInstance()->GetGroupType(std::stoi(deviceAccountId_), peerUserId, changedDevice, appId_);
1628 flag = false;
1629 std::string compatibleUserId = UpgradeManager::GetIdentifierByType(groupType, flag);
1630 if (!flag) {
1631 ZLOGE("failed to get identifier by group type %{public}d", groupType);
1632 return;
1633 }
1634 // older version use bound account syncIdentifier instead of user syncIdentifier
1635 ZLOGI("compatible user:%{public}s", compatibleUserId.c_str());
1636 auto syncIdentifier =
1637 DistributedDB::KvStoreDelegateManager::GetKvStoreIdentifier(compatibleUserId, appId_, storeId_);
1638 kvStoreNbDelegate_->SetEqualIdentifier(syncIdentifier, { changedDevice });
1639 }
1640
SetCompatibleIdentify()1641 void SingleKvStoreImpl::SetCompatibleIdentify()
1642 {
1643 KvStoreTuple tuple = { deviceAccountId_, appId_, storeId_ };
1644 UpgradeManager::SetCompatibleIdentifyByType(kvStoreNbDelegate_, tuple, IDENTICAL_ACCOUNT_GROUP);
1645 UpgradeManager::SetCompatibleIdentifyByType(kvStoreNbDelegate_, tuple, PEER_TO_PEER_GROUP);
1646 }
1647 } // namespace OHOS::DistributedKv
1648