• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "kv_store_nb_delegate_impl.h"
17 
18 #include <functional>
19 #include <string>
20 
21 #include "db_common.h"
22 #include "db_constant.h"
23 #include "db_errno.h"
24 #include "db_types.h"
25 #include "kv_store_changed_data_impl.h"
26 #include "kv_store_errno.h"
27 #include "kv_store_nb_conflict_data_impl.h"
28 #include "kv_store_observer.h"
29 #include "kv_store_result_set_impl.h"
30 #include "kvdb_manager.h"
31 #include "kvdb_pragma.h"
32 #include "log_print.h"
33 #include "param_check_utils.h"
34 #include "performance_analysis.h"
35 #include "platform_specific.h"
36 #include "store_types.h"
37 #include "sync_operation.h"
38 
39 namespace DistributedDB {
40 namespace {
41     struct PragmaCmdPair {
42         int externCmd = 0;
43         int innerCmd = 0;
44     };
45 
46     const PragmaCmdPair g_pragmaMap[] = {
47         {GET_DEVICE_IDENTIFIER_OF_ENTRY, PRAGMA_GET_DEVICE_IDENTIFIER_OF_ENTRY},
48         {AUTO_SYNC, PRAGMA_AUTO_SYNC},
49         {PERFORMANCE_ANALYSIS_GET_REPORT, PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT},
50         {PERFORMANCE_ANALYSIS_OPEN, PRAGMA_PERFORMANCE_ANALYSIS_OPEN},
51         {PERFORMANCE_ANALYSIS_CLOSE, PRAGMA_PERFORMANCE_ANALYSIS_CLOSE},
52         {PERFORMANCE_ANALYSIS_SET_REPORTFILENAME, PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME},
53         {GET_IDENTIFIER_OF_DEVICE, PRAGMA_GET_IDENTIFIER_OF_DEVICE},
54         {GET_QUEUED_SYNC_SIZE, PRAGMA_GET_QUEUED_SYNC_SIZE},
55         {SET_QUEUED_SYNC_LIMIT, PRAGMA_SET_QUEUED_SYNC_LIMIT},
56         {GET_QUEUED_SYNC_LIMIT, PRAGMA_GET_QUEUED_SYNC_LIMIT},
57         {SET_WIPE_POLICY, PRAGMA_SET_WIPE_POLICY},
58         {RESULT_SET_CACHE_MODE, PRAGMA_RESULT_SET_CACHE_MODE},
59         {RESULT_SET_CACHE_MAX_SIZE, PRAGMA_RESULT_SET_CACHE_MAX_SIZE},
60         {SET_SYNC_RETRY, PRAGMA_SET_SYNC_RETRY},
61         {SET_MAX_LOG_LIMIT, PRAGMA_SET_MAX_LOG_LIMIT},
62         {EXEC_CHECKPOINT, PRAGMA_EXEC_CHECKPOINT},
63         {SET_MAX_VALUE_SIZE, PRAGMA_SET_MAX_VALUE_SIZE},
64     };
65 
66     constexpr const char *INVALID_CONNECTION = "[KvStoreNbDelegate] Invalid connection for operation";
67 }
68 
KvStoreNbDelegateImpl(IKvDBConnection * conn,const std::string & storeId)69 KvStoreNbDelegateImpl::KvStoreNbDelegateImpl(IKvDBConnection *conn, const std::string &storeId)
70     : conn_(conn),
71       storeId_(storeId),
72       releaseFlag_(false)
73 {}
74 
~KvStoreNbDelegateImpl()75 KvStoreNbDelegateImpl::~KvStoreNbDelegateImpl()
76 {
77     if (!releaseFlag_) {
78         LOGF("[KvStoreNbDelegate] Can't release directly");
79         return;
80     }
81     conn_ = nullptr;
82 #ifndef _WIN32
83     std::lock_guard<std::mutex> lock(libMutex_);
84     DBCommon::UnLoadGrdLib(dlHandle_);
85     dlHandle_ = nullptr;
86 #endif
87 }
88 
Get(const Key & key,Value & value) const89 DBStatus KvStoreNbDelegateImpl::Get(const Key &key, Value &value) const
90 {
91     IOption option;
92     option.dataType = IOption::SYNC_DATA;
93     return GetInner(option, key, value);
94 }
95 
GetEntries(const Key & keyPrefix,std::vector<Entry> & entries) const96 DBStatus KvStoreNbDelegateImpl::GetEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
97 {
98     IOption option;
99     option.dataType = IOption::SYNC_DATA;
100     return GetEntriesInner(option, keyPrefix, entries);
101 }
102 
GetEntries(const Key & keyPrefix,KvStoreResultSet * & resultSet) const103 DBStatus KvStoreNbDelegateImpl::GetEntries(const Key &keyPrefix, KvStoreResultSet *&resultSet) const
104 {
105     if (conn_ == nullptr) {
106         LOGE("%s", INVALID_CONNECTION);
107         return DB_ERROR;
108     }
109 
110     IOption option;
111     option.dataType = IOption::SYNC_DATA;
112     IKvDBResultSet *kvDbResultSet = nullptr;
113     int errCode = conn_->GetResultSet(option, keyPrefix, kvDbResultSet);
114     if (errCode == E_OK) {
115         resultSet = new (std::nothrow) KvStoreResultSetImpl(kvDbResultSet);
116         if (resultSet != nullptr) {
117             return OK;
118         }
119 
120         LOGE("[KvStoreNbDelegate] Alloc result set failed.");
121         conn_->ReleaseResultSet(kvDbResultSet);
122         kvDbResultSet = nullptr;
123         return DB_ERROR;
124     }
125 
126     LOGE("[KvStoreNbDelegate] Get result set failed: %d", errCode);
127     return TransferDBErrno(errCode);
128 }
129 
GetEntries(const Query & query,std::vector<Entry> & entries) const130 DBStatus KvStoreNbDelegateImpl::GetEntries(const Query &query, std::vector<Entry> &entries) const
131 {
132     IOption option;
133     option.dataType = IOption::SYNC_DATA;
134     if (conn_ != nullptr) {
135         int errCode = conn_->GetEntries(option, query, entries);
136         if (errCode == E_OK) {
137             return OK;
138         } else if (errCode == -E_NOT_FOUND) {
139             LOGD("[KvStoreNbDelegate] Not found the data by query");
140             return NOT_FOUND;
141         }
142 
143         LOGE("[KvStoreNbDelegate] Get the batch data by query err:%d", errCode);
144         return TransferDBErrno(errCode);
145     }
146 
147     LOGE("%s", INVALID_CONNECTION);
148     return DB_ERROR;
149 }
150 
GetEntries(const Query & query,KvStoreResultSet * & resultSet) const151 DBStatus KvStoreNbDelegateImpl::GetEntries(const Query &query, KvStoreResultSet *&resultSet) const
152 {
153     if (conn_ == nullptr) {
154         LOGE("%s", INVALID_CONNECTION);
155         return DB_ERROR;
156     }
157 
158     IOption option;
159     option.dataType = IOption::SYNC_DATA;
160     IKvDBResultSet *kvDbResultSet = nullptr;
161     int errCode = conn_->GetResultSet(option, query, kvDbResultSet);
162     if (errCode == E_OK) {
163         resultSet = new (std::nothrow) KvStoreResultSetImpl(kvDbResultSet);
164         if (resultSet != nullptr) {
165             return OK;
166         }
167 
168         LOGE("[KvStoreNbDelegate] Alloc result set failed.");
169         conn_->ReleaseResultSet(kvDbResultSet);
170         kvDbResultSet = nullptr;
171         return DB_ERROR;
172     }
173 
174     LOGE("[KvStoreNbDelegate] Get result set for query failed: %d", errCode);
175     return TransferDBErrno(errCode);
176 }
177 
GetCount(const Query & query,int & count) const178 DBStatus KvStoreNbDelegateImpl::GetCount(const Query &query, int &count) const
179 {
180     if (conn_ == nullptr) {
181         LOGE("%s", INVALID_CONNECTION);
182         return DB_ERROR;
183     }
184 
185     IOption option;
186     option.dataType = IOption::SYNC_DATA;
187     int errCode = conn_->GetCount(option, query, count);
188     if (errCode == E_OK) {
189         if (count == 0) {
190             return NOT_FOUND;
191         }
192         return OK;
193     }
194 
195     LOGE("[KvStoreNbDelegate] Get count for query failed: %d", errCode);
196     return TransferDBErrno(errCode);
197 }
198 
CloseResultSet(KvStoreResultSet * & resultSet)199 DBStatus KvStoreNbDelegateImpl::CloseResultSet(KvStoreResultSet *&resultSet)
200 {
201     if (resultSet == nullptr) {
202         return INVALID_ARGS;
203     }
204 
205     if (conn_ == nullptr) {
206         LOGE("%s", INVALID_CONNECTION);
207         return DB_ERROR;
208     }
209 
210     // release inner result set
211     IKvDBResultSet *kvDbResultSet = nullptr;
212     (static_cast<KvStoreResultSetImpl *>(resultSet))->GetResultSet(kvDbResultSet);
213     conn_->ReleaseResultSet(kvDbResultSet);
214     // release external result set
215     delete resultSet;
216     resultSet = nullptr;
217     return OK;
218 }
219 
Put(const Key & key,const Value & value)220 DBStatus KvStoreNbDelegateImpl::Put(const Key &key, const Value &value)
221 {
222     IOption option;
223     option.dataType = IOption::SYNC_DATA;
224     return PutInner(option, key, value);
225 }
226 
PutBatch(const std::vector<Entry> & entries)227 DBStatus KvStoreNbDelegateImpl::PutBatch(const std::vector<Entry> &entries)
228 {
229     if (conn_ != nullptr) {
230         IOption option;
231         option.dataType = IOption::SYNC_DATA;
232         int errCode = conn_->PutBatch(option, entries);
233         if (errCode == E_OK) {
234             return OK;
235         }
236 
237         LOGE("[KvStoreNbDelegate] Put batch data failed:%d", errCode);
238         return TransferDBErrno(errCode);
239     }
240 
241     LOGE("%s", INVALID_CONNECTION);
242     return DB_ERROR;
243 }
244 
DeleteBatch(const std::vector<Key> & keys)245 DBStatus KvStoreNbDelegateImpl::DeleteBatch(const std::vector<Key> &keys)
246 {
247     if (conn_ == nullptr) {
248         LOGE("%s", INVALID_CONNECTION);
249         return DB_ERROR;
250     }
251 
252     IOption option;
253     option.dataType = IOption::SYNC_DATA;
254     int errCode = conn_->DeleteBatch(option, keys);
255     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
256         return OK;
257     }
258 
259     LOGE("[KvStoreNbDelegate] Delete batch data failed:%d", errCode);
260     return TransferDBErrno(errCode);
261 }
262 
Delete(const Key & key)263 DBStatus KvStoreNbDelegateImpl::Delete(const Key &key)
264 {
265     IOption option;
266     option.dataType = IOption::SYNC_DATA;
267     return DeleteInner(option, key);
268 }
269 
GetLocal(const Key & key,Value & value) const270 DBStatus KvStoreNbDelegateImpl::GetLocal(const Key &key, Value &value) const
271 {
272     IOption option;
273     option.dataType = IOption::LOCAL_DATA;
274     return GetInner(option, key, value);
275 }
276 
GetLocalEntries(const Key & keyPrefix,std::vector<Entry> & entries) const277 DBStatus KvStoreNbDelegateImpl::GetLocalEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
278 {
279     IOption option;
280     option.dataType = IOption::LOCAL_DATA;
281     return GetEntriesInner(option, keyPrefix, entries);
282 }
283 
PutLocal(const Key & key,const Value & value)284 DBStatus KvStoreNbDelegateImpl::PutLocal(const Key &key, const Value &value)
285 {
286     IOption option;
287     option.dataType = IOption::LOCAL_DATA;
288     return PutInner(option, key, value);
289 }
290 
DeleteLocal(const Key & key)291 DBStatus KvStoreNbDelegateImpl::DeleteLocal(const Key &key)
292 {
293     IOption option;
294     option.dataType = IOption::LOCAL_DATA;
295     return DeleteInner(option, key);
296 }
297 
PublishLocal(const Key & key,bool deleteLocal,bool updateTimestamp,const KvStoreNbPublishOnConflict & onConflict)298 DBStatus KvStoreNbDelegateImpl::PublishLocal(const Key &key, bool deleteLocal, bool updateTimestamp,
299     const KvStoreNbPublishOnConflict &onConflict)
300 {
301     if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
302         LOGW("[KvStoreNbDelegate][Publish] Invalid para");
303         return INVALID_ARGS;
304     }
305 
306     if (conn_ != nullptr) {
307         PragmaPublishInfo publishInfo{ key, deleteLocal, updateTimestamp, onConflict };
308         int errCode = conn_->Pragma(PRAGMA_PUBLISH_LOCAL, static_cast<PragmaData>(&publishInfo));
309         if (errCode != E_OK) {
310             LOGE("[KvStoreNbDelegate] Publish local err:%d", errCode);
311         }
312         return TransferDBErrno(errCode);
313     }
314 
315     LOGE("%s", INVALID_CONNECTION);
316     return DB_ERROR;
317 }
318 
UnpublishToLocal(const Key & key,bool deletePublic,bool updateTimestamp)319 DBStatus KvStoreNbDelegateImpl::UnpublishToLocal(const Key &key, bool deletePublic, bool updateTimestamp)
320 {
321     if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
322         LOGW("[KvStoreNbDelegate][Unpublish] Invalid para");
323         return INVALID_ARGS;
324     }
325 
326     if (conn_ != nullptr) {
327         PragmaUnpublishInfo unpublishInfo{ key, deletePublic, updateTimestamp };
328         int errCode = conn_->Pragma(PRAGMA_UNPUBLISH_SYNC, static_cast<PragmaData>(&unpublishInfo));
329         if (errCode != E_OK) {
330             LOGE("[KvStoreNbDelegate] Unpublish result:%d", errCode);
331         }
332         return TransferDBErrno(errCode);
333     }
334 
335     LOGE("%s", INVALID_CONNECTION);
336     return DB_ERROR;
337 }
338 
PutLocalBatch(const std::vector<Entry> & entries)339 DBStatus KvStoreNbDelegateImpl::PutLocalBatch(const std::vector<Entry> &entries)
340 {
341     if (conn_ == nullptr) {
342         LOGE("%s", INVALID_CONNECTION);
343         return DB_ERROR;
344     }
345 
346     IOption option;
347     option.dataType = IOption::LOCAL_DATA;
348     int errCode = conn_->PutBatch(option, entries);
349     if (errCode != E_OK) {
350         LOGE("[KvStoreNbDelegate] Put local batch data failed:%d", errCode);
351     }
352     return TransferDBErrno(errCode);
353 }
354 
DeleteLocalBatch(const std::vector<Key> & keys)355 DBStatus KvStoreNbDelegateImpl::DeleteLocalBatch(const std::vector<Key> &keys)
356 {
357     if (conn_ == nullptr) {
358         LOGE("%s", INVALID_CONNECTION);
359         return DB_ERROR;
360     }
361 
362     IOption option;
363     option.dataType = IOption::LOCAL_DATA;
364     int errCode = conn_->DeleteBatch(option, keys);
365     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
366         return OK;
367     }
368 
369     LOGE("[KvStoreNbDelegate] Delete local batch data failed:%d", errCode);
370     return TransferDBErrno(errCode);
371 }
372 
RegisterObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)373 DBStatus KvStoreNbDelegateImpl::RegisterObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
374 {
375     if (key.size() > DBConstant::MAX_KEY_SIZE) {
376         return INVALID_ARGS;
377     }
378     if (observer == nullptr) {
379         LOGE("[KvStoreNbDelegate][RegisterObserver] Observer is null");
380         return INVALID_ARGS;
381     }
382     if (conn_ == nullptr) {
383         LOGE("[RegisterObserver]%s", INVALID_CONNECTION);
384         return DB_ERROR;
385     }
386 
387     uint64_t rawMode = DBCommon::EraseBit(mode, DBConstant::OBSERVER_CHANGES_MASK);
388     if (rawMode == static_cast<uint64_t>(ObserverMode::OBSERVER_CHANGES_CLOUD)) {
389         return RegisterCloudObserver(key, mode, observer);
390     }
391     return RegisterDeviceObserver(key, static_cast<unsigned int>(rawMode), observer);
392 }
393 
CheckDeviceObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)394 DBStatus KvStoreNbDelegateImpl::CheckDeviceObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
395 {
396     if (!ParamCheckUtils::CheckObserver(key, mode)) {
397         LOGE("[KvStoreNbDelegate][CheckDeviceObserver] Register nb observer by illegal mode or key size!");
398         return INVALID_ARGS;
399     }
400 
401     if (observerMap_.size() >= DBConstant::MAX_OBSERVER_COUNT) {
402         LOGE("[KvStoreNbDelegate][CheckDeviceObserver] The number of kv observers has been over limit, storeId[%.3s]",
403             storeId_.c_str());
404         return OVER_MAX_LIMITS;
405     }
406     if (observerMap_.find(observer) != observerMap_.end()) {
407         LOGE("[KvStoreNbDelegate][CheckDeviceObserver] Observer has been already registered!");
408         return ALREADY_SET;
409     }
410     return OK;
411 }
412 
RegisterDeviceObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)413 DBStatus KvStoreNbDelegateImpl::RegisterDeviceObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
414 {
415     if (conn_->IsTransactionStarted()) {
416         LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Transaction unfinished");
417         return BUSY;
418     }
419     std::lock_guard<std::mutex> lockGuard(observerMapLock_);
420     DBStatus status = CheckDeviceObserver(key, mode, observer);
421     if (status != OK) {
422         LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Observer map cannot be registered, status:%d", status);
423         return status;
424     }
425 
426     int errCode = E_OK;
427     auto storeId = storeId_;
428     KvDBObserverHandle *observerHandle = conn_->RegisterObserver(
429         mode, key,
430         [observer, storeId](const KvDBCommitNotifyData &notifyData) {
431             KvStoreChangedDataImpl data(&notifyData);
432             LOGD("[KvStoreNbDelegate][RegisterDeviceObserver] Trigger on change");
433             observer->OnChange(data);
434         },
435         errCode);
436 
437     if (errCode != E_OK || observerHandle == nullptr) {
438         LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Register device observer failed:%d!", errCode);
439         return DB_ERROR;
440     }
441 
442     observerMap_.insert(std::pair<const KvStoreObserver *, const KvDBObserverHandle *>(observer, observerHandle));
443     LOGI("[KvStoreNbDelegate][RegisterDeviceObserver] Register device observer ok mode:%u", mode);
444     return OK;
445 }
446 
CheckCloudObserver(KvStoreObserver * observer)447 DBStatus KvStoreNbDelegateImpl::CheckCloudObserver(KvStoreObserver *observer)
448 {
449     if (cloudObserverMap_.size() >= DBConstant::MAX_OBSERVER_COUNT) {
450         LOGE("[KvStoreNbDelegate][CheckCloudObserver] The number of kv cloud observers over limit, storeId[%.3s]",
451             storeId_.c_str());
452         return OVER_MAX_LIMITS;
453     }
454     if (cloudObserverMap_.find(observer) != cloudObserverMap_.end()) {
455         LOGE("[KvStoreNbDelegate][CheckCloudObserver] Cloud observer has been already registered!");
456         return ALREADY_SET;
457     }
458     return OK;
459 }
460 
RegisterCloudObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)461 DBStatus KvStoreNbDelegateImpl::RegisterCloudObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
462 {
463     std::lock_guard<std::mutex> lockGuard(observerMapLock_);
464     DBStatus status = CheckCloudObserver(observer);
465     if (status != OK) {
466         LOGE("[KvStoreNbDelegate][RegisterCloudObserver] Cloud observer map cannot be registered, status:%d", status);
467         return status;
468     }
469 
470     auto storeId = storeId_;
471     ObserverAction action = [observer, storeId](
472                                 const std::string &device, ChangedData &&changedData, bool isChangedData) {
473         if (isChangedData) {
474             LOGD("[KvStoreNbDelegate][RegisterCloudObserver] Trigger on change");
475             observer->OnChange(Origin::ORIGIN_CLOUD, device, std::move(changedData));
476         }
477     };
478     int errCode = conn_->RegisterObserverAction(observer, action);
479     if (errCode != E_OK) {
480         LOGE("[KvStoreNbDelegate][RegisterCloudObserver] Register cloud observer failed:%d!", errCode);
481         return DB_ERROR;
482     }
483     cloudObserverMap_[observer] = mode;
484     LOGI("[KvStoreNbDelegate][RegisterCloudObserver] Register cloud observer ok mode:%u", mode);
485     return OK;
486 }
487 
UnRegisterObserver(const KvStoreObserver * observer)488 DBStatus KvStoreNbDelegateImpl::UnRegisterObserver(const KvStoreObserver *observer)
489 {
490     if (observer == nullptr) {
491         return INVALID_ARGS;
492     }
493 
494     if (conn_ == nullptr) {
495         LOGE("%s", INVALID_CONNECTION);
496         return DB_ERROR;
497     }
498 
499     DBStatus cloudRet = UnRegisterCloudObserver(observer);
500     DBStatus devRet = UnRegisterDeviceObserver(observer);
501     if (cloudRet == OK || devRet == OK) {
502         return OK;
503     }
504     return devRet;
505 }
506 
UnRegisterDeviceObserver(const KvStoreObserver * observer)507 DBStatus KvStoreNbDelegateImpl::UnRegisterDeviceObserver(const KvStoreObserver *observer)
508 {
509     std::lock_guard<std::mutex> lockGuard(observerMapLock_);
510     auto iter = observerMap_.find(observer);
511     if (iter == observerMap_.end()) {
512         LOGE("[KvStoreNbDelegate] Observer has not been registered!");
513         return NOT_FOUND;
514     }
515 
516     const KvDBObserverHandle *observerHandle = iter->second;
517     int errCode = conn_->UnRegisterObserver(observerHandle);
518     if (errCode != E_OK) {
519         LOGE("[KvStoreNbDelegate] UnRegistObserver failed:%d!", errCode);
520         return DB_ERROR;
521     }
522     observerMap_.erase(iter);
523     return OK;
524 }
525 
UnRegisterCloudObserver(const KvStoreObserver * observer)526 DBStatus KvStoreNbDelegateImpl::UnRegisterCloudObserver(const KvStoreObserver *observer)
527 {
528     std::lock_guard<std::mutex> lockGuard(observerMapLock_);
529     auto iter = cloudObserverMap_.find(observer);
530     if (iter == cloudObserverMap_.end()) {
531         LOGW("[KvStoreNbDelegate] CloudObserver has not been registered!");
532         return NOT_FOUND;
533     }
534     int errCode = conn_->UnRegisterObserverAction(observer);
535     if (errCode != E_OK) {
536         LOGE("[KvStoreNbDelegate] UnRegisterCloudObserver failed:%d!", errCode);
537         return DB_ERROR;
538     }
539     cloudObserverMap_.erase(iter);
540     return OK;
541 }
542 
RemoveDeviceData(const std::string & device)543 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device)
544 {
545     if (conn_ == nullptr) {
546         LOGE("%s", INVALID_CONNECTION);
547         return DB_ERROR;
548     }
549     if (device.empty() || device.length() > DBConstant::MAX_DEV_LENGTH) {
550         return INVALID_ARGS;
551     }
552     int errCode = conn_->Pragma(PRAGMA_RM_DEVICE_DATA,
553         const_cast<void *>(static_cast<const void *>(&device)));
554     if (errCode != E_OK) {
555         LOGE("[KvStoreNbDelegate][%.3s] Remove specific device data failed:%d", storeId_.c_str(), errCode);
556     } else {
557         LOGI("[KvStoreNbDelegate][%.3s] Remove specific device data OK", storeId_.c_str());
558     }
559     return TransferDBErrno(errCode);
560 }
561 
GetStoreId() const562 std::string KvStoreNbDelegateImpl::GetStoreId() const
563 {
564     return storeId_;
565 }
566 
Sync(const std::vector<std::string> & devices,SyncMode mode,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,bool wait=false)567 DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
568     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
569     bool wait = false)
570 {
571     if (conn_ == nullptr) {
572         LOGE("%s", INVALID_CONNECTION);
573         return DB_ERROR;
574     }
575     if (mode > SYNC_MODE_PUSH_PULL) {
576         LOGE("not support other mode");
577         return NOT_SUPPORT;
578     }
579 
580     PragmaSync pragmaData(
581         devices, mode, [this, onComplete](const std::map<std::string, int> &statuses) {
582             OnSyncComplete(statuses, onComplete);
583         }, wait);
584     int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
585     if (errCode < E_OK) {
586         LOGE("[KvStoreNbDelegate] Sync data failed:%d", errCode);
587         return TransferDBErrno(errCode);
588     }
589     return OK;
590 }
591 
Sync(const std::vector<std::string> & devices,SyncMode mode,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)592 DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
593     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
594     const Query &query, bool wait)
595 {
596     DeviceSyncOption option;
597     option.devices = devices;
598     option.mode = mode;
599     option.isQuery = true;
600     option.query = query;
601     option.isWait = wait;
602     return Sync(option, onComplete);
603 }
604 
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & processMap,const DeviceSyncProcessCallback & onProcess) const605 void KvStoreNbDelegateImpl::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
606     const DeviceSyncProcessCallback &onProcess) const
607 {
608     std::map<std::string, DeviceSyncProcess> result;
609     for (const auto &pair : processMap) {
610         DeviceSyncProcess info = pair.second;
611         int status = info.errCode;
612         info.errCode = SyncOperation::DBStatusTrans(status);
613         info.process = SyncOperation::DBStatusTransProcess(status);
614         result.insert(std::pair<std::string, DeviceSyncProcess>(pair.first, info));
615     }
616     if (onProcess) {
617         onProcess(result);
618     }
619 }
620 
Sync(const DeviceSyncOption & option,const DeviceSyncProcessCallback & onProcess)621 DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
622 {
623     if (conn_ == nullptr) {
624         LOGE("%s", INVALID_CONNECTION);
625         return DB_ERROR;
626     }
627     if (option.mode != SYNC_MODE_PULL_ONLY) {
628         LOGE("Not support other mode");
629         return NOT_SUPPORT;
630     }
631     DeviceSyncProcessCallback onSyncProcess = [this, onProcess](const std::map<std::string, DeviceSyncProcess> &map) {
632         OnDeviceSyncProcess(map, onProcess);
633     };
634     int errCode = E_OK;
635     if (option.isQuery) {
636         QuerySyncObject querySyncObj(option.query);
637         if (!querySyncObj.GetRelationTableNames().empty()) {
638             LOGE("Sync with option and check query table names from tables failed!");
639             return NOT_SUPPORT;
640         }
641         if (!DBCommon::CheckQueryWithoutMultiTable(option.query)) {
642             LOGE("Not support for invalid query");
643             return NOT_SUPPORT;
644         }
645 
646         if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
647             LOGE("Not support order by timestamp and query by range");
648             return NOT_SUPPORT;
649         }
650         PragmaSync pragmaData(option, querySyncObj, onSyncProcess);
651         errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
652     } else {
653         PragmaSync pragmaData(option, onSyncProcess);
654         errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
655     }
656 
657     if (errCode != E_OK) {
658         LOGE("[KvStoreNbDelegate] DeviceSync data failed:%d", errCode);
659         return TransferDBErrno(errCode);
660     }
661     return OK;
662 }
663 
Sync(const DeviceSyncOption & option,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete)664 DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncOption &option,
665     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete)
666 {
667     if (conn_ == nullptr) {
668         LOGE("%s", INVALID_CONNECTION);
669         return DB_ERROR;
670     }
671     if (option.mode > SYNC_MODE_PUSH_PULL) {
672         LOGE("not support other mode");
673         return NOT_SUPPORT;
674     }
675 
676     int errCode = E_OK;
677     if (option.isQuery) {
678         QuerySyncObject querySyncObj(option.query);
679         if (!querySyncObj.GetRelationTableNames().empty()) {
680             LOGE("check query table names from tables failed!");
681             return NOT_SUPPORT;
682         }
683 
684         if (!DBCommon::CheckQueryWithoutMultiTable(option.query)) {
685             LOGE("not support for invalid query");
686             return NOT_SUPPORT;
687         }
688         if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
689             LOGE("not support order by timestamp and query by range");
690             return NOT_SUPPORT;
691         }
692         PragmaSync pragmaData(option, querySyncObj, [this, onComplete](const std::map<std::string, int> &statuses) {
693             OnSyncComplete(statuses, onComplete);
694         });
695         errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
696     } else {
697         PragmaSync pragmaData(option, [this, onComplete](const std::map<std::string, int> &statuses) {
698             OnSyncComplete(statuses, onComplete);
699         });
700         errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
701     }
702     if (errCode != E_OK) {
703         LOGE("[KvStoreNbDelegate] QuerySync data failed:%d", errCode);
704         return TransferDBErrno(errCode);
705     }
706     return OK;
707 }
708 
CancelSync(uint32_t syncId)709 DBStatus KvStoreNbDelegateImpl::CancelSync(uint32_t syncId)
710 {
711     if (conn_ == nullptr) {
712         LOGE("%s", INVALID_CONNECTION);
713         return DB_ERROR;
714     }
715     uint32_t tempSyncId = syncId;
716     int errCode = conn_->Pragma(PRAGMA_CANCEL_SYNC_DEVICES, &tempSyncId);
717     if (errCode != E_OK) {
718         LOGE("[KvStoreNbDelegate] CancelSync failed:%d", errCode);
719         return TransferDBErrno(errCode);
720     }
721     return OK;
722 }
723 
Pragma(PragmaCmd cmd,PragmaData & paramData)724 DBStatus KvStoreNbDelegateImpl::Pragma(PragmaCmd cmd, PragmaData &paramData)
725 {
726     if (conn_ == nullptr) {
727         LOGE("%s", INVALID_CONNECTION);
728         return DB_ERROR;
729     }
730 
731     int errCode = -E_NOT_SUPPORT;
732     for (const auto &item : g_pragmaMap) {
733         if (item.externCmd == cmd) {
734             errCode = conn_->Pragma(item.innerCmd, paramData);
735             break;
736         }
737     }
738 
739     if (errCode != E_OK) {
740         LOGE("[KvStoreNbDelegate] Pragma failed:%d", errCode);
741     }
742     return TransferDBErrno(errCode);
743 }
744 
SetConflictNotifier(int conflictType,const KvStoreNbConflictNotifier & notifier)745 DBStatus KvStoreNbDelegateImpl::SetConflictNotifier(int conflictType, const KvStoreNbConflictNotifier &notifier)
746 {
747     if (conn_ == nullptr) {
748         LOGE("%s", INVALID_CONNECTION);
749         return DB_ERROR;
750     }
751 
752     if (!ParamCheckUtils::CheckConflictNotifierType(conflictType)) {
753         LOGE("%s", INVALID_CONNECTION);
754         return INVALID_ARGS;
755     }
756 
757     int errCode;
758     if (!notifier) {
759         errCode = conn_->SetConflictNotifier(conflictType, nullptr);
760         goto END;
761     }
762 
763     errCode = conn_->SetConflictNotifier(conflictType,
764         [conflictType, notifier](const KvDBCommitNotifyData &data) {
765             int resultCode;
766             const std::list<KvDBConflictEntry> entries = data.GetCommitConflicts(resultCode);
767             if (resultCode != E_OK) {
768                 LOGE("Get commit conflicted entries failed:%d!", resultCode);
769                 return;
770             }
771 
772             for (const auto &entry : entries) {
773                 // Prohibit signed numbers to perform bit operations
774                 uint32_t entryType = static_cast<uint32_t>(entry.type);
775                 uint32_t type = static_cast<uint32_t>(conflictType);
776                 if (entryType & type) {
777                     KvStoreNbConflictDataImpl dataImpl;
778                     dataImpl.SetConflictData(entry);
779                     notifier(dataImpl);
780                 }
781             }
782         });
783 
784 END:
785     if (errCode != E_OK) {
786         LOGE("[KvStoreNbDelegate] Register conflict failed:%d!", errCode);
787     }
788     return TransferDBErrno(errCode);
789 }
790 
Rekey(const CipherPassword & password)791 DBStatus KvStoreNbDelegateImpl::Rekey(const CipherPassword &password)
792 {
793     if (conn_ == nullptr) {
794         LOGE("%s", INVALID_CONNECTION);
795         return DB_ERROR;
796     }
797 
798     int errCode = conn_->Rekey(password);
799     if (errCode == E_OK) {
800         return OK;
801     }
802 
803     LOGE("[KvStoreNbDelegate] Rekey failed:%d", errCode);
804     return TransferDBErrno(errCode);
805 }
806 
Export(const std::string & filePath,const CipherPassword & passwd,bool force)807 DBStatus KvStoreNbDelegateImpl::Export(const std::string &filePath, const CipherPassword &passwd, bool force)
808 {
809     if (conn_ == nullptr) {
810         LOGE("%s", INVALID_CONNECTION);
811         return DB_ERROR;
812     }
813 
814     std::string fileDir;
815     std::string fileName;
816     OS::SplitFilePath(filePath, fileDir, fileName);
817 
818     std::string canonicalUrl;
819     if (!ParamCheckUtils::CheckDataDir(fileDir, canonicalUrl)) {
820         return INVALID_ARGS;
821     }
822 
823     if (!OS::CheckPathExistence(canonicalUrl)) {
824         return NO_PERMISSION;
825     }
826 
827     canonicalUrl = canonicalUrl + "/" + fileName;
828     if (!force && OS::CheckPathExistence(canonicalUrl)) {
829         return FILE_ALREADY_EXISTED;
830     }
831 
832     int errCode = conn_->Export(canonicalUrl, passwd);
833     if (errCode == E_OK) {
834         return OK;
835     }
836     LOGE("[KvStoreNbDelegate] Export failed:%d", errCode);
837     return TransferDBErrno(errCode);
838 }
839 
Import(const std::string & filePath,const CipherPassword & passwd,bool isNeedIntegrityCheck)840 DBStatus KvStoreNbDelegateImpl::Import(const std::string &filePath, const CipherPassword &passwd,
841     bool isNeedIntegrityCheck)
842 {
843     if (conn_ == nullptr) {
844         LOGE("%s", INVALID_CONNECTION);
845         return DB_ERROR;
846     }
847 
848     std::string fileDir;
849     std::string fileName;
850     OS::SplitFilePath(filePath, fileDir, fileName);
851 
852     std::string canonicalUrl;
853     if (!ParamCheckUtils::CheckDataDir(fileDir, canonicalUrl)) {
854         return INVALID_ARGS;
855     }
856 
857     canonicalUrl = canonicalUrl + "/" + fileName;
858     if (!OS::CheckPathExistence(canonicalUrl)) {
859         LOGE("Import file path err, DBStatus = INVALID_FILE errno = [%d]", errno);
860         return INVALID_FILE;
861     }
862 
863     int errCode = conn_->Import(canonicalUrl, passwd, isNeedIntegrityCheck);
864     if (errCode == E_OK) {
865         LOGI("[KvStoreNbDelegate] Import ok");
866         return OK;
867     }
868 
869     LOGE("[KvStoreNbDelegate] Import failed:%d", errCode);
870     return TransferDBErrno(errCode);
871 }
872 
StartTransaction()873 DBStatus KvStoreNbDelegateImpl::StartTransaction()
874 {
875     if (conn_ == nullptr) {
876         LOGE("%s", INVALID_CONNECTION);
877         return DB_ERROR;
878     }
879 
880     int errCode = conn_->StartTransaction();
881     if (errCode != E_OK) {
882         LOGE("[KvStoreNbDelegate] StartTransaction failed:%d", errCode);
883     }
884     return TransferDBErrno(errCode);
885 }
886 
Commit()887 DBStatus KvStoreNbDelegateImpl::Commit()
888 {
889     if (conn_ == nullptr) {
890         LOGE("%s", INVALID_CONNECTION);
891         return DB_ERROR;
892     }
893 
894     int errCode = conn_->Commit();
895     if (errCode != E_OK) {
896         LOGE("[KvStoreNbDelegate] Commit failed:%d", errCode);
897     }
898     return TransferDBErrno(errCode);
899 }
900 
Rollback()901 DBStatus KvStoreNbDelegateImpl::Rollback()
902 {
903     if (conn_ == nullptr) {
904         LOGE("%s", INVALID_CONNECTION);
905         return DB_ERROR;
906     }
907 
908     int errCode = conn_->RollBack();
909     if (errCode != E_OK) {
910         LOGE("[KvStoreNbDelegate] Rollback failed:%d", errCode);
911     }
912     return TransferDBErrno(errCode);
913 }
914 
SetReleaseFlag(bool flag)915 void KvStoreNbDelegateImpl::SetReleaseFlag(bool flag)
916 {
917     releaseFlag_ = flag;
918 }
919 
Close(bool isCloseImmediately)920 DBStatus KvStoreNbDelegateImpl::Close(bool isCloseImmediately)
921 {
922     if (conn_ != nullptr) {
923         int errCode = KvDBManager::ReleaseDatabaseConnection(conn_, isCloseImmediately);
924         if (errCode == -E_BUSY) {
925             LOGI("[KvStoreNbDelegate] Busy for close");
926             return BUSY;
927         }
928         conn_ = nullptr;
929     }
930     return OK;
931 }
932 
CheckIntegrity() const933 DBStatus KvStoreNbDelegateImpl::CheckIntegrity() const
934 {
935     if (conn_ == nullptr) {
936         LOGE("%s", INVALID_CONNECTION);
937         return DB_ERROR;
938     }
939 
940     return TransferDBErrno(conn_->CheckIntegrity());
941 }
942 
GetSecurityOption(SecurityOption & option) const943 DBStatus KvStoreNbDelegateImpl::GetSecurityOption(SecurityOption &option) const
944 {
945     if (conn_ == nullptr) {
946         LOGE("%s", INVALID_CONNECTION);
947         return DB_ERROR;
948     }
949     return TransferDBErrno(conn_->GetSecurityOption(option.securityLabel, option.securityFlag));
950 }
951 
SetRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier)952 DBStatus KvStoreNbDelegateImpl::SetRemotePushFinishedNotify(const RemotePushFinishedNotifier &notifier)
953 {
954     if (conn_ == nullptr) {
955         LOGE("%s", INVALID_CONNECTION);
956         return DB_ERROR;
957     }
958 
959     PragmaRemotePushNotify notify(notifier);
960     int errCode = conn_->Pragma(PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, reinterpret_cast<void *>(&notify));
961     if (errCode != E_OK) {
962         LOGE("[KvStoreNbDelegate] Set remote push finished notify failed : %d", errCode);
963     }
964     return TransferDBErrno(errCode);
965 }
966 
GetInner(const IOption & option,const Key & key,Value & value) const967 DBStatus KvStoreNbDelegateImpl::GetInner(const IOption &option, const Key &key, Value &value) const
968 {
969     if (conn_ == nullptr) {
970         LOGE("%s", INVALID_CONNECTION);
971         return DB_ERROR;
972     }
973 
974     int errCode = conn_->Get(option, key, value);
975     if (errCode == E_OK) {
976         return OK;
977     }
978 
979     if (errCode != -E_NOT_FOUND) {
980         LOGE("[KvStoreNbDelegate] Get the data failed:%d", errCode);
981     }
982     return TransferDBErrno(errCode);
983 }
984 
GetEntriesInner(const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const985 DBStatus KvStoreNbDelegateImpl::GetEntriesInner(const IOption &option,
986     const Key &keyPrefix, std::vector<Entry> &entries) const
987 {
988     if (conn_ == nullptr) {
989         LOGE("%s", INVALID_CONNECTION);
990         return DB_ERROR;
991     }
992 
993     int errCode = conn_->GetEntries(option, keyPrefix, entries);
994     if (errCode == E_OK) {
995         return OK;
996     }
997     LOGE("[KvStoreNbDelegate] Get the batch data failed:%d", errCode);
998     return TransferDBErrno(errCode);
999 }
1000 
PutInner(const IOption & option,const Key & key,const Value & value)1001 DBStatus KvStoreNbDelegateImpl::PutInner(const IOption &option, const Key &key, const Value &value)
1002 {
1003     if (conn_ == nullptr) {
1004         LOGE("%s", INVALID_CONNECTION);
1005         return DB_ERROR;
1006     }
1007 
1008     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1009     if (performance != nullptr) {
1010         performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_PUT_DATA);
1011     }
1012 
1013     int errCode = conn_->Put(option, key, value);
1014     if (performance != nullptr) {
1015         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_PUT_DATA);
1016     }
1017 
1018     if (errCode == E_OK) {
1019         return OK;
1020     }
1021     LOGE("[KvStoreNbDelegate] Put the data failed:%d", errCode);
1022     return TransferDBErrno(errCode);
1023 }
1024 
DeleteInner(const IOption & option,const Key & key)1025 DBStatus KvStoreNbDelegateImpl::DeleteInner(const IOption &option, const Key &key)
1026 {
1027     if (conn_ == nullptr) {
1028         LOGE("%s", INVALID_CONNECTION);
1029         return DB_ERROR;
1030     }
1031 
1032     int errCode = conn_->Delete(option, key);
1033     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
1034         return OK;
1035     }
1036 
1037     LOGE("[KvStoreNbDelegate] Delete the data failed:%d", errCode);
1038     return TransferDBErrno(errCode);
1039 }
1040 
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete) const1041 void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<std::string, int> &statuses,
1042     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete) const
1043 {
1044     std::map<std::string, DBStatus> result;
1045     for (const auto &pair : statuses) {
1046         DBStatus status = SyncOperation::DBStatusTrans(pair.second);
1047         result.insert(std::pair<std::string, DBStatus>(pair.first, status));
1048     }
1049     if (onComplete) {
1050         onComplete(result);
1051     }
1052 }
1053 
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)1054 DBStatus KvStoreNbDelegateImpl::SetEqualIdentifier(const std::string &identifier,
1055     const std::vector<std::string> &targets)
1056 {
1057     if (conn_ == nullptr) {
1058         LOGE("%s", INVALID_CONNECTION);
1059         return DB_ERROR;
1060     }
1061 
1062     PragmaSetEqualIdentifier pragma(identifier, targets);
1063     int errCode = conn_->Pragma(PRAGMA_ADD_EQUAL_IDENTIFIER, reinterpret_cast<void *>(&pragma));
1064     if (errCode != E_OK) {
1065         LOGE("[KvStoreNbDelegate] Set store equal identifier failed : %d", errCode);
1066     }
1067 
1068     return TransferDBErrno(errCode);
1069 }
1070 
SetPushDataInterceptor(const PushDataInterceptor & interceptor)1071 DBStatus KvStoreNbDelegateImpl::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
1072 {
1073     if (conn_ == nullptr) {
1074         LOGE("%s", INVALID_CONNECTION);
1075         return DB_ERROR;
1076     }
1077 
1078     PushDataInterceptor notify = interceptor;
1079     int errCode = conn_->Pragma(PRAGMA_INTERCEPT_SYNC_DATA, static_cast<void *>(&notify));
1080     if (errCode != E_OK) {
1081         LOGE("[KvStoreNbDelegate] Set data interceptor notify failed : %d", errCode);
1082     }
1083     return TransferDBErrno(errCode);
1084 }
1085 
SubscribeRemoteQuery(const std::vector<std::string> & devices,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)1086 DBStatus KvStoreNbDelegateImpl::SubscribeRemoteQuery(const std::vector<std::string> &devices,
1087     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
1088     const Query &query, bool wait)
1089 {
1090     if (conn_ == nullptr) {
1091         LOGE("%s", INVALID_CONNECTION);
1092         return DB_ERROR;
1093     }
1094 
1095     QuerySyncObject querySyncObj(query);
1096     if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
1097         LOGE("not support order by timestamp and query by range");
1098         return NOT_SUPPORT;
1099     }
1100     PragmaSync pragmaData(devices, SyncModeType::SUBSCRIBE_QUERY, querySyncObj,
1101         [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
1102     int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
1103     if (errCode < E_OK) {
1104         LOGE("[KvStoreNbDelegate] Subscribe remote data with query failed:%d", errCode);
1105         return TransferDBErrno(errCode);
1106     }
1107     return OK;
1108 }
1109 
UnSubscribeRemoteQuery(const std::vector<std::string> & devices,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)1110 DBStatus KvStoreNbDelegateImpl::UnSubscribeRemoteQuery(const std::vector<std::string> &devices,
1111     const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
1112     const Query &query, bool wait)
1113 {
1114     if (conn_ == nullptr) {
1115         LOGE("%s", INVALID_CONNECTION);
1116         return DB_ERROR;
1117     }
1118 
1119     QuerySyncObject querySyncObj(query);
1120     if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
1121         LOGE("not support order by timestamp and query by range");
1122         return NOT_SUPPORT;
1123     }
1124     PragmaSync pragmaData(devices, SyncModeType::UNSUBSCRIBE_QUERY, querySyncObj,
1125         [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
1126     int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
1127     if (errCode < E_OK) {
1128         LOGE("[KvStoreNbDelegate] Unsubscribe remote data with query failed:%d", errCode);
1129         return TransferDBErrno(errCode);
1130     }
1131     return OK;
1132 }
1133 
RemoveDeviceData()1134 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData()
1135 {
1136     if (conn_ == nullptr) {
1137         LOGE("%s", INVALID_CONNECTION);
1138         return DB_ERROR;
1139     }
1140 
1141     std::string device; // Empty device for remove all device data
1142     int errCode = conn_->Pragma(PRAGMA_RM_DEVICE_DATA,
1143         const_cast<void *>(static_cast<const void *>(&device)));
1144     if (errCode != E_OK) {
1145         LOGE("[KvStoreNbDelegate][%.3s] Remove device data failed:%d", storeId_.c_str(), errCode);
1146     } else {
1147         LOGI("[KvStoreNbDelegate][%.3s] Remove device data OK", storeId_.c_str());
1148     }
1149     return TransferDBErrno(errCode);
1150 }
1151 
GetKeys(const Key & keyPrefix,std::vector<Key> & keys) const1152 DBStatus KvStoreNbDelegateImpl::GetKeys(const Key &keyPrefix, std::vector<Key> &keys) const
1153 {
1154     if (conn_ == nullptr) {
1155         LOGE("%s", INVALID_CONNECTION);
1156         return DB_ERROR;
1157     }
1158     IOption option;
1159     option.dataType = IOption::SYNC_DATA;
1160     int errCode = conn_->GetKeys(option, keyPrefix, keys);
1161     if (errCode == E_OK) {
1162         return OK;
1163     }
1164     LOGE("[KvStoreNbDelegate] Get the keys failed:%d", errCode);
1165     return TransferDBErrno(errCode);
1166 }
1167 
GetSyncDataSize(const std::string & device) const1168 size_t KvStoreNbDelegateImpl::GetSyncDataSize(const std::string &device) const
1169 {
1170     if (conn_ == nullptr) {
1171         LOGE("%s", INVALID_CONNECTION);
1172         return 0;
1173     }
1174     if (device.empty()) {
1175         LOGE("device len is invalid");
1176         return 0;
1177     }
1178     size_t size = 0;
1179     int errCode = conn_->GetSyncDataSize(device, size);
1180     if (errCode != E_OK) {
1181         LOGE("[KvStoreNbDelegate] calculate sync data size failed : %d", errCode);
1182         return 0;
1183     }
1184     return size;
1185 }
1186 
UpdateKey(const UpdateKeyCallback & callback)1187 DBStatus KvStoreNbDelegateImpl::UpdateKey(const UpdateKeyCallback &callback)
1188 {
1189     if (conn_ == nullptr) {
1190         LOGE("%s", INVALID_CONNECTION);
1191         return DB_ERROR;
1192     }
1193     if (callback == nullptr) {
1194         LOGE("[KvStoreNbDelegate] Invalid callback for operation");
1195         return INVALID_ARGS;
1196     }
1197     int errCode = conn_->UpdateKey(callback);
1198     if (errCode == E_OK) {
1199         LOGI("[KvStoreNbDelegate] update keys success");
1200         return OK;
1201     }
1202     LOGE("[KvStoreNbDelegate] update keys failed:%d", errCode);
1203     return TransferDBErrno(errCode);
1204 }
1205 
GetWatermarkInfo(const std::string & device)1206 std::pair<DBStatus, WatermarkInfo> KvStoreNbDelegateImpl::GetWatermarkInfo(const std::string &device)
1207 {
1208     std::pair<DBStatus, WatermarkInfo> res;
1209     if (device.empty() || device.size() > DBConstant::MAX_DEV_LENGTH) {
1210         LOGE("[KvStoreNbDelegate] device invalid length %zu", device.size());
1211         res.first = INVALID_ARGS;
1212         return res;
1213     }
1214     if (conn_ == nullptr) {
1215         LOGE("%s", INVALID_CONNECTION);
1216         res.first = DB_ERROR;
1217         return res;
1218     }
1219     int errCode = conn_->GetWatermarkInfo(device, res.second);
1220     if (errCode == E_OK) {
1221         LOGI("[KvStoreNbDelegate] get watermark info success");
1222     } else {
1223         LOGE("[KvStoreNbDelegate] get watermark info failed:%d", errCode);
1224     }
1225     res.first = TransferDBErrno(errCode);
1226     return res;
1227 }
1228 
RemoveDeviceData(const std::string & device,ClearMode mode)1229 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device, ClearMode mode)
1230 {
1231     if (conn_ == nullptr) {
1232         LOGE("%s", INVALID_CONNECTION);
1233         return DB_ERROR;
1234     }
1235     int errCode = conn_->RemoveDeviceData(device, mode);
1236     if (errCode != E_OK) {
1237         LOGE("[KvStoreNbDelegate][%.3s] Remove device data res %d", storeId_.c_str(), errCode);
1238     } else {
1239         LOGI("[KvStoreNbDelegate][%.3s] Remove device data OK, mode[%d]", storeId_.c_str(), static_cast<int>(mode));
1240     }
1241     return TransferDBErrno(errCode);
1242 }
1243 
RemoveDeviceData(const std::string & device,const std::string & user,ClearMode mode)1244 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &user,
1245     ClearMode mode)
1246 {
1247     if (conn_ == nullptr) {
1248         LOGE("%s", INVALID_CONNECTION);
1249         return DB_ERROR;
1250     }
1251     if (user.empty() && mode != ClearMode::DEFAULT) {
1252         LOGE("[KvStoreNbDelegate] Remove device data with empty user!");
1253         return INVALID_ARGS;
1254     }
1255     int errCode = conn_->RemoveDeviceData(device, user, mode);
1256     if (errCode != E_OK) {
1257         LOGE("[KvStoreNbDelegate][%.3s] Remove device data with user res %d", storeId_.c_str(), errCode);
1258     } else {
1259         LOGI("[KvStoreNbDelegate][%.3s] Remove device data with user OK, mode[%d]",
1260             storeId_.c_str(), static_cast<int>(mode));
1261     }
1262     return TransferDBErrno(errCode);
1263 }
1264 
GetTaskCount()1265 int32_t KvStoreNbDelegateImpl::GetTaskCount()
1266 {
1267     if (conn_ == nullptr) {
1268         LOGE("%s", INVALID_CONNECTION);
1269         return DB_ERROR;
1270     }
1271     return conn_->GetTaskCount();
1272 }
1273 
SetReceiveDataInterceptor(const DataInterceptor & interceptor)1274 DBStatus KvStoreNbDelegateImpl::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
1275 {
1276     if (conn_ == nullptr) {
1277         LOGE("%s", INVALID_CONNECTION);
1278         return DB_ERROR;
1279     }
1280     int errCode = conn_->SetReceiveDataInterceptor(interceptor);
1281     if (errCode != E_OK) {
1282         LOGE("[KvStoreNbDelegate] Set receive data interceptor errCode:%d", errCode);
1283     }
1284     LOGI("[KvStoreNbDelegate] Set receive data interceptor");
1285     return TransferDBErrno(errCode);
1286 }
1287 
GetDeviceEntries(const std::string & device,std::vector<Entry> & entries) const1288 DBStatus KvStoreNbDelegateImpl::GetDeviceEntries(const std::string &device, std::vector<Entry> &entries) const
1289 {
1290     if (conn_ == nullptr) {
1291         LOGE("%s", INVALID_CONNECTION);
1292         return DB_ERROR;
1293     }
1294     int errCode = conn_->GetEntries(device, entries);
1295     if (errCode == E_OK) {
1296         return OK;
1297     }
1298     LOGE("[KvStoreNbDelegate] Get the entries failed:%d", errCode);
1299     return TransferDBErrno(errCode);
1300 }
1301 
GetDatabaseStatus() const1302 KvStoreNbDelegate::DatabaseStatus KvStoreNbDelegateImpl::GetDatabaseStatus() const
1303 {
1304     KvStoreNbDelegate::DatabaseStatus status;
1305     if (conn_ == nullptr) {
1306         LOGE("%s", INVALID_CONNECTION);
1307         return status;
1308     }
1309     status.isRebuild = conn_->IsRebuild();
1310     LOGI("[KvStoreNbDelegate] rebuild %d", static_cast<int>(status.isRebuild));
1311     return status;
1312 }
1313 
1314 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)1315 DBStatus KvStoreNbDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
1316 {
1317     if (conn_ == nullptr) {
1318         LOGE("%s", INVALID_CONNECTION);
1319         return DB_ERROR;
1320     }
1321     return TransferDBErrno(conn_->Sync(option, onProcess));
1322 }
1323 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)1324 DBStatus KvStoreNbDelegateImpl::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
1325 {
1326     if (conn_ == nullptr) {
1327         LOGE("%s", INVALID_CONNECTION);
1328         return DB_ERROR;
1329     }
1330     if (cloudDBs.empty()) {
1331         LOGE("[KvStoreNbDelegate] no cloud db");
1332         return INVALID_ARGS;
1333     }
1334     return TransferDBErrno(conn_->SetCloudDB(cloudDBs));
1335 }
1336 
SetCloudDbSchema(const std::map<std::string,DataBaseSchema> & schema)1337 DBStatus KvStoreNbDelegateImpl::SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema)
1338 {
1339     if (conn_ == nullptr) {
1340         LOGE("%s", INVALID_CONNECTION);
1341         return DB_ERROR;
1342     }
1343     return TransferDBErrno(conn_->SetCloudDbSchema(schema));
1344 }
1345 
SetCloudSyncConfig(const CloudSyncConfig & config)1346 DBStatus KvStoreNbDelegateImpl::SetCloudSyncConfig(const CloudSyncConfig &config)
1347 {
1348     if (conn_ == nullptr) {
1349         LOGE("%s", INVALID_CONNECTION);
1350         return DB_ERROR;
1351     }
1352     if (!DBCommon::CheckCloudSyncConfigValid(config)) {
1353         return INVALID_ARGS;
1354     }
1355     int errCode = conn_->SetCloudSyncConfig(config);
1356     if (errCode != E_OK) {
1357         LOGE("[KvStoreNbDelegate] Set cloud sync config errCode:%d", errCode);
1358     }
1359     LOGI("[KvStoreNbDelegate] Set cloud sync config");
1360     return TransferDBErrno(errCode);
1361 }
1362 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)1363 void KvStoreNbDelegateImpl::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
1364 {
1365     if (conn_ == nullptr || callback == nullptr) {
1366         LOGD("[KvStoreNbDelegate] Invalid connection or callback for operation");
1367         return;
1368     }
1369     conn_->SetGenCloudVersionCallback(callback);
1370 }
1371 
GetCloudVersion(const std::string & device)1372 std::pair<DBStatus, std::map<std::string, std::string>> KvStoreNbDelegateImpl::GetCloudVersion(
1373     const std::string &device)
1374 {
1375     std::pair<DBStatus, std::map<std::string, std::string>> res;
1376     if (device.size() > DBConstant::MAX_DEV_LENGTH) {
1377         LOGE("[KvStoreNbDelegate] device invalid length %zu", device.size());
1378         res.first = INVALID_ARGS;
1379         return res;
1380     }
1381     if (conn_ == nullptr) {
1382         LOGE("%s", INVALID_CONNECTION);
1383         res.first = DB_ERROR;
1384         return res;
1385     }
1386     int errCode = conn_->GetCloudVersion(device, res.second);
1387     if (errCode == E_OK) {
1388         LOGI("[KvStoreNbDelegate] get cloudVersion success");
1389     } else {
1390         LOGE("[KvStoreNbDelegate] get cloudVersion failed:%d", errCode);
1391     }
1392     if (errCode == E_OK && res.second.empty()) {
1393         errCode = -E_NOT_FOUND;
1394     }
1395     res.first = TransferDBErrno(errCode);
1396     return res;
1397 }
1398 
ClearMetaData(ClearKvMetaDataOption option)1399 DBStatus KvStoreNbDelegateImpl::ClearMetaData(ClearKvMetaDataOption option)
1400 {
1401     if (option.type != ClearKvMetaOpType::CLEAN_CLOUD_WATERMARK) {
1402         return NOT_SUPPORT;
1403     }
1404     if (conn_ == nullptr) {
1405         LOGE("%s", INVALID_CONNECTION);
1406         return DB_ERROR;
1407     }
1408     int errCode = conn_->ClearCloudWatermark();
1409     if (errCode == E_OK) {
1410         LOGI("[KvStoreNbDelegate][%.3s] clear kv cloud watermark success", storeId_.c_str());
1411     } else {
1412         LOGE("[KvStoreNbDelegate][%.3s] clear kv cloud watermark failed:%d", storeId_.c_str(), errCode);
1413     }
1414     return TransferDBErrno(errCode);
1415 }
1416 #endif
1417 
OperateDataStatus(uint32_t dataOperator)1418 DBStatus KvStoreNbDelegateImpl::OperateDataStatus(uint32_t dataOperator)
1419 {
1420     if (conn_ == nullptr) {
1421         LOGE("%s", INVALID_CONNECTION);
1422         return DB_ERROR;
1423     }
1424     int errCode = conn_->OperateDataStatus(dataOperator);
1425     if (errCode != E_OK) {
1426         LOGE("[KvStoreNbDelegate] Operate data status errCode:%d", errCode);
1427     } else {
1428         LOGI("[KvStoreNbDelegate] Operate data status success");
1429     }
1430     return TransferDBErrno(errCode);
1431 }
1432 
SetHandle(void * handle)1433 void KvStoreNbDelegateImpl::SetHandle(void *handle)
1434 {
1435 #ifndef _WIN32
1436     std::lock_guard<std::mutex> lock(libMutex_);
1437     dlHandle_ = handle;
1438 #endif
1439 }
1440 } // namespace DistributedDB
1441