• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "SingleKvStoreImpl"
17 
18 #include "single_kvstore_impl.h"
19 #include <fstream>
20 #include "account_delegate.h"
21 #include "backup_handler.h"
22 #include "checker/checker_manager.h"
23 #include "constant.h"
24 #include "dds_trace.h"
25 #include "device_kvstore_impl.h"
26 #include "auth/auth_delegate.h"
27 #include "kvstore_data_service.h"
28 #include "kvstore_utils.h"
29 #include "ipc_skeleton.h"
30 #include "log_print.h"
31 #include "permission_validator.h"
32 #include "query_helper.h"
33 #include "reporter.h"
34 #include "upgrade_manager.h"
35 
36 namespace OHOS::DistributedKv {
37 using namespace OHOS::DistributedData;
TaskIsBackground(pid_t pid)38 static bool TaskIsBackground(pid_t pid)
39 {
40     std::ifstream ifs("/proc/" + std::to_string(pid) + "/cgroup", std::ios::in);
41     ZLOGD("pid %d open %d", pid, ifs.good());
42     if (!ifs.good()) {
43         return false;
44     }
45 
46     while (!ifs.eof()) {
47         const int cgroupLineLen = 256; // enough
48         char buffer[cgroupLineLen] = { 0 };
49         ifs.getline(buffer, sizeof(buffer));
50         std::string line = buffer;
51 
52         size_t pos = line.find("background");
53         if (pos != std::string::npos) {
54             ifs.close();
55             return true;
56         }
57     }
58     ifs.close();
59     return false;
60 }
61 
~SingleKvStoreImpl()62 SingleKvStoreImpl::~SingleKvStoreImpl()
63 {
64     RemoveAllSyncOperation();
65     ZLOGI("destructor");
66 }
67 
SingleKvStoreImpl(const Options & options,const std::string & userId,const std::string & bundleName,const std::string & storeId,const std::string & appId,const std::string & directory,DistributedDB::KvStoreNbDelegate * delegate)68 SingleKvStoreImpl::SingleKvStoreImpl(const Options &options, const std::string &userId, const std::string &bundleName,
69     const std::string &storeId, const std::string &appId, const std::string &directory,
70     DistributedDB::KvStoreNbDelegate *delegate)
71     : options_(options), deviceAccountId_(userId), bundleName_(bundleName), storeId_(storeId), appId_(appId),
72       storePath_(Constant::Concatenate({ directory, storeId })), kvStoreNbDelegate_(delegate), observerMapMutex_(),
73       observerMap_(), storeResultSetMutex_(), storeResultSetMap_(), openCount_(1),
74       flowCtrl_(BURST_CAPACITY, SUSTAINED_CAPACITY)
75 {
76 }
77 
Put(const Key & key,const Value & value)78 Status SingleKvStoreImpl::Put(const Key &key, const Value &value)
79 {
80     if (!flowCtrl_.IsTokenEnough()) {
81         ZLOGE("flow control denied");
82         return Status::EXCEED_MAX_ACCESS_RATE;
83     }
84     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
85 
86     auto trimmedKey = Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
87     // Restrict key and value size to interface specification.
88     if (trimmedKey.size() == 0 || trimmedKey.size() > Constant::MAX_KEY_LENGTH ||
89         value.Size() > Constant::MAX_VALUE_LENGTH) {
90         ZLOGW("invalid_argument.");
91         return Status::INVALID_ARGUMENT;
92     }
93     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
94     if (kvStoreNbDelegate_ == nullptr) {
95         ZLOGE("kvstore is not open");
96         return Status::ILLEGAL_STATE;
97     }
98     DistributedDB::Key tmpKey = trimmedKey;
99     DistributedDB::Value tmpValue = value.Data();
100     DistributedDB::DBStatus status;
101     {
102         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
103         status = kvStoreNbDelegate_->Put(tmpKey, tmpValue);
104     }
105     if (status == DistributedDB::DBStatus::OK) {
106         ZLOGD("succeed.");
107         return Status::SUCCESS;
108     }
109     ZLOGW("failed status: %d.", static_cast<int>(status));
110 
111     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
112         ZLOGI("Put failed, distributeddb need recover.");
113         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
114     }
115 
116     return ConvertDbStatus(status);
117 }
118 
ConvertDbStatus(DistributedDB::DBStatus status)119 Status SingleKvStoreImpl::ConvertDbStatus(DistributedDB::DBStatus status)
120 {
121     switch (status) {
122         case DistributedDB::DBStatus::BUSY: // fallthrough
123         case DistributedDB::DBStatus::DB_ERROR:
124             return Status::DB_ERROR;
125         case DistributedDB::DBStatus::OK:
126             return Status::SUCCESS;
127         case DistributedDB::DBStatus::INVALID_ARGS:
128             return Status::INVALID_ARGUMENT;
129         case DistributedDB::DBStatus::NOT_FOUND:
130             return Status::KEY_NOT_FOUND;
131         case DistributedDB::DBStatus::INVALID_VALUE_FIELDS:
132             return Status::INVALID_VALUE_FIELDS;
133         case DistributedDB::DBStatus::INVALID_FIELD_TYPE:
134             return Status::INVALID_FIELD_TYPE;
135         case DistributedDB::DBStatus::CONSTRAIN_VIOLATION:
136             return Status::CONSTRAIN_VIOLATION;
137         case DistributedDB::DBStatus::INVALID_FORMAT:
138             return Status::INVALID_FORMAT;
139         case DistributedDB::DBStatus::INVALID_QUERY_FORMAT:
140             return Status::INVALID_QUERY_FORMAT;
141         case DistributedDB::DBStatus::INVALID_QUERY_FIELD:
142             return Status::INVALID_QUERY_FIELD;
143         case DistributedDB::DBStatus::NOT_SUPPORT:
144             return Status::NOT_SUPPORT;
145         case DistributedDB::DBStatus::TIME_OUT:
146             return Status::TIME_OUT;
147         case DistributedDB::DBStatus::OVER_MAX_LIMITS:
148             return Status::OVER_MAX_SUBSCRIBE_LIMITS;
149         case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
150         case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
151             return Status::SECURITY_LEVEL_ERROR;
152         default:
153             break;
154     }
155     return Status::ERROR;
156 }
157 
Delete(const Key & key)158 Status SingleKvStoreImpl::Delete(const Key &key)
159 {
160     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
161     if (!flowCtrl_.IsTokenEnough()) {
162         ZLOGE("flow control denied");
163         return Status::EXCEED_MAX_ACCESS_RATE;
164     }
165     auto trimmedKey = DistributedKv::Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
166     if (trimmedKey.size() == 0 || trimmedKey.size() > Constant::MAX_KEY_LENGTH) {
167         ZLOGW("invalid argument.");
168         return Status::INVALID_ARGUMENT;
169     }
170     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
171     if (kvStoreNbDelegate_ == nullptr) {
172         ZLOGE("kvstore is not open");
173         return Status::ILLEGAL_STATE;
174     }
175     DistributedDB::Key tmpKey = trimmedKey;
176     DistributedDB::DBStatus status;
177     {
178         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
179         status = kvStoreNbDelegate_->Delete(tmpKey);
180     }
181     if (status == DistributedDB::DBStatus::OK) {
182         ZLOGD("succeed.");
183         return Status::SUCCESS;
184     }
185     ZLOGW("failed status: %d.", static_cast<int>(status));
186     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
187         ZLOGI("Delete failed, distributeddb need recover.");
188         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
189     }
190     return ConvertDbStatus(status);
191 }
192 
Get(const Key & key,Value & value)193 Status SingleKvStoreImpl::Get(const Key &key, Value &value)
194 {
195     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
196     if (!flowCtrl_.IsTokenEnough()) {
197         ZLOGE("flow control denied");
198         return Status::EXCEED_MAX_ACCESS_RATE;
199     }
200     auto trimmedKey = DistributedKv::Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
201     if (trimmedKey.empty() || trimmedKey.size() > DistributedKv::Constant::MAX_KEY_LENGTH) {
202         return Status::INVALID_ARGUMENT;
203     }
204     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
205     if (kvStoreNbDelegate_ == nullptr) {
206         ZLOGE("kvstore is not open");
207         return Status::ILLEGAL_STATE;
208     }
209     DistributedDB::Key tmpKey = trimmedKey;
210     DistributedDB::Value tmpValue;
211     DistributedDB::DBStatus status;
212     {
213         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
214         status = kvStoreNbDelegate_->Get(tmpKey, tmpValue);
215     }
216     ZLOGD("status: %d.", static_cast<int>(status));
217     if (status == DistributedDB::DBStatus::OK) {
218         Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
219         // Value don't have other write method.
220         Value tmpValueForCopy(tmpValue);
221         value = tmpValueForCopy;
222         return Status::SUCCESS;
223     }
224     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
225         ZLOGI("Get failed, distributeddb need recover.");
226         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
227     }
228     return ConvertDbStatus(status);
229 }
230 
SubscribeKvStore(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)231 Status SingleKvStoreImpl::SubscribeKvStore(const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
232 {
233     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
234     ZLOGD("start.");
235     if (!flowCtrl_.IsTokenEnough()) {
236         ZLOGE("flow control denied");
237         return Status::EXCEED_MAX_ACCESS_RATE;
238     }
239     if (observer == nullptr) {
240         return Status::INVALID_ARGUMENT;
241     }
242     std::shared_lock<std::shared_mutex> sharedLock(storeNbDelegateMutex_);
243     if (kvStoreNbDelegate_ == nullptr) {
244         ZLOGE("kvstore is not open");
245         return Status::ILLEGAL_STATE;
246     }
247     KvStoreObserverImpl *nbObserver = CreateObserver(subscribeType, observer);
248     if (nbObserver == nullptr) {
249         ZLOGW("new KvStoreObserverNbImpl failed");
250         return Status::ERROR;
251     }
252 
253     std::lock_guard<std::mutex> lock(observerMapMutex_);
254     IRemoteObject *objectPtr = observer->AsObject().GetRefPtr();
255     bool alreadySubscribed = (observerMap_.find(objectPtr) != observerMap_.end());
256     if (alreadySubscribed) {
257         delete nbObserver;
258         return Status::STORE_ALREADY_SUBSCRIBE;
259     }
260     int dbObserverMode = ConvertToDbObserverMode(subscribeType);
261     DistributedDB::Key emptyKey;
262     DistributedDB::DBStatus dbStatus;
263     {
264         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
265         dbStatus = kvStoreNbDelegate_->RegisterObserver(emptyKey, dbObserverMode, nbObserver);
266     }
267     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
268     if (dbStatus == DistributedDB::DBStatus::OK) {
269         observerMap_.insert(std::pair<IRemoteObject *, KvStoreObserverImpl *>(objectPtr, nbObserver));
270         return Status::SUCCESS;
271     }
272 
273     delete nbObserver;
274     if (dbStatus == DistributedDB::DBStatus::INVALID_ARGS) {
275         return Status::INVALID_ARGUMENT;
276     }
277     if (dbStatus == DistributedDB::DBStatus::DB_ERROR) {
278         return Status::DB_ERROR;
279     }
280     return Status::ERROR;
281 }
282 
CreateObserver(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)283 KvStoreObserverImpl *SingleKvStoreImpl::CreateObserver(
284     const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
285 {
286     return new (std::nothrow) KvStoreObserverImpl(subscribeType, observer);
287 }
288 // Convert KvStore subscribe type to DistributeDB observer mode.
ConvertToDbObserverMode(const SubscribeType subscribeType) const289 int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType) const
290 {
291     int dbObserverMode;
292     if (subscribeType == SubscribeType::SUBSCRIBE_TYPE_LOCAL) {
293         dbObserverMode = DistributedDB::OBSERVER_CHANGES_NATIVE;
294     } else if (subscribeType == SubscribeType::SUBSCRIBE_TYPE_REMOTE) {
295         dbObserverMode = DistributedDB::OBSERVER_CHANGES_FOREIGN;
296     } else {
297         dbObserverMode = DistributedDB::OBSERVER_CHANGES_FOREIGN | DistributedDB::OBSERVER_CHANGES_NATIVE;
298     }
299     return dbObserverMode;
300 }
301 
302 // Convert KvStore sync mode to DistributeDB sync mode.
ConvertToDbSyncMode(SyncMode syncMode) const303 DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const
304 {
305     DistributedDB::SyncMode dbSyncMode;
306     if (syncMode == SyncMode::PUSH) {
307         dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY;
308     } else if (syncMode == SyncMode::PULL) {
309         dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY;
310     } else {
311         dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
312     }
313     return dbSyncMode;
314 }
315 
UnSubscribeKvStore(const SubscribeType subscribeType,sptr<IKvStoreObserver> observer)316 Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr<IKvStoreObserver> observer)
317 {
318     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
319     if (!flowCtrl_.IsTokenEnough()) {
320         ZLOGE("flow control denied");
321         return Status::EXCEED_MAX_ACCESS_RATE;
322     }
323     if (observer == nullptr) {
324         ZLOGW("observer invalid.");
325         return Status::INVALID_ARGUMENT;
326     }
327     std::shared_lock<std::shared_mutex> sharedLock(storeNbDelegateMutex_);
328     std::lock_guard<std::mutex> lock(observerMapMutex_);
329     if (kvStoreNbDelegate_ == nullptr) {
330         ZLOGE("kvstore is not open");
331         return Status::ILLEGAL_STATE;
332     }
333     IRemoteObject *objectPtr = observer->AsObject().GetRefPtr();
334     auto nbObserver = observerMap_.find(objectPtr);
335     if (nbObserver == observerMap_.end()) {
336         ZLOGW("No existing observer to unsubscribe. Return success.");
337         return Status::SUCCESS;
338     }
339     DistributedDB::DBStatus dbStatus;
340     {
341         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
342         dbStatus = kvStoreNbDelegate_->UnRegisterObserver(nbObserver->second);
343     }
344     if (dbStatus == DistributedDB::DBStatus::OK) {
345         delete nbObserver->second;
346         observerMap_.erase(objectPtr);
347     }
348     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
349     if (dbStatus == DistributedDB::DBStatus::OK) {
350         return Status::SUCCESS;
351     }
352 
353     ZLOGW("failed code=%d.", static_cast<int>(dbStatus));
354     if (dbStatus == DistributedDB::DBStatus::NOT_FOUND) {
355         return Status::STORE_NOT_SUBSCRIBE;
356     }
357     if (dbStatus == DistributedDB::DBStatus::INVALID_ARGS) {
358         return Status::INVALID_ARGUMENT;
359     }
360     return Status::ERROR;
361 }
362 
GetEntries(const Key & prefixKey,std::vector<Entry> & entries)363 Status SingleKvStoreImpl::GetEntries(const Key &prefixKey, std::vector<Entry> &entries)
364 {
365     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
366     if (!flowCtrl_.IsTokenEnough()) {
367         ZLOGE("flow control denied");
368         return Status::EXCEED_MAX_ACCESS_RATE;
369     }
370     auto trimmedPrefix = Constant::TrimCopy<std::vector<uint8_t>>(prefixKey.Data());
371     if (trimmedPrefix.size() > Constant::MAX_KEY_LENGTH) {
372         return Status::INVALID_ARGUMENT;
373     }
374     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
375     if (kvStoreNbDelegate_ == nullptr) {
376         ZLOGE("kvstore is not open");
377         return Status::ILLEGAL_STATE;
378     }
379     DistributedDB::Key tmpKeyPrefix = trimmedPrefix;
380     std::vector<DistributedDB::Entry> dbEntries;
381     DistributedDB::DBStatus status;
382     {
383         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
384         status = kvStoreNbDelegate_->GetEntries(tmpKeyPrefix, dbEntries);
385     }
386     ZLOGI("result DBStatus: %d", static_cast<int>(status));
387     if (status == DistributedDB::DBStatus::OK) {
388         entries.reserve(dbEntries.size());
389         ZLOGD("vector size: %zu status: %d.", dbEntries.size(), static_cast<int>(status));
390         for (auto const &dbEntry : dbEntries) {
391             Key tmpKey(dbEntry.key);
392             Value tmpValue(dbEntry.value);
393             Entry entry;
394             entry.key = tmpKey;
395             entry.value = tmpValue;
396             entries.push_back(entry);
397         }
398         return Status::SUCCESS;
399     }
400     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
401         ZLOGE("GetEntries failed, distributeddb need recover.");
402         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
403     }
404     if (status == DistributedDB::DBStatus::BUSY || status == DistributedDB::DBStatus::DB_ERROR) {
405         return Status::DB_ERROR;
406     }
407     if (status == DistributedDB::DBStatus::NOT_FOUND) {
408         ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list.");
409         return Status::SUCCESS;
410     }
411     if (status == DistributedDB::DBStatus::INVALID_ARGS) {
412         return Status::INVALID_ARGUMENT;
413     }
414     if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
415         status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
416         return Status::SECURITY_LEVEL_ERROR;
417     }
418     return Status::ERROR;
419 }
420 
GetEntriesWithQuery(const std::string & query,std::vector<Entry> & entries)421 Status SingleKvStoreImpl::GetEntriesWithQuery(const std::string &query, std::vector<Entry> &entries)
422 {
423     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
424     if (!flowCtrl_.IsTokenEnough()) {
425         ZLOGE("flow control denied");
426         return Status::EXCEED_MAX_ACCESS_RATE;
427     }
428     ZLOGI("begin");
429     bool isSuccess = false;
430     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
431     if (!isSuccess) {
432         ZLOGE("StringToDbQuery failed.");
433         return Status::INVALID_ARGUMENT;
434     } else {
435         ZLOGD("StringToDbQuery success.");
436     }
437     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
438     if (kvStoreNbDelegate_ == nullptr) {
439         ZLOGE("kvstore is not open");
440         return Status::ILLEGAL_STATE;
441     }
442     std::vector<DistributedDB::Entry> dbEntries;
443     DistributedDB::DBStatus status;
444     {
445         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
446         status = kvStoreNbDelegate_->GetEntries(dbQuery, dbEntries);
447     }
448     ZLOGI("result DBStatus: %d", static_cast<int>(status));
449     if (status == DistributedDB::DBStatus::OK) {
450         entries.reserve(dbEntries.size());
451         ZLOGD("vector size: %zu status: %d.", dbEntries.size(), static_cast<int>(status));
452         for (auto const &dbEntry : dbEntries) {
453             Key tmpKey(dbEntry.key);
454             Value tmpValue(dbEntry.value);
455             Entry entry;
456             entry.key = tmpKey;
457             entry.value = tmpValue;
458             entries.push_back(entry);
459         }
460         return Status::SUCCESS;
461     }
462     if (status == DistributedDB::DBStatus::NOT_FOUND) {
463         ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list.");
464         return Status::SUCCESS;
465     }
466     return ConvertDbStatus(status);
467 }
468 
GetResultSet(const Key & prefixKey,std::function<void (Status,sptr<IKvStoreResultSet>)> callback)469 void SingleKvStoreImpl::GetResultSet(const Key &prefixKey,
470                                      std::function<void(Status, sptr<IKvStoreResultSet>)> callback)
471 {
472     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
473     if (!flowCtrl_.IsTokenEnough()) {
474         ZLOGE("flow control denied");
475         callback(Status::EXCEED_MAX_ACCESS_RATE, nullptr);
476         return;
477     }
478     auto trimmedPrefix = Constant::TrimCopy<std::vector<uint8_t>>(prefixKey.Data());
479     if (trimmedPrefix.size() > Constant::MAX_KEY_LENGTH) {
480         callback(Status::INVALID_ARGUMENT, nullptr);
481         return;
482     }
483 
484     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
485     if (kvStoreNbDelegate_ == nullptr) {
486         ZLOGE("kvstore is not open");
487         callback(Status::ILLEGAL_STATE, nullptr);
488         return;
489     }
490     DistributedDB::Key tmpKeyPrefix = trimmedPrefix;
491     DistributedDB::KvStoreResultSet *dbResultSet = nullptr;
492     DistributedDB::DBStatus status;
493     {
494         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
495         status = kvStoreNbDelegate_->GetEntries(tmpKeyPrefix, dbResultSet);
496     }
497     ZLOGI("result DBStatus: %d", static_cast<int>(status));
498     if (status == DistributedDB::DBStatus::OK) {
499         std::lock_guard<std::mutex> lg(storeResultSetMutex_);
500         sptr<KvStoreResultSetImpl> storeResultSet = CreateResultSet(dbResultSet, tmpKeyPrefix);
501         callback(Status::SUCCESS, storeResultSet);
502         storeResultSetMap_.emplace(storeResultSet.GetRefPtr(), storeResultSet);
503         return;
504     }
505     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
506         bool success = Import(bundleName_);
507         callback(success ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED, nullptr);
508         return;
509     }
510     callback(ConvertDbStatus(status), nullptr);
511 }
512 
GetResultSetWithQuery(const std::string & query,std::function<void (Status,sptr<IKvStoreResultSet>)> callback)513 void SingleKvStoreImpl::GetResultSetWithQuery(const std::string &query,
514                                               std::function<void(Status, sptr<IKvStoreResultSet>)> callback)
515 {
516     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
517     if (!flowCtrl_.IsTokenEnough()) {
518         ZLOGE("flow control denied");
519         callback(Status::EXCEED_MAX_ACCESS_RATE, nullptr);
520         return;
521     }
522     bool isSuccess = false;
523     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
524     if (!isSuccess) {
525         ZLOGE("StringToDbQuery failed.");
526         return;
527     } else {
528         ZLOGD("StringToDbQuery success.");
529     }
530     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
531     if (kvStoreNbDelegate_ == nullptr) {
532         ZLOGE("kvstore is not open");
533         callback(Status::ILLEGAL_STATE, nullptr);
534         return;
535     }
536     DistributedDB::KvStoreResultSet *dbResultSet = nullptr;
537     DistributedDB::DBStatus status;
538     {
539         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
540         status = kvStoreNbDelegate_->GetEntries(dbQuery, dbResultSet);
541     }
542     ZLOGI("result DBStatus: %d", static_cast<int>(status));
543     if (status == DistributedDB::DBStatus::OK) {
544         std::lock_guard<std::mutex> lg(storeResultSetMutex_);
545         sptr<KvStoreResultSetImpl> storeResultSet = CreateResultSet(dbResultSet, {});
546         callback(Status::SUCCESS, storeResultSet);
547         storeResultSetMap_.emplace(storeResultSet.GetRefPtr(), storeResultSet);
548         return;
549     }
550     switch (status) {
551         case DistributedDB::DBStatus::BUSY:
552         case DistributedDB::DBStatus::DB_ERROR: {
553             callback(Status::DB_ERROR, nullptr);
554             break;
555         }
556         case DistributedDB::DBStatus::INVALID_ARGS: {
557             callback(Status::INVALID_ARGUMENT, nullptr);
558             break;
559         }
560         case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: {
561             callback(Status::INVALID_QUERY_FORMAT, nullptr);
562             break;
563         }
564         case DistributedDB::DBStatus::INVALID_QUERY_FIELD: {
565             callback(Status::INVALID_QUERY_FIELD, nullptr);
566             break;
567         }
568         case DistributedDB::DBStatus::NOT_SUPPORT: {
569             callback(Status::NOT_SUPPORT, nullptr);
570             break;
571         }
572         case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
573         case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
574             callback(Status::SECURITY_LEVEL_ERROR, nullptr);
575             break;
576         default: {
577             callback(Status::ERROR, nullptr);
578             break;
579         }
580     }
581 }
582 
CreateResultSet(DistributedDB::KvStoreResultSet * resultSet,const DistributedDB::Key & prix)583 KvStoreResultSetImpl *SingleKvStoreImpl::CreateResultSet(
584     DistributedDB::KvStoreResultSet *resultSet, const DistributedDB::Key &prix)
585 {
586     return new (std::nothrow) KvStoreResultSetImpl(resultSet, prix);
587 }
588 
GetCountWithQuery(const std::string & query,int & result)589 Status SingleKvStoreImpl::GetCountWithQuery(const std::string &query, int &result)
590 {
591     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
592     if (!flowCtrl_.IsTokenEnough()) {
593         ZLOGE("flow control denied");
594         return Status::EXCEED_MAX_ACCESS_RATE;
595     }
596     ZLOGI("begin");
597     bool isSuccess = false;
598     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
599     if (!isSuccess) {
600         ZLOGE("StringToDbQuery failed.");
601         return Status::INVALID_ARGUMENT;
602     } else {
603         ZLOGD("StringToDbQuery success.");
604     }
605     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
606     if (kvStoreNbDelegate_ == nullptr) {
607         ZLOGE("kvstore is not open");
608         return Status::ILLEGAL_STATE;
609     }
610     DistributedDB::DBStatus status;
611     {
612         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
613         status = kvStoreNbDelegate_->GetCount(dbQuery, result);
614     }
615     ZLOGI("result DBStatus: %d", static_cast<int>(status));
616     switch (status) {
617         case DistributedDB::DBStatus::OK: {
618             return Status::SUCCESS;
619         }
620         case DistributedDB::DBStatus::BUSY:
621         case DistributedDB::DBStatus::DB_ERROR: {
622             return Status::DB_ERROR;
623         }
624         case DistributedDB::DBStatus::INVALID_ARGS: {
625             return Status::INVALID_ARGUMENT;
626         }
627         case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: {
628             return Status::INVALID_QUERY_FORMAT;
629         }
630         case DistributedDB::DBStatus::INVALID_QUERY_FIELD: {
631             return Status::INVALID_QUERY_FIELD;
632         }
633         case DistributedDB::DBStatus::NOT_SUPPORT: {
634             return Status::NOT_SUPPORT;
635         }
636         case DistributedDB::DBStatus::NOT_FOUND: {
637             ZLOGE("DB return NOT_FOUND, no matching result. Return success with count 0.");
638             result = 0;
639             return Status::SUCCESS;
640         }
641         case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough
642         case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR:
643             return Status::SECURITY_LEVEL_ERROR;
644         default: {
645             return Status::ERROR;
646         }
647     }
648 }
649 
CloseResultSet(sptr<IKvStoreResultSet> resultSet)650 Status SingleKvStoreImpl::CloseResultSet(sptr<IKvStoreResultSet> resultSet)
651 {
652     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
653     if (resultSet == nullptr) {
654         return Status::INVALID_ARGUMENT;
655     }
656     if (!flowCtrl_.IsTokenEnough()) {
657         ZLOGE("flow control denied");
658         return Status::EXCEED_MAX_ACCESS_RATE;
659     }
660     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
661     std::lock_guard<std::mutex> lg(storeResultSetMutex_);
662     Status status;
663     auto it = storeResultSetMap_.find(resultSet.GetRefPtr());
664     if (it == storeResultSetMap_.end()) {
665         ZLOGE("ResultSet not found in this store.");
666         return Status::INVALID_ARGUMENT;
667     }
668     {
669         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
670         status = it->second->CloseResultSet(kvStoreNbDelegate_);
671     }
672     if (status == Status::SUCCESS) {
673         storeResultSetMap_.erase(it);
674     } else {
675         ZLOGE("CloseResultSet failed.");
676     }
677     return status;
678 }
679 
RemoveDeviceData(const std::string & device)680 Status SingleKvStoreImpl::RemoveDeviceData(const std::string &device)
681 {
682     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
683     if (!flowCtrl_.IsTokenEnough()) {
684         ZLOGE("flow control denied");
685         return Status::EXCEED_MAX_ACCESS_RATE;
686     }
687     // map UUID to UDID
688     std::string deviceUDID = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(device);
689     if (deviceUDID.empty()) {
690         ZLOGE("can't get nodeid");
691         return Status::ERROR;
692     }
693     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
694     if (kvStoreNbDelegate_ == nullptr) {
695         ZLOGE("kvstore is not open");
696         return Status::ILLEGAL_STATE;
697     }
698     DistributedDB::DBStatus status;
699     {
700         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
701         status = kvStoreNbDelegate_->RemoveDeviceData(deviceUDID);
702     }
703     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
704         ZLOGE("RemoveDeviceData failed, distributeddb need recover.");
705         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
706     }
707     if (status == DistributedDB::DBStatus::OK) {
708         return Status::SUCCESS;
709     }
710     if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
711         status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
712         return Status::SECURITY_LEVEL_ERROR;
713     }
714     return Status::ERROR;
715 }
716 
Sync(const std::vector<std::string> & deviceIds,SyncMode mode,uint32_t allowedDelayMs,uint64_t sequenceId)717 Status SingleKvStoreImpl::Sync(const std::vector<std::string> &deviceIds, SyncMode mode,
718                                uint32_t allowedDelayMs, uint64_t sequenceId)
719 {
720     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
721     ZLOGD("start.");
722     if (!flowCtrl_.IsTokenEnough()) {
723         ZLOGE("flow control denied");
724         return Status::EXCEED_MAX_ACCESS_RATE;
725     }
726     uint32_t delayMs = GetSyncDelayTime(allowedDelayMs);
727     {
728         std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
729         if ((waitingSyncCount_ > 0) &&
730             (lastSyncDeviceIds_ == deviceIds) && (lastSyncMode_ == mode) && (lastSyncDelayMs_ == delayMs)) {
731             return Status::SUCCESS;
732         }
733         lastSyncDeviceIds_ = deviceIds;
734         lastSyncMode_ = mode;
735         lastSyncDelayMs_ = delayMs;
736     }
737     return AddSync(deviceIds, mode, delayMs, sequenceId);
738 }
739 
Sync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,uint64_t sequenceId)740 Status SingleKvStoreImpl::Sync(const std::vector<std::string> &deviceIds, SyncMode mode,
741                                const std::string &query, uint64_t sequenceId)
742 {
743     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
744     ZLOGD("start.");
745     if (!flowCtrl_.IsTokenEnough()) {
746         ZLOGE("flow control denied");
747         return Status::EXCEED_MAX_ACCESS_RATE;
748     }
749     uint32_t delayMs = GetSyncDelayTime(0);
750     return AddSync(deviceIds, mode, query, delayMs, sequenceId);
751 }
752 
AddSync(const std::vector<std::string> & deviceIds,SyncMode mode,uint32_t delayMs,uint64_t sequenceId)753 Status SingleKvStoreImpl::AddSync(const std::vector<std::string> &deviceIds, SyncMode mode, uint32_t delayMs,
754                                   uint64_t sequenceId)
755 {
756     ZLOGD("start.");
757     waitingSyncCount_++;
758     return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
759         std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1, sequenceId),
760         std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
761 }
762 
AddSync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,uint32_t delayMs,uint64_t sequenceId)763 Status SingleKvStoreImpl::AddSync(const std::vector<std::string> &deviceIds, SyncMode mode,
764                                   const std::string &query, uint32_t delayMs, uint64_t sequenceId)
765 {
766     ZLOGD("start.");
767     waitingSyncCount_++;
768     return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
769         std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1, sequenceId),
770         std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query, sequenceId));
771 }
772 
GetSyncDelayTime(uint32_t allowedDelayMs) const773 uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const
774 {
775     uint32_t delayMs = allowedDelayMs;
776     if (delayMs == 0) {
777         bool isBackground = TaskIsBackground(IPCSkeleton::GetCallingPid());
778         if (isBackground) {
779             // delay schedule
780             delayMs = defaultSyncDelayMs_ ? defaultSyncDelayMs_ : KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS;
781         }
782     } else {
783         if (delayMs < KvStoreSyncManager::SYNC_MIN_DELAY_MS) {
784             delayMs = KvStoreSyncManager::SYNC_MIN_DELAY_MS;
785         }
786         if (delayMs > KvStoreSyncManager::SYNC_MAX_DELAY_MS) {
787             delayMs = KvStoreSyncManager::SYNC_MAX_DELAY_MS;
788         }
789     }
790     return delayMs;
791 }
792 
RemoveAllSyncOperation()793 Status SingleKvStoreImpl::RemoveAllSyncOperation()
794 {
795     return KvStoreSyncManager::GetInstance()->RemoveSyncOperation(reinterpret_cast<uintptr_t>(this));
796 }
797 
DoSyncComplete(const std::map<std::string,DistributedDB::DBStatus> & devicesSyncResult,const std::string & query,uint64_t sequenceId)798 void SingleKvStoreImpl::DoSyncComplete(const std::map<std::string, DistributedDB::DBStatus> &devicesSyncResult,
799                                        const std::string &query, uint64_t sequenceId)
800 {
801     DdsTrace trace(std::string("DdsTrace " LOG_TAG "::") + std::string(__FUNCTION__));
802     std::map<std::string, Status> resultMap;
803     for (auto device : devicesSyncResult) {
804         resultMap[device.first] = ConvertDbStatus(device.second);
805     }
806     syncRetries_ = 0;
807     ZLOGD("callback.");
808     if (syncCallback_ != nullptr) {
809         syncCallback_->SyncCompleted(resultMap, sequenceId);
810     }
811 }
812 
DoQuerySync(const std::vector<std::string> & deviceIds,SyncMode mode,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd,uint64_t sequenceId)813 Status SingleKvStoreImpl::DoQuerySync(const std::vector<std::string> &deviceIds, SyncMode mode,
814     const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId)
815 {
816     ZLOGD("start.");
817     std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
818     if (deviceUuids.empty()) {
819         ZLOGE("not found deviceIds.");
820         return Status::ERROR;
821     }
822     DistributedDB::SyncMode dbMode;
823     if (mode == SyncMode::PUSH) {
824         dbMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY;
825     } else if (mode == SyncMode::PULL) {
826         dbMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY;
827     } else {
828         dbMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
829     }
830     bool isSuccess = false;
831     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
832     if (!isSuccess) {
833         ZLOGE("StringToDbQuery failed.");
834         return Status::INVALID_ARGUMENT;
835     }
836     ZLOGD("StringToDbQuery success.");
837     DistributedDB::DBStatus status;
838     {
839         std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
840         if (kvStoreNbDelegate_ == nullptr) {
841             ZLOGE("kvstore is not open");
842             return Status::ILLEGAL_STATE;
843         }
844         waitingSyncCount_--;
845         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
846         status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd, dbQuery, false);
847         ZLOGD("end: %d", static_cast<int>(status));
848     }
849     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
850     if (status == DistributedDB::DBStatus::BUSY) {
851         if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) {
852             syncRetries_++;
853             auto addStatus = AddSync(deviceIds, mode, query, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId);
854             if (addStatus == Status::SUCCESS) {
855                 return addStatus;
856             }
857         }
858     }
859     return ConvertDbStatus(status);
860 }
861 
DoSync(const std::vector<std::string> & deviceIds,SyncMode mode,const KvStoreSyncManager::SyncEnd & syncEnd,uint64_t sequenceId)862 Status SingleKvStoreImpl::DoSync(const std::vector<std::string> &deviceIds, SyncMode mode,
863                                  const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId)
864 {
865     ZLOGD("start.");
866     std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
867     if (deviceUuids.empty()) {
868         ZLOGE("not found deviceIds.");
869         return Status::ERROR;
870     }
871     DistributedDB::SyncMode dbMode = ConvertToDbSyncMode(mode);
872     DistributedDB::DBStatus status;
873     {
874         std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
875         if (kvStoreNbDelegate_ == nullptr) {
876             ZLOGE("kvstore is not open");
877             return Status::ILLEGAL_STATE;
878         }
879         waitingSyncCount_--;
880         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
881         status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd);
882         ZLOGD("end: %d", static_cast<uint32_t>(status));
883     }
884     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
885     if (status == DistributedDB::DBStatus::BUSY) {
886         if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) {
887             syncRetries_++;
888             auto addStatus = AddSync(deviceUuids, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId);
889             if (addStatus == Status::SUCCESS) {
890                 return addStatus;
891             }
892         }
893     }
894     return ConvertDbStatus(status);
895 }
896 
MapNodeIdToUuids(const std::vector<std::string> & deviceIds)897 std::vector<std::string> SingleKvStoreImpl::MapNodeIdToUuids(const std::vector<std::string> &deviceIds)
898 {
899     std::vector<std::string> deviceUuids;
900     for (auto const &nodeId : deviceIds) {
901         std::string uuid = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(nodeId);
902         if (!uuid.empty()) {
903             deviceUuids.push_back(uuid);
904         }
905     }
906     return deviceUuids;
907 }
908 
DoSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd)909 Status SingleKvStoreImpl::DoSubscribe(const std::vector<std::string> &deviceIds,
910                                       const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd)
911 {
912     ZLOGD("start.");
913     std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
914     if (deviceUuids.empty()) {
915         ZLOGE("not found deviceIds.");
916         return Status::ERROR;
917     }
918     bool isSuccess = false;
919     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
920     if (!isSuccess) {
921         ZLOGE("StringToDbQuery failed.");
922         return Status::INVALID_ARGUMENT;
923     }
924     ZLOGD("StringToDbQuery success.");
925     DistributedDB::DBStatus status;
926     {
927         std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
928         if (kvStoreNbDelegate_ == nullptr) {
929             ZLOGE("kvstore is not open");
930             return Status::ILLEGAL_STATE;
931         }
932         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
933         status = kvStoreNbDelegate_->SubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false);
934         ZLOGD("end: %d", static_cast<uint32_t>(status));
935     }
936     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
937     return ConvertDbStatus(status);
938 }
939 
DoUnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,const KvStoreSyncManager::SyncEnd & syncEnd)940 Status SingleKvStoreImpl::DoUnSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
941                                         const KvStoreSyncManager::SyncEnd &syncEnd)
942 {
943     ZLOGD("start.");
944     std::vector<std::string> deviceUuids = MapNodeIdToUuids(deviceIds);
945     if (deviceUuids.empty()) {
946         ZLOGE("not found deviceIds.");
947         return Status::ERROR;
948     }
949     bool isSuccess = false;
950     DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess);
951     if (!isSuccess) {
952         ZLOGE("StringToDbQuery failed.");
953         return Status::INVALID_ARGUMENT;
954     }
955     ZLOGD("StringToDbQuery success.");
956     DistributedDB::DBStatus status;
957     {
958         std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
959         if (kvStoreNbDelegate_ == nullptr) {
960             ZLOGE("kvstore is not open");
961             return Status::ILLEGAL_STATE;
962         }
963         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
964         status = kvStoreNbDelegate_->UnSubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false);
965         ZLOGD("end: %d", static_cast<uint32_t>(status));
966     }
967     return ConvertDbStatus(status);
968 }
969 
AddSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint32_t delayMs,uint64_t sequenceId)970 Status SingleKvStoreImpl::AddSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
971                                        uint32_t delayMs, uint64_t sequenceId)
972 {
973     ZLOGD("start.");
974     return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
975         std::bind(&SingleKvStoreImpl::DoSubscribe, this, deviceIds, query, std::placeholders::_1),
976         std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
977 }
978 
AddUnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint32_t delayMs,uint64_t sequenceId)979 Status SingleKvStoreImpl::AddUnSubscribe(const std::vector<std::string> &deviceIds, const std::string &query,
980                                          uint32_t delayMs, uint64_t sequenceId)
981 {
982     ZLOGD("start.");
983     return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast<uintptr_t>(this), delayMs,
984         std::bind(&SingleKvStoreImpl::DoUnSubscribe, this, deviceIds, query, std::placeholders::_1),
985         std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId));
986 }
987 
Subscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint64_t sequenceId)988 Status SingleKvStoreImpl::Subscribe(const std::vector<std::string> &deviceIds,
989                                     const std::string &query, uint64_t sequenceId)
990 {
991     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
992     ZLOGD("start.");
993     if (!flowCtrl_.IsTokenEnough()) {
994         ZLOGE("flow control denied");
995         return Status::EXCEED_MAX_ACCESS_RATE;
996     }
997     uint32_t delayMs = GetSyncDelayTime(0);
998     return AddSubscribe(deviceIds, query, delayMs, sequenceId);
999 }
1000 
UnSubscribe(const std::vector<std::string> & deviceIds,const std::string & query,uint64_t sequenceId)1001 Status SingleKvStoreImpl::UnSubscribe(const std::vector<std::string> &deviceIds,
1002                                       const std::string &query, uint64_t sequenceId)
1003 {
1004     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1005     ZLOGD("start.");
1006     if (!flowCtrl_.IsTokenEnough()) {
1007         ZLOGE("flow control denied");
1008         return Status::EXCEED_MAX_ACCESS_RATE;
1009     }
1010     uint32_t delayMs = GetSyncDelayTime(0);
1011     return AddUnSubscribe(deviceIds, query, delayMs, sequenceId);
1012 }
1013 
Close(DistributedDB::KvStoreDelegateManager * kvStoreDelegateManager)1014 InnerStatus SingleKvStoreImpl::Close(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager)
1015 {
1016     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1017 
1018     ZLOGW("start Close");
1019     if (openCount_ > 1) {
1020         openCount_--;
1021         return InnerStatus::DECREASE_REFCOUNT;
1022     }
1023     Status status = ForceClose(kvStoreDelegateManager);
1024     if (status == Status::SUCCESS) {
1025         return InnerStatus::SUCCESS;
1026     }
1027     return InnerStatus::ERROR;
1028 }
1029 
ForceClose(DistributedDB::KvStoreDelegateManager * kvStoreDelegateManager)1030 Status SingleKvStoreImpl::ForceClose(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager)
1031 {
1032     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1033 
1034     ZLOGI("start, current openCount is %d.", openCount_);
1035     std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1036     if (kvStoreNbDelegate_ == nullptr || kvStoreDelegateManager == nullptr) {
1037         ZLOGW("get nullptr");
1038         return Status::INVALID_ARGUMENT;
1039     }
1040     RemoveAllSyncOperation();
1041     ZLOGI("start to clean observer");
1042     std::lock_guard<std::mutex> observerMapLockGuard(observerMapMutex_);
1043     for (auto observer = observerMap_.begin(); observer != observerMap_.end();) {
1044         DistributedDB::DBStatus dbStatus = kvStoreNbDelegate_->UnRegisterObserver(observer->second);
1045         if (dbStatus == DistributedDB::DBStatus::OK) {
1046             delete observer->second;
1047             observer = observerMap_.erase(observer);
1048         } else {
1049             ZLOGW("UnSubscribeKvStore failed during ForceClose, status %d.", dbStatus);
1050             return Status::ERROR;
1051         }
1052     }
1053     ZLOGI("start to clean resultset");
1054     std::lock_guard<std::mutex> lg(storeResultSetMutex_);
1055     for (auto resultSetPair = storeResultSetMap_.begin(); resultSetPair != storeResultSetMap_.end();) {
1056         Status status = (resultSetPair->second)->CloseResultSet(kvStoreNbDelegate_);
1057         if (status != Status::SUCCESS) {
1058             ZLOGW("CloseResultSet failed during ForceClose, errCode %d", status);
1059             return status;
1060         }
1061         resultSetPair = storeResultSetMap_.erase(resultSetPair);
1062     }
1063     DistributedDB::DBStatus status = kvStoreDelegateManager->CloseKvStore(kvStoreNbDelegate_);
1064     if (status == DistributedDB::DBStatus::OK) {
1065         kvStoreNbDelegate_ = nullptr;
1066         ZLOGI("end.");
1067         return Status::SUCCESS;
1068     }
1069     ZLOGI("failed with error code %d.", status);
1070     return Status::ERROR;
1071 }
1072 
MigrateKvStore(const std::string & harmonyAccountId,const std::string & kvStoreDataDir,DistributedDB::KvStoreDelegateManager * oldDelegateMgr,DistributedDB::KvStoreDelegateManager * & newDelegateMgr)1073 Status SingleKvStoreImpl::MigrateKvStore(const std::string &harmonyAccountId,
1074                                          const std::string &kvStoreDataDir,
1075                                          DistributedDB::KvStoreDelegateManager *oldDelegateMgr,
1076                                          DistributedDB::KvStoreDelegateManager *&newDelegateMgr)
1077 {
1078     ZLOGI("begin.");
1079     std::unique_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1080     if (oldDelegateMgr == nullptr) {
1081         ZLOGW("kvStore delegate manager is nullptr.");
1082         return Status::INVALID_ARGUMENT;
1083     }
1084 
1085     ZLOGI("create new KvStore.");
1086     std::vector<uint8_t> secretKey; // expected get secret key from meta kvstore successful when encrypt flag is true.
1087     std::unique_ptr<std::vector<uint8_t>, void(*)(std::vector<uint8_t>*)> cleanGuard(
1088             &secretKey, [](std::vector<uint8_t> *ptr) { ptr->assign(ptr->size(), 0); });
1089     bool outdated = false; // ignore outdated flag during rebuild kvstore.
1090     auto metaSecretKey = KvStoreMetaManager::GetMetaKey(deviceAccountId_, "default", bundleName_, storeId_,
1091                                                         "SINGLE_KEY");
1092     if (options_.encrypt) {
1093         KvStoreMetaManager::GetInstance().GetSecretKeyFromMeta(metaSecretKey, secretKey, outdated);
1094         if (secretKey.empty()) {
1095             ZLOGE("Get secret key from meta kvstore failed.");
1096             return Status::CRYPT_ERROR;
1097         }
1098     }
1099 
1100     DistributedDB::DBStatus dbStatus;
1101     DistributedDB::KvStoreNbDelegate::Option dbOption;
1102     Status status = KvStoreAppManager::InitNbDbOption(options_, secretKey, dbOption);
1103     if (status != Status::SUCCESS) {
1104         ZLOGE("InitNbDbOption failed.");
1105         return status;
1106     }
1107 
1108     if (newDelegateMgr == nullptr) {
1109         if (appId_.empty()) {
1110             ZLOGE("Get appId by bundle name failed.");
1111             return Status::MIGRATION_KVSTORE_FAILED;
1112         }
1113         newDelegateMgr = new (std::nothrow) DistributedDB::KvStoreDelegateManager(appId_, harmonyAccountId);
1114         if (newDelegateMgr == nullptr) {
1115             ZLOGE("new KvStoreDelegateManager failed.");
1116             return Status::MIGRATION_KVSTORE_FAILED;
1117         }
1118         DistributedDB::KvStoreConfig kvStoreConfig;
1119         kvStoreConfig.dataDir = kvStoreDataDir;
1120         newDelegateMgr->SetKvStoreConfig(kvStoreConfig);
1121     }
1122     DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate = nullptr; // new KvStoreNbDelegate get from distributed DB.
1123     newDelegateMgr->GetKvStore(
1124         storeId_, dbOption,
1125         [&](DistributedDB::DBStatus status, DistributedDB::KvStoreNbDelegate *delegate) {
1126             kvStoreNbDelegate = delegate;
1127             dbStatus = status;
1128         });
1129     if (kvStoreNbDelegate == nullptr) {
1130         ZLOGE("storeDelegate is nullptr, dbStatusTmp: %d", static_cast<int>(dbStatus));
1131         return Status::DB_ERROR;
1132     }
1133 
1134     if (options_.autoSync) {
1135         bool autoSync = true;
1136         auto data = static_cast<DistributedDB::PragmaData>(&autoSync);
1137         auto pragmaStatus = kvStoreNbDelegate->Pragma(DistributedDB::PragmaCmd::AUTO_SYNC, data);
1138         if (pragmaStatus != DistributedDB::DBStatus::OK) {
1139             ZLOGE("pragmaStatus: %d", static_cast<int>(pragmaStatus));
1140         }
1141     }
1142 
1143     status = RebuildKvStoreObserver(kvStoreNbDelegate);
1144     if (status != Status::SUCCESS) {
1145         ZLOGI("rebuild KvStore observer failed, errCode %d.", static_cast<int>(status));
1146         // skip this failed, continue to do other rebuild process.
1147     }
1148 
1149     status = RebuildKvStoreResultSet();
1150     if (status != Status::SUCCESS) {
1151         ZLOGI("rebuild KvStore resultset failed, errCode %d.", static_cast<int>(status));
1152         // skip this failed, continue to do close kvstore process.
1153     }
1154 
1155     ZLOGI("close old KvStore.");
1156     dbStatus = oldDelegateMgr->CloseKvStore(kvStoreNbDelegate_);
1157     if (dbStatus != DistributedDB::DBStatus::OK) {
1158         ZLOGI("rebuild KvStore failed during close KvStore, errCode %d.", static_cast<int>(status));
1159         newDelegateMgr->CloseKvStore(kvStoreNbDelegate);
1160         return Status::DB_ERROR;
1161     }
1162 
1163     ZLOGI("update kvstore delegate.");
1164     kvStoreNbDelegate_ = kvStoreNbDelegate;
1165     return Status::SUCCESS;
1166 }
1167 
RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate * kvStoreNbDelegate)1168 Status SingleKvStoreImpl::RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate)
1169 {
1170     ZLOGI("rebuild observer.");
1171     if (kvStoreNbDelegate_ == nullptr || kvStoreNbDelegate == nullptr) {
1172         ZLOGI("RebuildKvStoreObserver illlegal.");
1173         return Status::ILLEGAL_STATE;
1174     }
1175     std::lock_guard<std::mutex> observerMapLockGuard(observerMapMutex_);
1176     Status status = Status::SUCCESS;
1177     DistributedDB::DBStatus dbStatus;
1178     DistributedDB::Key emptyKey;
1179     for (const auto &observerPair : observerMap_) {
1180         dbStatus = kvStoreNbDelegate_->UnRegisterObserver(observerPair.second);
1181         if (dbStatus != DistributedDB::OK) {
1182             status = Status::DB_ERROR;
1183             ZLOGW("rebuild observer failed during UnRegisterObserver, status %d.", static_cast<int>(dbStatus));
1184             continue;
1185         }
1186         dbStatus = kvStoreNbDelegate->RegisterObserver(emptyKey,
1187             static_cast<unsigned int>(ConvertToDbObserverMode(observerPair.second->GetSubscribeType())),
1188             observerPair.second);
1189         if (dbStatus != DistributedDB::OK) {
1190             status = Status::DB_ERROR;
1191             ZLOGW("rebuild observer failed during RegisterObserver, status %d.", static_cast<int>(dbStatus));
1192             continue;
1193         }
1194     }
1195     return status;
1196 }
1197 
RebuildKvStoreResultSet()1198 Status SingleKvStoreImpl::RebuildKvStoreResultSet()
1199 {
1200     if (kvStoreNbDelegate_ == nullptr) {
1201         return Status::INVALID_ARGUMENT;
1202     }
1203     ZLOGI("rebuild resultset");
1204     std::lock_guard<std::mutex> lg(storeResultSetMutex_);
1205     Status retStatus = Status::SUCCESS;
1206     for (const auto &resultSetPair : storeResultSetMap_) {
1207         Status status = (resultSetPair.second)->MigrateKvStore(kvStoreNbDelegate_);
1208         if (status != Status::SUCCESS) {
1209             retStatus = status;
1210             ZLOGW("rebuild resultset failed, errCode %d", static_cast<int>(status));
1211             continue;
1212         }
1213     }
1214     return retStatus;
1215 }
1216 
ReKey(const std::vector<uint8_t> & key)1217 Status SingleKvStoreImpl::ReKey(const std::vector<uint8_t> &key)
1218 {
1219     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1220 
1221     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1222     if (kvStoreNbDelegate_ == nullptr) {
1223         ZLOGE("delegate is null.");
1224         return Status::DB_ERROR;
1225     }
1226     DistributedDB::CipherPassword password;
1227     auto status = password.SetValue(key.data(), key.size());
1228     if (status != DistributedDB::CipherPassword::ErrorCode::OK) {
1229         ZLOGE("Failed to set the passwd.");
1230         return Status::DB_ERROR;
1231     }
1232     DistributedDB::DBStatus dbStatus;
1233     {
1234         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1235         dbStatus = kvStoreNbDelegate_->Rekey(password);
1236     }
1237     if (dbStatus == DistributedDB::DBStatus::OK) {
1238         return Status::SUCCESS;
1239     }
1240     return Status::ERROR;
1241 }
1242 
RegisterSyncCallback(sptr<IKvStoreSyncCallback> callback)1243 Status SingleKvStoreImpl::RegisterSyncCallback(sptr<IKvStoreSyncCallback> callback)
1244 {
1245     syncCallback_ = std::move(callback);
1246     return Status::SUCCESS;
1247 }
1248 
UnRegisterSyncCallback()1249 Status SingleKvStoreImpl::UnRegisterSyncCallback()
1250 {
1251     syncCallback_ = nullptr;
1252     return Status::SUCCESS;
1253 }
1254 
PutBatch(const std::vector<Entry> & entries)1255 Status SingleKvStoreImpl::PutBatch(const std::vector<Entry> &entries)
1256 {
1257     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1258 
1259     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1260     if (kvStoreNbDelegate_ == nullptr) {
1261         ZLOGE("delegate is null.");
1262         return Status::DB_ERROR;
1263     }
1264     if (!flowCtrl_.IsTokenEnough()) {
1265         ZLOGE("flow control denied");
1266         return Status::EXCEED_MAX_ACCESS_RATE;
1267     }
1268 
1269     // temporary transform.
1270     std::vector<DistributedDB::Entry> dbEntries;
1271     for (auto &entry : entries) {
1272         DistributedDB::Entry dbEntry;
1273 
1274         std::vector<uint8_t> keyData = Constant::TrimCopy<std::vector<uint8_t>>(entry.key.Data());
1275         if (keyData.size() == 0 || keyData.size() > Constant::MAX_KEY_LENGTH) {
1276             ZLOGE("invalid key.");
1277             return Status::INVALID_ARGUMENT;
1278         }
1279 
1280         dbEntry.key = keyData;
1281         dbEntry.value = entry.value.Data();
1282         dbEntries.push_back(dbEntry);
1283     }
1284     DistributedDB::DBStatus status;
1285     {
1286         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1287         status = kvStoreNbDelegate_->PutBatch(dbEntries);
1288     }
1289     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1290         ZLOGE("PutBatch failed, distributeddb need recover.");
1291         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1292     }
1293 
1294     if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
1295         status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
1296         ZLOGE("delegate PutBatch failed.");
1297         return Status::SECURITY_LEVEL_ERROR;
1298     }
1299 
1300     if (status != DistributedDB::DBStatus::OK) {
1301         ZLOGE("delegate PutBatch failed.");
1302         return Status::DB_ERROR;
1303     }
1304 
1305     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1306     return Status::SUCCESS;
1307 }
1308 
DeleteBatch(const std::vector<Key> & keys)1309 Status SingleKvStoreImpl::DeleteBatch(const std::vector<Key> &keys)
1310 {
1311     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1312 
1313     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1314     if (kvStoreNbDelegate_ == nullptr) {
1315         ZLOGE("delegate is null.");
1316         return Status::DB_ERROR;
1317     }
1318     if (!flowCtrl_.IsTokenEnough()) {
1319         ZLOGE("flow control denied");
1320         return Status::EXCEED_MAX_ACCESS_RATE;
1321     }
1322 
1323     // temporary transform.
1324     std::vector<DistributedDB::Key> dbKeys;
1325     for (auto &key : keys) {
1326         std::vector<uint8_t> keyData = Constant::TrimCopy<std::vector<uint8_t>>(key.Data());
1327         if (keyData.size() == 0 || keyData.size() > Constant::MAX_KEY_LENGTH) {
1328             ZLOGE("invalid key.");
1329             return Status::INVALID_ARGUMENT;
1330         }
1331 
1332         DistributedDB::Key keyTmp = keyData;
1333         dbKeys.push_back(keyTmp);
1334     }
1335     DistributedDB::DBStatus status;
1336     {
1337         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1338         status = kvStoreNbDelegate_->DeleteBatch(dbKeys);
1339     }
1340     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1341         ZLOGE("DeleteBatch failed, distributeddb need recover.");
1342         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1343     }
1344 
1345     if (status == DistributedDB::DBStatus::EKEYREVOKED_ERROR ||
1346         status == DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR) {
1347         ZLOGE("delegate DeleteBatch failed.");
1348         return Status::SECURITY_LEVEL_ERROR;
1349     }
1350 
1351     if (status != DistributedDB::DBStatus::OK) {
1352         ZLOGE("delegate DeleteBatch failed.");
1353         return Status::DB_ERROR;
1354     }
1355 
1356     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1357     return Status::SUCCESS;
1358 }
1359 
StartTransaction()1360 Status SingleKvStoreImpl::StartTransaction()
1361 {
1362     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1363 
1364     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1365     if (kvStoreNbDelegate_ == nullptr) {
1366         ZLOGE("delegate is null.");
1367         return Status::DB_ERROR;
1368     }
1369     if (!flowCtrl_.IsTokenEnough()) {
1370         ZLOGE("flow control denied");
1371         return Status::EXCEED_MAX_ACCESS_RATE;
1372     }
1373     DistributedDB::DBStatus status;
1374     {
1375         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1376         status = kvStoreNbDelegate_->StartTransaction();
1377     }
1378     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1379         ZLOGE("StartTransaction failed, distributeddb need recover.");
1380         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1381     }
1382     if (status != DistributedDB::DBStatus::OK) {
1383         ZLOGE("delegate return error.");
1384         return Status::DB_ERROR;
1385     }
1386     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1387     return Status::SUCCESS;
1388 }
1389 
Commit()1390 Status SingleKvStoreImpl::Commit()
1391 {
1392     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1393 
1394     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1395     if (kvStoreNbDelegate_ == nullptr) {
1396         ZLOGE("delegate is null.");
1397         return Status::DB_ERROR;
1398     }
1399     if (!flowCtrl_.IsTokenEnough()) {
1400         ZLOGE("flow control denied");
1401         return Status::EXCEED_MAX_ACCESS_RATE;
1402     }
1403     DistributedDB::DBStatus status;
1404     {
1405         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1406         status = kvStoreNbDelegate_->Commit();
1407     }
1408     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1409         ZLOGE("Commit failed, distributeddb need recover.");
1410         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1411     }
1412     if (status != DistributedDB::DBStatus::OK) {
1413         ZLOGE("delegate return error.");
1414         return Status::DB_ERROR;
1415     }
1416 
1417     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1418     return Status::SUCCESS;
1419 }
1420 
Rollback()1421 Status SingleKvStoreImpl::Rollback()
1422 {
1423     DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__));
1424 
1425     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1426     if (kvStoreNbDelegate_ == nullptr) {
1427         ZLOGE("delegate is null.");
1428         return Status::DB_ERROR;
1429     }
1430     if (!flowCtrl_.IsTokenEnough()) {
1431         ZLOGE("flow control denied");
1432         return Status::EXCEED_MAX_ACCESS_RATE;
1433     }
1434     DistributedDB::DBStatus status;
1435     {
1436         DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__));
1437         status = kvStoreNbDelegate_->Rollback();
1438     }
1439     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
1440         ZLOGE("Rollback failed, distributeddb need recover.");
1441         return (Import(bundleName_) ? Status::RECOVER_SUCCESS : Status::RECOVER_FAILED);
1442     }
1443     if (status != DistributedDB::DBStatus::OK) {
1444         ZLOGE("delegate return error.");
1445         return Status::DB_ERROR;
1446     }
1447 
1448     Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__});
1449     return Status::SUCCESS;
1450 }
1451 
Control(KvControlCmd cmd,const KvParam & inputParam,sptr<KvParam> & output)1452 Status SingleKvStoreImpl::Control(KvControlCmd cmd, const KvParam &inputParam, sptr<KvParam> &output)
1453 {
1454     output = nullptr;
1455     switch (cmd) {
1456         case KvControlCmd::SET_SYNC_PARAM: {
1457             if (inputParam.Size() != sizeof(uint32_t)) {
1458                 return Status::IPC_ERROR;
1459             }
1460             uint32_t allowedDelayMs = TransferByteArrayToType<uint32_t>(inputParam.Data());
1461             ZLOGE("SET_SYNC_PARAM in %{public}d ms", allowedDelayMs);
1462             if (allowedDelayMs > 0 && allowedDelayMs < KvStoreSyncManager::SYNC_MIN_DELAY_MS) {
1463                 return Status::INVALID_ARGUMENT;
1464             }
1465             if (allowedDelayMs > KvStoreSyncManager::SYNC_MAX_DELAY_MS) {
1466                 return Status::INVALID_ARGUMENT;
1467             }
1468             defaultSyncDelayMs_ = allowedDelayMs;
1469             ZLOGE("SET_SYNC_PARAM save %{public}d ms", defaultSyncDelayMs_);
1470             return Status::SUCCESS;
1471         }
1472         case KvControlCmd::GET_SYNC_PARAM: {
1473             output = new KvParam(TransferTypeToByteArray<uint32_t>(defaultSyncDelayMs_));
1474             ZLOGE("GET_SYNC_PARAM read %{public}d ms", defaultSyncDelayMs_);
1475             return Status::SUCCESS;
1476         }
1477         default: {
1478             ZLOGE("control invalid command.");
1479             return Status::ERROR;
1480         }
1481     }
1482 }
1483 
IncreaseOpenCount()1484 void SingleKvStoreImpl::IncreaseOpenCount()
1485 {
1486     openCount_++;
1487 }
1488 
Import(const std::string & bundleName) const1489 bool SingleKvStoreImpl::Import(const std::string &bundleName) const
1490 {
1491     ZLOGI("Single KvStoreImpl Import start");
1492     const std::string harmonyAccountId = AccountDelegate::GetInstance()->GetCurrentAccountId();
1493     auto sKey = KvStoreMetaManager::GetMetaKey(deviceAccountId_, harmonyAccountId, bundleName, storeId_, "SINGLE_KEY");
1494     std::vector<uint8_t> secretKey;
1495     bool outdated = false;
1496     KvStoreMetaManager::GetInstance().GetSecretKeyFromMeta(sKey, secretKey, outdated);
1497     MetaData metaData{0};
1498     metaData.kvStoreMetaData.deviceAccountId = deviceAccountId_;
1499     metaData.kvStoreMetaData.userId = harmonyAccountId;
1500     metaData.kvStoreMetaData.bundleName = bundleName;
1501     metaData.kvStoreMetaData.appId = appId_;
1502     metaData.kvStoreMetaData.storeId = storeId_;
1503     metaData.kvStoreMetaData.securityLevel = options_.securityLevel;
1504     metaData.secretKeyMetaData.secretKey = secretKey;
1505     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1506     return std::make_unique<BackupHandler>()->SingleKvStoreRecover(metaData, kvStoreNbDelegate_);
1507 }
1508 
SetCapabilityEnabled(bool enabled)1509 Status SingleKvStoreImpl::SetCapabilityEnabled(bool enabled)
1510 {
1511     ZLOGD("begin.");
1512     if (!flowCtrl_.IsTokenEnough()) {
1513         ZLOGE("flow control denied");
1514         return Status::EXCEED_MAX_ACCESS_RATE;
1515     }
1516 
1517     std::string key;
1518     std::string devId = DeviceKvStoreImpl::GetLocalDeviceId();
1519     if (devId.empty()) {
1520         ZLOGE("get device id empty.");
1521         return Status::ERROR;
1522     }
1523 
1524     StrategyMeta params = {devId, deviceAccountId_, Constant::DEFAULT_GROUP_ID, bundleName_, storeId_};
1525     KvStoreMetaManager::GetInstance().GetStrategyMetaKey(params, key);
1526     if (key.empty()) {
1527         ZLOGE("get key empty.");
1528         return Status::ERROR;
1529     }
1530     ZLOGD("end.");
1531     return KvStoreMetaManager::GetInstance().SaveStrategyMetaEnable(key, enabled);
1532 }
1533 
SetCapabilityRange(const std::vector<std::string> & localLabels,const std::vector<std::string> & remoteSupportLabels)1534 Status SingleKvStoreImpl::SetCapabilityRange(const std::vector<std::string> &localLabels,
1535                                              const std::vector<std::string> &remoteSupportLabels)
1536 {
1537     if (!flowCtrl_.IsTokenEnough()) {
1538         ZLOGE("flow control denied");
1539         return Status::EXCEED_MAX_ACCESS_RATE;
1540     }
1541 
1542     std::string key;
1543     std::string devId = DeviceKvStoreImpl::GetLocalDeviceId();
1544     if (devId.empty()) {
1545         ZLOGE("get device id empty.");
1546         return Status::ERROR;
1547     }
1548 
1549     StrategyMeta params = {devId, deviceAccountId_, Constant::DEFAULT_GROUP_ID, bundleName_, storeId_};
1550     KvStoreMetaManager::GetInstance().GetStrategyMetaKey(params, key);
1551     if (key.empty()) {
1552         ZLOGE("get key empty.");
1553         return Status::ERROR;
1554     }
1555 
1556     return KvStoreMetaManager::GetInstance().SaveStrategyMetaLabels(key, localLabels, remoteSupportLabels);
1557 }
1558 
GetSecurityLevel(SecurityLevel & securityLevel)1559 Status SingleKvStoreImpl::GetSecurityLevel(SecurityLevel &securityLevel)
1560 {
1561     std::shared_lock<std::shared_mutex> lock(storeNbDelegateMutex_);
1562     if (kvStoreNbDelegate_ == nullptr) {
1563         return Status::STORE_NOT_OPEN;
1564     }
1565 
1566     DistributedDB::SecurityOption option;
1567     auto status = kvStoreNbDelegate_->GetSecurityOption(option);
1568     if (status == DistributedDB::DBStatus::NOT_SUPPORT) {
1569         return Status::NOT_SUPPORT;
1570     }
1571 
1572     if (status != DistributedDB::DBStatus::OK) {
1573         return Status::DB_ERROR;
1574     }
1575 
1576     switch (option.securityLabel) {
1577         case DistributedDB::NOT_SET:
1578         case DistributedDB::S0:
1579         case DistributedDB::S1:
1580         case DistributedDB::S2:
1581             securityLevel = static_cast<SecurityLevel>(option.securityLabel);
1582             break;
1583         case DistributedDB::S3:
1584             securityLevel = option.securityFlag ? S3 : S3_EX;
1585             break;
1586         case DistributedDB::S4:
1587             securityLevel = S4;
1588             break;
1589         default:
1590             break;
1591     }
1592     return Status::SUCCESS;
1593 }
1594 
OnDump(int fd) const1595 void SingleKvStoreImpl::OnDump(int fd) const
1596 {
1597     const std::string prefix(12, ' ');
1598     dprintf(fd, "%s------------------------------------------------------\n", prefix.c_str());
1599     dprintf(fd, "%sStoreID    : %s\n", prefix.c_str(), storeId_.c_str());
1600     dprintf(fd, "%sStorePath  : %s\n", prefix.c_str(), storePath_.c_str());
1601 
1602     dprintf(fd, "%sOptions :\n", prefix.c_str());
1603     dprintf(fd, "%s    backup          : %d\n", prefix.c_str(), static_cast<int>(options_.backup));
1604     dprintf(fd, "%s    encrypt         : %d\n", prefix.c_str(), static_cast<int>(options_.encrypt));
1605     dprintf(fd, "%s    autoSync        : %d\n", prefix.c_str(), static_cast<int>(options_.autoSync));
1606     dprintf(fd, "%s    persistant      : %d\n", prefix.c_str(), static_cast<int>(options_.persistant));
1607     dprintf(fd, "%s    kvStoreType     : %d\n", prefix.c_str(), static_cast<int>(options_.kvStoreType));
1608     dprintf(fd, "%s    createIfMissing : %d\n", prefix.c_str(), static_cast<int>(options_.createIfMissing));
1609     dprintf(fd, "%s    schema          : %s\n", prefix.c_str(), options_.schema.c_str());
1610 }
GetStoreId()1611 std::string SingleKvStoreImpl::GetStoreId()
1612 {
1613     return storeId_;
1614 }
1615 
SetCompatibleIdentify(const std::string & changedDevice)1616 void SingleKvStoreImpl::SetCompatibleIdentify(const std::string &changedDevice)
1617 {
1618     bool flag = false;
1619     auto capability = UpgradeManager::GetInstance().GetCapability(changedDevice, flag);
1620     if (!flag || capability.version >= CapMetaData::CURRENT_VERSION) {
1621         ZLOGE("get peer capability %{public}d, or not older version", flag);
1622         return;
1623     }
1624 
1625     auto peerUserId = 0; // peer user id reversed here
1626     auto groupType =
1627         AuthDelegate::GetInstance()->GetGroupType(std::stoi(deviceAccountId_), peerUserId, changedDevice, appId_);
1628     flag = false;
1629     std::string compatibleUserId = UpgradeManager::GetIdentifierByType(groupType, flag);
1630     if (!flag) {
1631         ZLOGE("failed to get identifier by group type %{public}d", groupType);
1632         return;
1633     }
1634     // older version use bound account syncIdentifier instead of user syncIdentifier
1635     ZLOGI("compatible user:%{public}s", compatibleUserId.c_str());
1636     auto syncIdentifier =
1637         DistributedDB::KvStoreDelegateManager::GetKvStoreIdentifier(compatibleUserId, appId_, storeId_);
1638     kvStoreNbDelegate_->SetEqualIdentifier(syncIdentifier, { changedDevice });
1639 }
1640 
SetCompatibleIdentify()1641 void SingleKvStoreImpl::SetCompatibleIdentify()
1642 {
1643     KvStoreTuple tuple = { deviceAccountId_, appId_, storeId_ };
1644     UpgradeManager::SetCompatibleIdentifyByType(kvStoreNbDelegate_, tuple, IDENTICAL_ACCOUNT_GROUP);
1645     UpgradeManager::SetCompatibleIdentifyByType(kvStoreNbDelegate_, tuple, PEER_TO_PEER_GROUP);
1646 }
1647 }  // namespace OHOS::DistributedKv
1648