/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16 #include "kv_store_nb_delegate_impl.h"
17
18 #include <functional>
19 #include <string>
20
21 #include "db_common.h"
22 #include "db_constant.h"
23 #include "db_errno.h"
24 #include "db_types.h"
25 #include "kv_store_changed_data_impl.h"
26 #include "kv_store_errno.h"
27 #include "kv_store_nb_conflict_data_impl.h"
28 #include "kv_store_observer.h"
29 #include "kv_store_result_set_impl.h"
30 #include "kvdb_manager.h"
31 #include "kvdb_pragma.h"
32 #include "log_print.h"
33 #include "param_check_utils.h"
34 #include "performance_analysis.h"
35 #include "platform_specific.h"
36 #include "store_types.h"
37 #include "sync_operation.h"
38
39 namespace DistributedDB {
40 namespace {
41 struct PragmaCmdPair {
42 int externCmd = 0;
43 int innerCmd = 0;
44 };
45
46 const PragmaCmdPair g_pragmaMap[] = {
47 {GET_DEVICE_IDENTIFIER_OF_ENTRY, PRAGMA_GET_DEVICE_IDENTIFIER_OF_ENTRY},
48 {AUTO_SYNC, PRAGMA_AUTO_SYNC},
49 {PERFORMANCE_ANALYSIS_GET_REPORT, PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT},
50 {PERFORMANCE_ANALYSIS_OPEN, PRAGMA_PERFORMANCE_ANALYSIS_OPEN},
51 {PERFORMANCE_ANALYSIS_CLOSE, PRAGMA_PERFORMANCE_ANALYSIS_CLOSE},
52 {PERFORMANCE_ANALYSIS_SET_REPORTFILENAME, PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME},
53 {GET_IDENTIFIER_OF_DEVICE, PRAGMA_GET_IDENTIFIER_OF_DEVICE},
54 {GET_QUEUED_SYNC_SIZE, PRAGMA_GET_QUEUED_SYNC_SIZE},
55 {SET_QUEUED_SYNC_LIMIT, PRAGMA_SET_QUEUED_SYNC_LIMIT},
56 {GET_QUEUED_SYNC_LIMIT, PRAGMA_GET_QUEUED_SYNC_LIMIT},
57 {SET_WIPE_POLICY, PRAGMA_SET_WIPE_POLICY},
58 {RESULT_SET_CACHE_MODE, PRAGMA_RESULT_SET_CACHE_MODE},
59 {RESULT_SET_CACHE_MAX_SIZE, PRAGMA_RESULT_SET_CACHE_MAX_SIZE},
60 {SET_SYNC_RETRY, PRAGMA_SET_SYNC_RETRY},
61 {SET_MAX_LOG_LIMIT, PRAGMA_SET_MAX_LOG_LIMIT},
62 {EXEC_CHECKPOINT, PRAGMA_EXEC_CHECKPOINT},
63 {SET_MAX_VALUE_SIZE, PRAGMA_SET_MAX_VALUE_SIZE},
64 };
65
66 constexpr const char *INVALID_CONNECTION = "[KvStoreNbDelegate] Invalid connection for operation";
67 }
68
KvStoreNbDelegateImpl(IKvDBConnection * conn,const std::string & storeId)69 KvStoreNbDelegateImpl::KvStoreNbDelegateImpl(IKvDBConnection *conn, const std::string &storeId)
70 : conn_(conn),
71 storeId_(storeId),
72 releaseFlag_(false)
73 {}
74
~KvStoreNbDelegateImpl()75 KvStoreNbDelegateImpl::~KvStoreNbDelegateImpl()
76 {
77 if (!releaseFlag_) {
78 LOGF("[KvStoreNbDelegate] Can't release directly");
79 return;
80 }
81 conn_ = nullptr;
82 #ifndef _WIN32
83 std::lock_guard<std::mutex> lock(libMutex_);
84 DBCommon::UnLoadGrdLib(dlHandle_);
85 dlHandle_ = nullptr;
86 #endif
87 }
88
Get(const Key & key,Value & value) const89 DBStatus KvStoreNbDelegateImpl::Get(const Key &key, Value &value) const
90 {
91 IOption option;
92 option.dataType = IOption::SYNC_DATA;
93 return GetInner(option, key, value);
94 }
95
GetEntries(const Key & keyPrefix,std::vector<Entry> & entries) const96 DBStatus KvStoreNbDelegateImpl::GetEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
97 {
98 IOption option;
99 option.dataType = IOption::SYNC_DATA;
100 return GetEntriesInner(option, keyPrefix, entries);
101 }
102
GetEntries(const Key & keyPrefix,KvStoreResultSet * & resultSet) const103 DBStatus KvStoreNbDelegateImpl::GetEntries(const Key &keyPrefix, KvStoreResultSet *&resultSet) const
104 {
105 if (conn_ == nullptr) {
106 LOGE("%s", INVALID_CONNECTION);
107 return DB_ERROR;
108 }
109
110 IOption option;
111 option.dataType = IOption::SYNC_DATA;
112 IKvDBResultSet *kvDbResultSet = nullptr;
113 int errCode = conn_->GetResultSet(option, keyPrefix, kvDbResultSet);
114 if (errCode == E_OK) {
115 resultSet = new (std::nothrow) KvStoreResultSetImpl(kvDbResultSet);
116 if (resultSet != nullptr) {
117 return OK;
118 }
119
120 LOGE("[KvStoreNbDelegate] Alloc result set failed.");
121 conn_->ReleaseResultSet(kvDbResultSet);
122 kvDbResultSet = nullptr;
123 return DB_ERROR;
124 }
125
126 LOGE("[KvStoreNbDelegate] Get result set failed: %d", errCode);
127 return TransferDBErrno(errCode);
128 }
129
GetEntries(const Query & query,std::vector<Entry> & entries) const130 DBStatus KvStoreNbDelegateImpl::GetEntries(const Query &query, std::vector<Entry> &entries) const
131 {
132 IOption option;
133 option.dataType = IOption::SYNC_DATA;
134 if (conn_ != nullptr) {
135 int errCode = conn_->GetEntries(option, query, entries);
136 if (errCode == E_OK) {
137 return OK;
138 } else if (errCode == -E_NOT_FOUND) {
139 LOGD("[KvStoreNbDelegate] Not found the data by query");
140 return NOT_FOUND;
141 }
142
143 LOGE("[KvStoreNbDelegate] Get the batch data by query err:%d", errCode);
144 return TransferDBErrno(errCode);
145 }
146
147 LOGE("%s", INVALID_CONNECTION);
148 return DB_ERROR;
149 }
150
GetEntries(const Query & query,KvStoreResultSet * & resultSet) const151 DBStatus KvStoreNbDelegateImpl::GetEntries(const Query &query, KvStoreResultSet *&resultSet) const
152 {
153 if (conn_ == nullptr) {
154 LOGE("%s", INVALID_CONNECTION);
155 return DB_ERROR;
156 }
157
158 IOption option;
159 option.dataType = IOption::SYNC_DATA;
160 IKvDBResultSet *kvDbResultSet = nullptr;
161 int errCode = conn_->GetResultSet(option, query, kvDbResultSet);
162 if (errCode == E_OK) {
163 resultSet = new (std::nothrow) KvStoreResultSetImpl(kvDbResultSet);
164 if (resultSet != nullptr) {
165 return OK;
166 }
167
168 LOGE("[KvStoreNbDelegate] Alloc result set failed.");
169 conn_->ReleaseResultSet(kvDbResultSet);
170 kvDbResultSet = nullptr;
171 return DB_ERROR;
172 }
173
174 LOGE("[KvStoreNbDelegate] Get result set for query failed: %d", errCode);
175 return TransferDBErrno(errCode);
176 }
177
GetCount(const Query & query,int & count) const178 DBStatus KvStoreNbDelegateImpl::GetCount(const Query &query, int &count) const
179 {
180 if (conn_ == nullptr) {
181 LOGE("%s", INVALID_CONNECTION);
182 return DB_ERROR;
183 }
184
185 IOption option;
186 option.dataType = IOption::SYNC_DATA;
187 int errCode = conn_->GetCount(option, query, count);
188 if (errCode == E_OK) {
189 if (count == 0) {
190 return NOT_FOUND;
191 }
192 return OK;
193 }
194
195 LOGE("[KvStoreNbDelegate] Get count for query failed: %d", errCode);
196 return TransferDBErrno(errCode);
197 }
198
CloseResultSet(KvStoreResultSet * & resultSet)199 DBStatus KvStoreNbDelegateImpl::CloseResultSet(KvStoreResultSet *&resultSet)
200 {
201 if (resultSet == nullptr) {
202 return INVALID_ARGS;
203 }
204
205 if (conn_ == nullptr) {
206 LOGE("%s", INVALID_CONNECTION);
207 return DB_ERROR;
208 }
209
210 // release inner result set
211 IKvDBResultSet *kvDbResultSet = nullptr;
212 (static_cast<KvStoreResultSetImpl *>(resultSet))->GetResultSet(kvDbResultSet);
213 conn_->ReleaseResultSet(kvDbResultSet);
214 // release external result set
215 delete resultSet;
216 resultSet = nullptr;
217 return OK;
218 }
219
Put(const Key & key,const Value & value)220 DBStatus KvStoreNbDelegateImpl::Put(const Key &key, const Value &value)
221 {
222 IOption option;
223 option.dataType = IOption::SYNC_DATA;
224 return PutInner(option, key, value);
225 }
226
PutBatch(const std::vector<Entry> & entries)227 DBStatus KvStoreNbDelegateImpl::PutBatch(const std::vector<Entry> &entries)
228 {
229 if (conn_ != nullptr) {
230 IOption option;
231 option.dataType = IOption::SYNC_DATA;
232 int errCode = conn_->PutBatch(option, entries);
233 if (errCode == E_OK) {
234 return OK;
235 }
236
237 LOGE("[KvStoreNbDelegate] Put batch data failed:%d", errCode);
238 return TransferDBErrno(errCode);
239 }
240
241 LOGE("%s", INVALID_CONNECTION);
242 return DB_ERROR;
243 }
244
DeleteBatch(const std::vector<Key> & keys)245 DBStatus KvStoreNbDelegateImpl::DeleteBatch(const std::vector<Key> &keys)
246 {
247 if (conn_ == nullptr) {
248 LOGE("%s", INVALID_CONNECTION);
249 return DB_ERROR;
250 }
251
252 IOption option;
253 option.dataType = IOption::SYNC_DATA;
254 int errCode = conn_->DeleteBatch(option, keys);
255 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
256 return OK;
257 }
258
259 LOGE("[KvStoreNbDelegate] Delete batch data failed:%d", errCode);
260 return TransferDBErrno(errCode);
261 }
262
Delete(const Key & key)263 DBStatus KvStoreNbDelegateImpl::Delete(const Key &key)
264 {
265 IOption option;
266 option.dataType = IOption::SYNC_DATA;
267 return DeleteInner(option, key);
268 }
269
GetLocal(const Key & key,Value & value) const270 DBStatus KvStoreNbDelegateImpl::GetLocal(const Key &key, Value &value) const
271 {
272 IOption option;
273 option.dataType = IOption::LOCAL_DATA;
274 return GetInner(option, key, value);
275 }
276
GetLocalEntries(const Key & keyPrefix,std::vector<Entry> & entries) const277 DBStatus KvStoreNbDelegateImpl::GetLocalEntries(const Key &keyPrefix, std::vector<Entry> &entries) const
278 {
279 IOption option;
280 option.dataType = IOption::LOCAL_DATA;
281 return GetEntriesInner(option, keyPrefix, entries);
282 }
283
PutLocal(const Key & key,const Value & value)284 DBStatus KvStoreNbDelegateImpl::PutLocal(const Key &key, const Value &value)
285 {
286 IOption option;
287 option.dataType = IOption::LOCAL_DATA;
288 return PutInner(option, key, value);
289 }
290
DeleteLocal(const Key & key)291 DBStatus KvStoreNbDelegateImpl::DeleteLocal(const Key &key)
292 {
293 IOption option;
294 option.dataType = IOption::LOCAL_DATA;
295 return DeleteInner(option, key);
296 }
297
PublishLocal(const Key & key,bool deleteLocal,bool updateTimestamp,const KvStoreNbPublishOnConflict & onConflict)298 DBStatus KvStoreNbDelegateImpl::PublishLocal(const Key &key, bool deleteLocal, bool updateTimestamp,
299 const KvStoreNbPublishOnConflict &onConflict)
300 {
301 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
302 LOGW("[KvStoreNbDelegate][Publish] Invalid para");
303 return INVALID_ARGS;
304 }
305
306 if (conn_ != nullptr) {
307 PragmaPublishInfo publishInfo{ key, deleteLocal, updateTimestamp, onConflict };
308 int errCode = conn_->Pragma(PRAGMA_PUBLISH_LOCAL, static_cast<PragmaData>(&publishInfo));
309 if (errCode != E_OK) {
310 LOGE("[KvStoreNbDelegate] Publish local err:%d", errCode);
311 }
312 return TransferDBErrno(errCode);
313 }
314
315 LOGE("%s", INVALID_CONNECTION);
316 return DB_ERROR;
317 }
318
UnpublishToLocal(const Key & key,bool deletePublic,bool updateTimestamp)319 DBStatus KvStoreNbDelegateImpl::UnpublishToLocal(const Key &key, bool deletePublic, bool updateTimestamp)
320 {
321 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
322 LOGW("[KvStoreNbDelegate][Unpublish] Invalid para");
323 return INVALID_ARGS;
324 }
325
326 if (conn_ != nullptr) {
327 PragmaUnpublishInfo unpublishInfo{ key, deletePublic, updateTimestamp };
328 int errCode = conn_->Pragma(PRAGMA_UNPUBLISH_SYNC, static_cast<PragmaData>(&unpublishInfo));
329 if (errCode != E_OK) {
330 LOGE("[KvStoreNbDelegate] Unpublish result:%d", errCode);
331 }
332 return TransferDBErrno(errCode);
333 }
334
335 LOGE("%s", INVALID_CONNECTION);
336 return DB_ERROR;
337 }
338
PutLocalBatch(const std::vector<Entry> & entries)339 DBStatus KvStoreNbDelegateImpl::PutLocalBatch(const std::vector<Entry> &entries)
340 {
341 if (conn_ == nullptr) {
342 LOGE("%s", INVALID_CONNECTION);
343 return DB_ERROR;
344 }
345
346 IOption option;
347 option.dataType = IOption::LOCAL_DATA;
348 int errCode = conn_->PutBatch(option, entries);
349 if (errCode != E_OK) {
350 LOGE("[KvStoreNbDelegate] Put local batch data failed:%d", errCode);
351 }
352 return TransferDBErrno(errCode);
353 }
354
DeleteLocalBatch(const std::vector<Key> & keys)355 DBStatus KvStoreNbDelegateImpl::DeleteLocalBatch(const std::vector<Key> &keys)
356 {
357 if (conn_ == nullptr) {
358 LOGE("%s", INVALID_CONNECTION);
359 return DB_ERROR;
360 }
361
362 IOption option;
363 option.dataType = IOption::LOCAL_DATA;
364 int errCode = conn_->DeleteBatch(option, keys);
365 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
366 return OK;
367 }
368
369 LOGE("[KvStoreNbDelegate] Delete local batch data failed:%d", errCode);
370 return TransferDBErrno(errCode);
371 }
372
RegisterObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)373 DBStatus KvStoreNbDelegateImpl::RegisterObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
374 {
375 if (key.size() > DBConstant::MAX_KEY_SIZE) {
376 return INVALID_ARGS;
377 }
378 if (observer == nullptr) {
379 LOGE("[KvStoreNbDelegate][RegisterObserver] Observer is null");
380 return INVALID_ARGS;
381 }
382 if (conn_ == nullptr) {
383 LOGE("[RegisterObserver]%s", INVALID_CONNECTION);
384 return DB_ERROR;
385 }
386
387 uint64_t rawMode = DBCommon::EraseBit(mode, DBConstant::OBSERVER_CHANGES_MASK);
388 if (rawMode == static_cast<uint64_t>(ObserverMode::OBSERVER_CHANGES_CLOUD)) {
389 return RegisterCloudObserver(key, mode, observer);
390 }
391 return RegisterDeviceObserver(key, static_cast<unsigned int>(rawMode), observer);
392 }
393
CheckDeviceObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)394 DBStatus KvStoreNbDelegateImpl::CheckDeviceObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
395 {
396 if (!ParamCheckUtils::CheckObserver(key, mode)) {
397 LOGE("[KvStoreNbDelegate][CheckDeviceObserver] Register nb observer by illegal mode or key size!");
398 return INVALID_ARGS;
399 }
400
401 if (observerMap_.size() >= DBConstant::MAX_OBSERVER_COUNT) {
402 LOGE("[KvStoreNbDelegate][CheckDeviceObserver] The number of kv observers has been over limit, storeId[%.3s]",
403 storeId_.c_str());
404 return OVER_MAX_LIMITS;
405 }
406 if (observerMap_.find(observer) != observerMap_.end()) {
407 LOGE("[KvStoreNbDelegate][CheckDeviceObserver] Observer has been already registered!");
408 return ALREADY_SET;
409 }
410 return OK;
411 }
412
RegisterDeviceObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)413 DBStatus KvStoreNbDelegateImpl::RegisterDeviceObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
414 {
415 if (conn_->IsTransactionStarted()) {
416 LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Transaction unfinished");
417 return BUSY;
418 }
419 std::lock_guard<std::mutex> lockGuard(observerMapLock_);
420 DBStatus status = CheckDeviceObserver(key, mode, observer);
421 if (status != OK) {
422 LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Observer map cannot be registered, status:%d", status);
423 return status;
424 }
425
426 int errCode = E_OK;
427 auto storeId = storeId_;
428 KvDBObserverHandle *observerHandle = conn_->RegisterObserver(
429 mode, key,
430 [observer, storeId](const KvDBCommitNotifyData ¬ifyData) {
431 KvStoreChangedDataImpl data(¬ifyData);
432 LOGD("[KvStoreNbDelegate][RegisterDeviceObserver] Trigger on change");
433 observer->OnChange(data);
434 },
435 errCode);
436
437 if (errCode != E_OK || observerHandle == nullptr) {
438 LOGE("[KvStoreNbDelegate][RegisterDeviceObserver] Register device observer failed:%d!", errCode);
439 return DB_ERROR;
440 }
441
442 observerMap_.insert(std::pair<const KvStoreObserver *, const KvDBObserverHandle *>(observer, observerHandle));
443 LOGI("[KvStoreNbDelegate][RegisterDeviceObserver] Register device observer ok mode:%u", mode);
444 return OK;
445 }
446
CheckCloudObserver(KvStoreObserver * observer)447 DBStatus KvStoreNbDelegateImpl::CheckCloudObserver(KvStoreObserver *observer)
448 {
449 if (cloudObserverMap_.size() >= DBConstant::MAX_OBSERVER_COUNT) {
450 LOGE("[KvStoreNbDelegate][CheckCloudObserver] The number of kv cloud observers over limit, storeId[%.3s]",
451 storeId_.c_str());
452 return OVER_MAX_LIMITS;
453 }
454 if (cloudObserverMap_.find(observer) != cloudObserverMap_.end()) {
455 LOGE("[KvStoreNbDelegate][CheckCloudObserver] Cloud observer has been already registered!");
456 return ALREADY_SET;
457 }
458 return OK;
459 }
460
RegisterCloudObserver(const Key & key,unsigned int mode,KvStoreObserver * observer)461 DBStatus KvStoreNbDelegateImpl::RegisterCloudObserver(const Key &key, unsigned int mode, KvStoreObserver *observer)
462 {
463 std::lock_guard<std::mutex> lockGuard(observerMapLock_);
464 DBStatus status = CheckCloudObserver(observer);
465 if (status != OK) {
466 LOGE("[KvStoreNbDelegate][RegisterCloudObserver] Cloud observer map cannot be registered, status:%d", status);
467 return status;
468 }
469
470 auto storeId = storeId_;
471 ObserverAction action = [observer, storeId](
472 const std::string &device, ChangedData &&changedData, bool isChangedData) {
473 if (isChangedData) {
474 LOGD("[KvStoreNbDelegate][RegisterCloudObserver] Trigger on change");
475 observer->OnChange(Origin::ORIGIN_CLOUD, device, std::move(changedData));
476 }
477 };
478 int errCode = conn_->RegisterObserverAction(observer, action);
479 if (errCode != E_OK) {
480 LOGE("[KvStoreNbDelegate][RegisterCloudObserver] Register cloud observer failed:%d!", errCode);
481 return DB_ERROR;
482 }
483 cloudObserverMap_[observer] = mode;
484 LOGI("[KvStoreNbDelegate][RegisterCloudObserver] Register cloud observer ok mode:%u", mode);
485 return OK;
486 }
487
UnRegisterObserver(const KvStoreObserver * observer)488 DBStatus KvStoreNbDelegateImpl::UnRegisterObserver(const KvStoreObserver *observer)
489 {
490 if (observer == nullptr) {
491 return INVALID_ARGS;
492 }
493
494 if (conn_ == nullptr) {
495 LOGE("%s", INVALID_CONNECTION);
496 return DB_ERROR;
497 }
498
499 DBStatus cloudRet = UnRegisterCloudObserver(observer);
500 DBStatus devRet = UnRegisterDeviceObserver(observer);
501 if (cloudRet == OK || devRet == OK) {
502 return OK;
503 }
504 return devRet;
505 }
506
UnRegisterDeviceObserver(const KvStoreObserver * observer)507 DBStatus KvStoreNbDelegateImpl::UnRegisterDeviceObserver(const KvStoreObserver *observer)
508 {
509 std::lock_guard<std::mutex> lockGuard(observerMapLock_);
510 auto iter = observerMap_.find(observer);
511 if (iter == observerMap_.end()) {
512 LOGE("[KvStoreNbDelegate] Observer has not been registered!");
513 return NOT_FOUND;
514 }
515
516 const KvDBObserverHandle *observerHandle = iter->second;
517 int errCode = conn_->UnRegisterObserver(observerHandle);
518 if (errCode != E_OK) {
519 LOGE("[KvStoreNbDelegate] UnRegistObserver failed:%d!", errCode);
520 return DB_ERROR;
521 }
522 observerMap_.erase(iter);
523 return OK;
524 }
525
UnRegisterCloudObserver(const KvStoreObserver * observer)526 DBStatus KvStoreNbDelegateImpl::UnRegisterCloudObserver(const KvStoreObserver *observer)
527 {
528 std::lock_guard<std::mutex> lockGuard(observerMapLock_);
529 auto iter = cloudObserverMap_.find(observer);
530 if (iter == cloudObserverMap_.end()) {
531 LOGW("[KvStoreNbDelegate] CloudObserver has not been registered!");
532 return NOT_FOUND;
533 }
534 int errCode = conn_->UnRegisterObserverAction(observer);
535 if (errCode != E_OK) {
536 LOGE("[KvStoreNbDelegate] UnRegisterCloudObserver failed:%d!", errCode);
537 return DB_ERROR;
538 }
539 cloudObserverMap_.erase(iter);
540 return OK;
541 }
542
RemoveDeviceData(const std::string & device)543 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device)
544 {
545 if (conn_ == nullptr) {
546 LOGE("%s", INVALID_CONNECTION);
547 return DB_ERROR;
548 }
549 if (device.empty() || device.length() > DBConstant::MAX_DEV_LENGTH) {
550 return INVALID_ARGS;
551 }
552 int errCode = conn_->Pragma(PRAGMA_RM_DEVICE_DATA,
553 const_cast<void *>(static_cast<const void *>(&device)));
554 if (errCode != E_OK) {
555 LOGE("[KvStoreNbDelegate][%.3s] Remove specific device data failed:%d", storeId_.c_str(), errCode);
556 } else {
557 LOGI("[KvStoreNbDelegate][%.3s] Remove specific device data OK", storeId_.c_str());
558 }
559 return TransferDBErrno(errCode);
560 }
561
GetStoreId() const562 std::string KvStoreNbDelegateImpl::GetStoreId() const
563 {
564 return storeId_;
565 }
566
Sync(const std::vector<std::string> & devices,SyncMode mode,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,bool wait=false)567 DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
568 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
569 bool wait = false)
570 {
571 if (conn_ == nullptr) {
572 LOGE("%s", INVALID_CONNECTION);
573 return DB_ERROR;
574 }
575 if (mode > SYNC_MODE_PUSH_PULL) {
576 LOGE("not support other mode");
577 return NOT_SUPPORT;
578 }
579
580 PragmaSync pragmaData(
581 devices, mode, [this, onComplete](const std::map<std::string, int> &statuses) {
582 OnSyncComplete(statuses, onComplete);
583 }, wait);
584 int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
585 if (errCode < E_OK) {
586 LOGE("[KvStoreNbDelegate] Sync data failed:%d", errCode);
587 return TransferDBErrno(errCode);
588 }
589 return OK;
590 }
591
Sync(const std::vector<std::string> & devices,SyncMode mode,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)592 DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
593 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
594 const Query &query, bool wait)
595 {
596 DeviceSyncOption option;
597 option.devices = devices;
598 option.mode = mode;
599 option.isQuery = true;
600 option.query = query;
601 option.isWait = wait;
602 return Sync(option, onComplete);
603 }
604
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & processMap,const DeviceSyncProcessCallback & onProcess) const605 void KvStoreNbDelegateImpl::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
606 const DeviceSyncProcessCallback &onProcess) const
607 {
608 std::map<std::string, DeviceSyncProcess> result;
609 for (const auto &pair : processMap) {
610 DeviceSyncProcess info = pair.second;
611 int status = info.errCode;
612 info.errCode = SyncOperation::DBStatusTrans(status);
613 info.process = SyncOperation::DBStatusTransProcess(status);
614 result.insert(std::pair<std::string, DeviceSyncProcess>(pair.first, info));
615 }
616 if (onProcess) {
617 onProcess(result);
618 }
619 }
620
Sync(const DeviceSyncOption & option,const DeviceSyncProcessCallback & onProcess)621 DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
622 {
623 if (conn_ == nullptr) {
624 LOGE("%s", INVALID_CONNECTION);
625 return DB_ERROR;
626 }
627 if (option.mode != SYNC_MODE_PULL_ONLY) {
628 LOGE("Not support other mode");
629 return NOT_SUPPORT;
630 }
631 DeviceSyncProcessCallback onSyncProcess = [this, onProcess](const std::map<std::string, DeviceSyncProcess> &map) {
632 OnDeviceSyncProcess(map, onProcess);
633 };
634 int errCode = E_OK;
635 if (option.isQuery) {
636 QuerySyncObject querySyncObj(option.query);
637 if (!querySyncObj.GetRelationTableNames().empty()) {
638 LOGE("Sync with option and check query table names from tables failed!");
639 return NOT_SUPPORT;
640 }
641 if (!DBCommon::CheckQueryWithoutMultiTable(option.query)) {
642 LOGE("Not support for invalid query");
643 return NOT_SUPPORT;
644 }
645
646 if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
647 LOGE("Not support order by timestamp and query by range");
648 return NOT_SUPPORT;
649 }
650 PragmaSync pragmaData(option, querySyncObj, onSyncProcess);
651 errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
652 } else {
653 PragmaSync pragmaData(option, onSyncProcess);
654 errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
655 }
656
657 if (errCode != E_OK) {
658 LOGE("[KvStoreNbDelegate] DeviceSync data failed:%d", errCode);
659 return TransferDBErrno(errCode);
660 }
661 return OK;
662 }
663
Sync(const DeviceSyncOption & option,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete)664 DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncOption &option,
665 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete)
666 {
667 if (conn_ == nullptr) {
668 LOGE("%s", INVALID_CONNECTION);
669 return DB_ERROR;
670 }
671 if (option.mode > SYNC_MODE_PUSH_PULL) {
672 LOGE("not support other mode");
673 return NOT_SUPPORT;
674 }
675
676 int errCode = E_OK;
677 if (option.isQuery) {
678 QuerySyncObject querySyncObj(option.query);
679 if (!querySyncObj.GetRelationTableNames().empty()) {
680 LOGE("check query table names from tables failed!");
681 return NOT_SUPPORT;
682 }
683
684 if (!DBCommon::CheckQueryWithoutMultiTable(option.query)) {
685 LOGE("not support for invalid query");
686 return NOT_SUPPORT;
687 }
688 if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
689 LOGE("not support order by timestamp and query by range");
690 return NOT_SUPPORT;
691 }
692 PragmaSync pragmaData(option, querySyncObj, [this, onComplete](const std::map<std::string, int> &statuses) {
693 OnSyncComplete(statuses, onComplete);
694 });
695 errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
696 } else {
697 PragmaSync pragmaData(option, [this, onComplete](const std::map<std::string, int> &statuses) {
698 OnSyncComplete(statuses, onComplete);
699 });
700 errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
701 }
702 if (errCode != E_OK) {
703 LOGE("[KvStoreNbDelegate] QuerySync data failed:%d", errCode);
704 return TransferDBErrno(errCode);
705 }
706 return OK;
707 }
708
CancelSync(uint32_t syncId)709 DBStatus KvStoreNbDelegateImpl::CancelSync(uint32_t syncId)
710 {
711 if (conn_ == nullptr) {
712 LOGE("%s", INVALID_CONNECTION);
713 return DB_ERROR;
714 }
715 uint32_t tempSyncId = syncId;
716 int errCode = conn_->Pragma(PRAGMA_CANCEL_SYNC_DEVICES, &tempSyncId);
717 if (errCode != E_OK) {
718 LOGE("[KvStoreNbDelegate] CancelSync failed:%d", errCode);
719 return TransferDBErrno(errCode);
720 }
721 return OK;
722 }
723
Pragma(PragmaCmd cmd,PragmaData & paramData)724 DBStatus KvStoreNbDelegateImpl::Pragma(PragmaCmd cmd, PragmaData ¶mData)
725 {
726 if (conn_ == nullptr) {
727 LOGE("%s", INVALID_CONNECTION);
728 return DB_ERROR;
729 }
730
731 int errCode = -E_NOT_SUPPORT;
732 for (const auto &item : g_pragmaMap) {
733 if (item.externCmd == cmd) {
734 errCode = conn_->Pragma(item.innerCmd, paramData);
735 break;
736 }
737 }
738
739 if (errCode != E_OK) {
740 LOGE("[KvStoreNbDelegate] Pragma failed:%d", errCode);
741 }
742 return TransferDBErrno(errCode);
743 }
744
SetConflictNotifier(int conflictType,const KvStoreNbConflictNotifier & notifier)745 DBStatus KvStoreNbDelegateImpl::SetConflictNotifier(int conflictType, const KvStoreNbConflictNotifier ¬ifier)
746 {
747 if (conn_ == nullptr) {
748 LOGE("%s", INVALID_CONNECTION);
749 return DB_ERROR;
750 }
751
752 if (!ParamCheckUtils::CheckConflictNotifierType(conflictType)) {
753 LOGE("%s", INVALID_CONNECTION);
754 return INVALID_ARGS;
755 }
756
757 int errCode;
758 if (!notifier) {
759 errCode = conn_->SetConflictNotifier(conflictType, nullptr);
760 goto END;
761 }
762
763 errCode = conn_->SetConflictNotifier(conflictType,
764 [conflictType, notifier](const KvDBCommitNotifyData &data) {
765 int resultCode;
766 const std::list<KvDBConflictEntry> entries = data.GetCommitConflicts(resultCode);
767 if (resultCode != E_OK) {
768 LOGE("Get commit conflicted entries failed:%d!", resultCode);
769 return;
770 }
771
772 for (const auto &entry : entries) {
773 // Prohibit signed numbers to perform bit operations
774 uint32_t entryType = static_cast<uint32_t>(entry.type);
775 uint32_t type = static_cast<uint32_t>(conflictType);
776 if (entryType & type) {
777 KvStoreNbConflictDataImpl dataImpl;
778 dataImpl.SetConflictData(entry);
779 notifier(dataImpl);
780 }
781 }
782 });
783
784 END:
785 if (errCode != E_OK) {
786 LOGE("[KvStoreNbDelegate] Register conflict failed:%d!", errCode);
787 }
788 return TransferDBErrno(errCode);
789 }
790
Rekey(const CipherPassword & password)791 DBStatus KvStoreNbDelegateImpl::Rekey(const CipherPassword &password)
792 {
793 if (conn_ == nullptr) {
794 LOGE("%s", INVALID_CONNECTION);
795 return DB_ERROR;
796 }
797
798 int errCode = conn_->Rekey(password);
799 if (errCode == E_OK) {
800 return OK;
801 }
802
803 LOGE("[KvStoreNbDelegate] Rekey failed:%d", errCode);
804 return TransferDBErrno(errCode);
805 }
806
Export(const std::string & filePath,const CipherPassword & passwd,bool force)807 DBStatus KvStoreNbDelegateImpl::Export(const std::string &filePath, const CipherPassword &passwd, bool force)
808 {
809 if (conn_ == nullptr) {
810 LOGE("%s", INVALID_CONNECTION);
811 return DB_ERROR;
812 }
813
814 std::string fileDir;
815 std::string fileName;
816 OS::SplitFilePath(filePath, fileDir, fileName);
817
818 std::string canonicalUrl;
819 if (!ParamCheckUtils::CheckDataDir(fileDir, canonicalUrl)) {
820 return INVALID_ARGS;
821 }
822
823 if (!OS::CheckPathExistence(canonicalUrl)) {
824 return NO_PERMISSION;
825 }
826
827 canonicalUrl = canonicalUrl + "/" + fileName;
828 if (!force && OS::CheckPathExistence(canonicalUrl)) {
829 return FILE_ALREADY_EXISTED;
830 }
831
832 int errCode = conn_->Export(canonicalUrl, passwd);
833 if (errCode == E_OK) {
834 return OK;
835 }
836 LOGE("[KvStoreNbDelegate] Export failed:%d", errCode);
837 return TransferDBErrno(errCode);
838 }
839
Import(const std::string & filePath,const CipherPassword & passwd,bool isNeedIntegrityCheck)840 DBStatus KvStoreNbDelegateImpl::Import(const std::string &filePath, const CipherPassword &passwd,
841 bool isNeedIntegrityCheck)
842 {
843 if (conn_ == nullptr) {
844 LOGE("%s", INVALID_CONNECTION);
845 return DB_ERROR;
846 }
847
848 std::string fileDir;
849 std::string fileName;
850 OS::SplitFilePath(filePath, fileDir, fileName);
851
852 std::string canonicalUrl;
853 if (!ParamCheckUtils::CheckDataDir(fileDir, canonicalUrl)) {
854 return INVALID_ARGS;
855 }
856
857 canonicalUrl = canonicalUrl + "/" + fileName;
858 if (!OS::CheckPathExistence(canonicalUrl)) {
859 LOGE("Import file path err, DBStatus = INVALID_FILE errno = [%d]", errno);
860 return INVALID_FILE;
861 }
862
863 int errCode = conn_->Import(canonicalUrl, passwd, isNeedIntegrityCheck);
864 if (errCode == E_OK) {
865 LOGI("[KvStoreNbDelegate] Import ok");
866 return OK;
867 }
868
869 LOGE("[KvStoreNbDelegate] Import failed:%d", errCode);
870 return TransferDBErrno(errCode);
871 }
872
StartTransaction()873 DBStatus KvStoreNbDelegateImpl::StartTransaction()
874 {
875 if (conn_ == nullptr) {
876 LOGE("%s", INVALID_CONNECTION);
877 return DB_ERROR;
878 }
879
880 int errCode = conn_->StartTransaction();
881 if (errCode != E_OK) {
882 LOGE("[KvStoreNbDelegate] StartTransaction failed:%d", errCode);
883 }
884 return TransferDBErrno(errCode);
885 }
886
Commit()887 DBStatus KvStoreNbDelegateImpl::Commit()
888 {
889 if (conn_ == nullptr) {
890 LOGE("%s", INVALID_CONNECTION);
891 return DB_ERROR;
892 }
893
894 int errCode = conn_->Commit();
895 if (errCode != E_OK) {
896 LOGE("[KvStoreNbDelegate] Commit failed:%d", errCode);
897 }
898 return TransferDBErrno(errCode);
899 }
900
Rollback()901 DBStatus KvStoreNbDelegateImpl::Rollback()
902 {
903 if (conn_ == nullptr) {
904 LOGE("%s", INVALID_CONNECTION);
905 return DB_ERROR;
906 }
907
908 int errCode = conn_->RollBack();
909 if (errCode != E_OK) {
910 LOGE("[KvStoreNbDelegate] Rollback failed:%d", errCode);
911 }
912 return TransferDBErrno(errCode);
913 }
914
SetReleaseFlag(bool flag)915 void KvStoreNbDelegateImpl::SetReleaseFlag(bool flag)
916 {
917 releaseFlag_ = flag;
918 }
919
Close(bool isCloseImmediately)920 DBStatus KvStoreNbDelegateImpl::Close(bool isCloseImmediately)
921 {
922 if (conn_ != nullptr) {
923 int errCode = KvDBManager::ReleaseDatabaseConnection(conn_, isCloseImmediately);
924 if (errCode == -E_BUSY) {
925 LOGI("[KvStoreNbDelegate] Busy for close");
926 return BUSY;
927 }
928 conn_ = nullptr;
929 }
930 return OK;
931 }
932
CheckIntegrity() const933 DBStatus KvStoreNbDelegateImpl::CheckIntegrity() const
934 {
935 if (conn_ == nullptr) {
936 LOGE("%s", INVALID_CONNECTION);
937 return DB_ERROR;
938 }
939
940 return TransferDBErrno(conn_->CheckIntegrity());
941 }
942
GetSecurityOption(SecurityOption & option) const943 DBStatus KvStoreNbDelegateImpl::GetSecurityOption(SecurityOption &option) const
944 {
945 if (conn_ == nullptr) {
946 LOGE("%s", INVALID_CONNECTION);
947 return DB_ERROR;
948 }
949 return TransferDBErrno(conn_->GetSecurityOption(option.securityLabel, option.securityFlag));
950 }
951
SetRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier)952 DBStatus KvStoreNbDelegateImpl::SetRemotePushFinishedNotify(const RemotePushFinishedNotifier ¬ifier)
953 {
954 if (conn_ == nullptr) {
955 LOGE("%s", INVALID_CONNECTION);
956 return DB_ERROR;
957 }
958
959 PragmaRemotePushNotify notify(notifier);
960 int errCode = conn_->Pragma(PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, reinterpret_cast<void *>(¬ify));
961 if (errCode != E_OK) {
962 LOGE("[KvStoreNbDelegate] Set remote push finished notify failed : %d", errCode);
963 }
964 return TransferDBErrno(errCode);
965 }
966
GetInner(const IOption & option,const Key & key,Value & value) const967 DBStatus KvStoreNbDelegateImpl::GetInner(const IOption &option, const Key &key, Value &value) const
968 {
969 if (conn_ == nullptr) {
970 LOGE("%s", INVALID_CONNECTION);
971 return DB_ERROR;
972 }
973
974 int errCode = conn_->Get(option, key, value);
975 if (errCode == E_OK) {
976 return OK;
977 }
978
979 if (errCode != -E_NOT_FOUND) {
980 LOGE("[KvStoreNbDelegate] Get the data failed:%d", errCode);
981 }
982 return TransferDBErrno(errCode);
983 }
984
GetEntriesInner(const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const985 DBStatus KvStoreNbDelegateImpl::GetEntriesInner(const IOption &option,
986 const Key &keyPrefix, std::vector<Entry> &entries) const
987 {
988 if (conn_ == nullptr) {
989 LOGE("%s", INVALID_CONNECTION);
990 return DB_ERROR;
991 }
992
993 int errCode = conn_->GetEntries(option, keyPrefix, entries);
994 if (errCode == E_OK) {
995 return OK;
996 }
997 LOGE("[KvStoreNbDelegate] Get the batch data failed:%d", errCode);
998 return TransferDBErrno(errCode);
999 }
1000
PutInner(const IOption & option,const Key & key,const Value & value)1001 DBStatus KvStoreNbDelegateImpl::PutInner(const IOption &option, const Key &key, const Value &value)
1002 {
1003 if (conn_ == nullptr) {
1004 LOGE("%s", INVALID_CONNECTION);
1005 return DB_ERROR;
1006 }
1007
1008 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
1009 if (performance != nullptr) {
1010 performance->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_PUT_DATA);
1011 }
1012
1013 int errCode = conn_->Put(option, key, value);
1014 if (performance != nullptr) {
1015 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_PUT_DATA);
1016 }
1017
1018 if (errCode == E_OK) {
1019 return OK;
1020 }
1021 LOGE("[KvStoreNbDelegate] Put the data failed:%d", errCode);
1022 return TransferDBErrno(errCode);
1023 }
1024
DeleteInner(const IOption & option,const Key & key)1025 DBStatus KvStoreNbDelegateImpl::DeleteInner(const IOption &option, const Key &key)
1026 {
1027 if (conn_ == nullptr) {
1028 LOGE("%s", INVALID_CONNECTION);
1029 return DB_ERROR;
1030 }
1031
1032 int errCode = conn_->Delete(option, key);
1033 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
1034 return OK;
1035 }
1036
1037 LOGE("[KvStoreNbDelegate] Delete the data failed:%d", errCode);
1038 return TransferDBErrno(errCode);
1039 }
1040
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete) const1041 void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<std::string, int> &statuses,
1042 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete) const
1043 {
1044 std::map<std::string, DBStatus> result;
1045 for (const auto &pair : statuses) {
1046 DBStatus status = SyncOperation::DBStatusTrans(pair.second);
1047 result.insert(std::pair<std::string, DBStatus>(pair.first, status));
1048 }
1049 if (onComplete) {
1050 onComplete(result);
1051 }
1052 }
1053
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)1054 DBStatus KvStoreNbDelegateImpl::SetEqualIdentifier(const std::string &identifier,
1055 const std::vector<std::string> &targets)
1056 {
1057 if (conn_ == nullptr) {
1058 LOGE("%s", INVALID_CONNECTION);
1059 return DB_ERROR;
1060 }
1061
1062 PragmaSetEqualIdentifier pragma(identifier, targets);
1063 int errCode = conn_->Pragma(PRAGMA_ADD_EQUAL_IDENTIFIER, reinterpret_cast<void *>(&pragma));
1064 if (errCode != E_OK) {
1065 LOGE("[KvStoreNbDelegate] Set store equal identifier failed : %d", errCode);
1066 }
1067
1068 return TransferDBErrno(errCode);
1069 }
1070
SetPushDataInterceptor(const PushDataInterceptor & interceptor)1071 DBStatus KvStoreNbDelegateImpl::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
1072 {
1073 if (conn_ == nullptr) {
1074 LOGE("%s", INVALID_CONNECTION);
1075 return DB_ERROR;
1076 }
1077
1078 PushDataInterceptor notify = interceptor;
1079 int errCode = conn_->Pragma(PRAGMA_INTERCEPT_SYNC_DATA, static_cast<void *>(¬ify));
1080 if (errCode != E_OK) {
1081 LOGE("[KvStoreNbDelegate] Set data interceptor notify failed : %d", errCode);
1082 }
1083 return TransferDBErrno(errCode);
1084 }
1085
SubscribeRemoteQuery(const std::vector<std::string> & devices,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)1086 DBStatus KvStoreNbDelegateImpl::SubscribeRemoteQuery(const std::vector<std::string> &devices,
1087 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
1088 const Query &query, bool wait)
1089 {
1090 if (conn_ == nullptr) {
1091 LOGE("%s", INVALID_CONNECTION);
1092 return DB_ERROR;
1093 }
1094
1095 QuerySyncObject querySyncObj(query);
1096 if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
1097 LOGE("not support order by timestamp and query by range");
1098 return NOT_SUPPORT;
1099 }
1100 PragmaSync pragmaData(devices, SyncModeType::SUBSCRIBE_QUERY, querySyncObj,
1101 [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
1102 int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
1103 if (errCode < E_OK) {
1104 LOGE("[KvStoreNbDelegate] Subscribe remote data with query failed:%d", errCode);
1105 return TransferDBErrno(errCode);
1106 }
1107 return OK;
1108 }
1109
UnSubscribeRemoteQuery(const std::vector<std::string> & devices,const std::function<void (const std::map<std::string,DBStatus> & devicesMap)> & onComplete,const Query & query,bool wait)1110 DBStatus KvStoreNbDelegateImpl::UnSubscribeRemoteQuery(const std::vector<std::string> &devices,
1111 const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
1112 const Query &query, bool wait)
1113 {
1114 if (conn_ == nullptr) {
1115 LOGE("%s", INVALID_CONNECTION);
1116 return DB_ERROR;
1117 }
1118
1119 QuerySyncObject querySyncObj(query);
1120 if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
1121 LOGE("not support order by timestamp and query by range");
1122 return NOT_SUPPORT;
1123 }
1124 PragmaSync pragmaData(devices, SyncModeType::UNSUBSCRIBE_QUERY, querySyncObj,
1125 [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
1126 int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
1127 if (errCode < E_OK) {
1128 LOGE("[KvStoreNbDelegate] Unsubscribe remote data with query failed:%d", errCode);
1129 return TransferDBErrno(errCode);
1130 }
1131 return OK;
1132 }
1133
RemoveDeviceData()1134 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData()
1135 {
1136 if (conn_ == nullptr) {
1137 LOGE("%s", INVALID_CONNECTION);
1138 return DB_ERROR;
1139 }
1140
1141 std::string device; // Empty device for remove all device data
1142 int errCode = conn_->Pragma(PRAGMA_RM_DEVICE_DATA,
1143 const_cast<void *>(static_cast<const void *>(&device)));
1144 if (errCode != E_OK) {
1145 LOGE("[KvStoreNbDelegate][%.3s] Remove device data failed:%d", storeId_.c_str(), errCode);
1146 } else {
1147 LOGI("[KvStoreNbDelegate][%.3s] Remove device data OK", storeId_.c_str());
1148 }
1149 return TransferDBErrno(errCode);
1150 }
1151
GetKeys(const Key & keyPrefix,std::vector<Key> & keys) const1152 DBStatus KvStoreNbDelegateImpl::GetKeys(const Key &keyPrefix, std::vector<Key> &keys) const
1153 {
1154 if (conn_ == nullptr) {
1155 LOGE("%s", INVALID_CONNECTION);
1156 return DB_ERROR;
1157 }
1158 IOption option;
1159 option.dataType = IOption::SYNC_DATA;
1160 int errCode = conn_->GetKeys(option, keyPrefix, keys);
1161 if (errCode == E_OK) {
1162 return OK;
1163 }
1164 LOGE("[KvStoreNbDelegate] Get the keys failed:%d", errCode);
1165 return TransferDBErrno(errCode);
1166 }
1167
GetSyncDataSize(const std::string & device) const1168 size_t KvStoreNbDelegateImpl::GetSyncDataSize(const std::string &device) const
1169 {
1170 if (conn_ == nullptr) {
1171 LOGE("%s", INVALID_CONNECTION);
1172 return 0;
1173 }
1174 if (device.empty()) {
1175 LOGE("device len is invalid");
1176 return 0;
1177 }
1178 size_t size = 0;
1179 int errCode = conn_->GetSyncDataSize(device, size);
1180 if (errCode != E_OK) {
1181 LOGE("[KvStoreNbDelegate] calculate sync data size failed : %d", errCode);
1182 return 0;
1183 }
1184 return size;
1185 }
1186
UpdateKey(const UpdateKeyCallback & callback)1187 DBStatus KvStoreNbDelegateImpl::UpdateKey(const UpdateKeyCallback &callback)
1188 {
1189 if (conn_ == nullptr) {
1190 LOGE("%s", INVALID_CONNECTION);
1191 return DB_ERROR;
1192 }
1193 if (callback == nullptr) {
1194 LOGE("[KvStoreNbDelegate] Invalid callback for operation");
1195 return INVALID_ARGS;
1196 }
1197 int errCode = conn_->UpdateKey(callback);
1198 if (errCode == E_OK) {
1199 LOGI("[KvStoreNbDelegate] update keys success");
1200 return OK;
1201 }
1202 LOGE("[KvStoreNbDelegate] update keys failed:%d", errCode);
1203 return TransferDBErrno(errCode);
1204 }
1205
GetWatermarkInfo(const std::string & device)1206 std::pair<DBStatus, WatermarkInfo> KvStoreNbDelegateImpl::GetWatermarkInfo(const std::string &device)
1207 {
1208 std::pair<DBStatus, WatermarkInfo> res;
1209 if (device.empty() || device.size() > DBConstant::MAX_DEV_LENGTH) {
1210 LOGE("[KvStoreNbDelegate] device invalid length %zu", device.size());
1211 res.first = INVALID_ARGS;
1212 return res;
1213 }
1214 if (conn_ == nullptr) {
1215 LOGE("%s", INVALID_CONNECTION);
1216 res.first = DB_ERROR;
1217 return res;
1218 }
1219 int errCode = conn_->GetWatermarkInfo(device, res.second);
1220 if (errCode == E_OK) {
1221 LOGI("[KvStoreNbDelegate] get watermark info success");
1222 } else {
1223 LOGE("[KvStoreNbDelegate] get watermark info failed:%d", errCode);
1224 }
1225 res.first = TransferDBErrno(errCode);
1226 return res;
1227 }
1228
RemoveDeviceData(const std::string & device,ClearMode mode)1229 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device, ClearMode mode)
1230 {
1231 if (conn_ == nullptr) {
1232 LOGE("%s", INVALID_CONNECTION);
1233 return DB_ERROR;
1234 }
1235 int errCode = conn_->RemoveDeviceData(device, mode);
1236 if (errCode != E_OK) {
1237 LOGE("[KvStoreNbDelegate][%.3s] Remove device data res %d", storeId_.c_str(), errCode);
1238 } else {
1239 LOGI("[KvStoreNbDelegate][%.3s] Remove device data OK, mode[%d]", storeId_.c_str(), static_cast<int>(mode));
1240 }
1241 return TransferDBErrno(errCode);
1242 }
1243
RemoveDeviceData(const std::string & device,const std::string & user,ClearMode mode)1244 DBStatus KvStoreNbDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &user,
1245 ClearMode mode)
1246 {
1247 if (conn_ == nullptr) {
1248 LOGE("%s", INVALID_CONNECTION);
1249 return DB_ERROR;
1250 }
1251 if (user.empty() && mode != ClearMode::DEFAULT) {
1252 LOGE("[KvStoreNbDelegate] Remove device data with empty user!");
1253 return INVALID_ARGS;
1254 }
1255 int errCode = conn_->RemoveDeviceData(device, user, mode);
1256 if (errCode != E_OK) {
1257 LOGE("[KvStoreNbDelegate][%.3s] Remove device data with user res %d", storeId_.c_str(), errCode);
1258 } else {
1259 LOGI("[KvStoreNbDelegate][%.3s] Remove device data with user OK, mode[%d]",
1260 storeId_.c_str(), static_cast<int>(mode));
1261 }
1262 return TransferDBErrno(errCode);
1263 }
1264
GetTaskCount()1265 int32_t KvStoreNbDelegateImpl::GetTaskCount()
1266 {
1267 if (conn_ == nullptr) {
1268 LOGE("%s", INVALID_CONNECTION);
1269 return DB_ERROR;
1270 }
1271 return conn_->GetTaskCount();
1272 }
1273
SetReceiveDataInterceptor(const DataInterceptor & interceptor)1274 DBStatus KvStoreNbDelegateImpl::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
1275 {
1276 if (conn_ == nullptr) {
1277 LOGE("%s", INVALID_CONNECTION);
1278 return DB_ERROR;
1279 }
1280 int errCode = conn_->SetReceiveDataInterceptor(interceptor);
1281 if (errCode != E_OK) {
1282 LOGE("[KvStoreNbDelegate] Set receive data interceptor errCode:%d", errCode);
1283 }
1284 LOGI("[KvStoreNbDelegate] Set receive data interceptor");
1285 return TransferDBErrno(errCode);
1286 }
1287
GetDeviceEntries(const std::string & device,std::vector<Entry> & entries) const1288 DBStatus KvStoreNbDelegateImpl::GetDeviceEntries(const std::string &device, std::vector<Entry> &entries) const
1289 {
1290 if (conn_ == nullptr) {
1291 LOGE("%s", INVALID_CONNECTION);
1292 return DB_ERROR;
1293 }
1294 int errCode = conn_->GetEntries(device, entries);
1295 if (errCode == E_OK) {
1296 return OK;
1297 }
1298 LOGE("[KvStoreNbDelegate] Get the entries failed:%d", errCode);
1299 return TransferDBErrno(errCode);
1300 }
1301
GetDatabaseStatus() const1302 KvStoreNbDelegate::DatabaseStatus KvStoreNbDelegateImpl::GetDatabaseStatus() const
1303 {
1304 KvStoreNbDelegate::DatabaseStatus status;
1305 if (conn_ == nullptr) {
1306 LOGE("%s", INVALID_CONNECTION);
1307 return status;
1308 }
1309 status.isRebuild = conn_->IsRebuild();
1310 LOGI("[KvStoreNbDelegate] rebuild %d", static_cast<int>(status.isRebuild));
1311 return status;
1312 }
1313
1314 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)1315 DBStatus KvStoreNbDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
1316 {
1317 if (conn_ == nullptr) {
1318 LOGE("%s", INVALID_CONNECTION);
1319 return DB_ERROR;
1320 }
1321 return TransferDBErrno(conn_->Sync(option, onProcess));
1322 }
1323
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)1324 DBStatus KvStoreNbDelegateImpl::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
1325 {
1326 if (conn_ == nullptr) {
1327 LOGE("%s", INVALID_CONNECTION);
1328 return DB_ERROR;
1329 }
1330 if (cloudDBs.empty()) {
1331 LOGE("[KvStoreNbDelegate] no cloud db");
1332 return INVALID_ARGS;
1333 }
1334 return TransferDBErrno(conn_->SetCloudDB(cloudDBs));
1335 }
1336
SetCloudDbSchema(const std::map<std::string,DataBaseSchema> & schema)1337 DBStatus KvStoreNbDelegateImpl::SetCloudDbSchema(const std::map<std::string, DataBaseSchema> &schema)
1338 {
1339 if (conn_ == nullptr) {
1340 LOGE("%s", INVALID_CONNECTION);
1341 return DB_ERROR;
1342 }
1343 return TransferDBErrno(conn_->SetCloudDbSchema(schema));
1344 }
1345
SetCloudSyncConfig(const CloudSyncConfig & config)1346 DBStatus KvStoreNbDelegateImpl::SetCloudSyncConfig(const CloudSyncConfig &config)
1347 {
1348 if (conn_ == nullptr) {
1349 LOGE("%s", INVALID_CONNECTION);
1350 return DB_ERROR;
1351 }
1352 if (!DBCommon::CheckCloudSyncConfigValid(config)) {
1353 return INVALID_ARGS;
1354 }
1355 int errCode = conn_->SetCloudSyncConfig(config);
1356 if (errCode != E_OK) {
1357 LOGE("[KvStoreNbDelegate] Set cloud sync config errCode:%d", errCode);
1358 }
1359 LOGI("[KvStoreNbDelegate] Set cloud sync config");
1360 return TransferDBErrno(errCode);
1361 }
1362
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)1363 void KvStoreNbDelegateImpl::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
1364 {
1365 if (conn_ == nullptr || callback == nullptr) {
1366 LOGD("[KvStoreNbDelegate] Invalid connection or callback for operation");
1367 return;
1368 }
1369 conn_->SetGenCloudVersionCallback(callback);
1370 }
1371
GetCloudVersion(const std::string & device)1372 std::pair<DBStatus, std::map<std::string, std::string>> KvStoreNbDelegateImpl::GetCloudVersion(
1373 const std::string &device)
1374 {
1375 std::pair<DBStatus, std::map<std::string, std::string>> res;
1376 if (device.size() > DBConstant::MAX_DEV_LENGTH) {
1377 LOGE("[KvStoreNbDelegate] device invalid length %zu", device.size());
1378 res.first = INVALID_ARGS;
1379 return res;
1380 }
1381 if (conn_ == nullptr) {
1382 LOGE("%s", INVALID_CONNECTION);
1383 res.first = DB_ERROR;
1384 return res;
1385 }
1386 int errCode = conn_->GetCloudVersion(device, res.second);
1387 if (errCode == E_OK) {
1388 LOGI("[KvStoreNbDelegate] get cloudVersion success");
1389 } else {
1390 LOGE("[KvStoreNbDelegate] get cloudVersion failed:%d", errCode);
1391 }
1392 if (errCode == E_OK && res.second.empty()) {
1393 errCode = -E_NOT_FOUND;
1394 }
1395 res.first = TransferDBErrno(errCode);
1396 return res;
1397 }
1398
ClearMetaData(ClearKvMetaDataOption option)1399 DBStatus KvStoreNbDelegateImpl::ClearMetaData(ClearKvMetaDataOption option)
1400 {
1401 if (option.type != ClearKvMetaOpType::CLEAN_CLOUD_WATERMARK) {
1402 return NOT_SUPPORT;
1403 }
1404 if (conn_ == nullptr) {
1405 LOGE("%s", INVALID_CONNECTION);
1406 return DB_ERROR;
1407 }
1408 int errCode = conn_->ClearCloudWatermark();
1409 if (errCode == E_OK) {
1410 LOGI("[KvStoreNbDelegate][%.3s] clear kv cloud watermark success", storeId_.c_str());
1411 } else {
1412 LOGE("[KvStoreNbDelegate][%.3s] clear kv cloud watermark failed:%d", storeId_.c_str(), errCode);
1413 }
1414 return TransferDBErrno(errCode);
1415 }
1416 #endif
1417
OperateDataStatus(uint32_t dataOperator)1418 DBStatus KvStoreNbDelegateImpl::OperateDataStatus(uint32_t dataOperator)
1419 {
1420 if (conn_ == nullptr) {
1421 LOGE("%s", INVALID_CONNECTION);
1422 return DB_ERROR;
1423 }
1424 int errCode = conn_->OperateDataStatus(dataOperator);
1425 if (errCode != E_OK) {
1426 LOGE("[KvStoreNbDelegate] Operate data status errCode:%d", errCode);
1427 } else {
1428 LOGI("[KvStoreNbDelegate] Operate data status success");
1429 }
1430 return TransferDBErrno(errCode);
1431 }
1432
SetHandle(void * handle)1433 void KvStoreNbDelegateImpl::SetHandle(void *handle)
1434 {
1435 #ifndef _WIN32
1436 std::lock_guard<std::mutex> lock(libMutex_);
1437 dlHandle_ = handle;
1438 #endif
1439 }
1440 } // namespace DistributedDB
1441