1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17
18 #include <endian.h>
19 #include "bootstrap.h"
20 #include "checker/checker_manager.h"
21 #include "cloud/cloud_sync_finished_event.h"
22 #include "cloud/schema_meta.h"
23 #include "crypto_manager.h"
24 #include "device_matrix.h"
25 #include "directory/directory_manager.h"
26 #include "eventcenter/event_center.h"
27 #include "kvdb_query.h"
28 #include "log_print.h"
29 #include "metadata/meta_data_manager.h"
30 #include "metadata/secret_key_meta_data.h"
31 #include "metadata/store_meta_data_local.h"
32 #include "query_helper.h"
33 #include "rdb_cloud.h"
34 #include "snapshot/bind_event.h"
35 #include "types.h"
36 #include "user_delegate.h"
37 #include "utils/anonymous.h"
38 #include "water_version_manager.h"
39 #include "device_manager_adapter.h"
40
41 namespace OHOS::DistributedKv {
42 using namespace DistributedData;
43 using namespace DistributedDB;
44 using DBField = DistributedDB::Field;
45 using DBTable = DistributedDB::TableSchema;
46 using DBSchema = DistributedDB::DataBaseSchema;
47 using ClearMode = DistributedDB::ClearMode;
48 using DMAdapter = DistributedData::DeviceManagerAdapter;
49 using DBInterceptedData = DistributedDB::InterceptedData;
50 constexpr int UUID_WIDTH = 4;
51 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
52 { DBStatus::OK, GenErr::E_OK },
53 { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
54 { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
55 { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
56 { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
57 { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
58 { DBStatus::BUSY, GenErr::E_DB_ERROR },
59 { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
60 { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
61 { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
62 { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
63 { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
64 { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
65 { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
66 { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
67 { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
68 { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
69 { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
70 { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
71 { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
72 { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
73 };
74
75 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)76 static DBSchema GetDBSchema(const Database &database)
77 {
78 DBSchema schema;
79 schema.tables.resize(database.tables.size());
80 for (size_t i = 0; i < database.tables.size(); i++) {
81 const Table &table = database.tables[i];
82 DBTable &dbTable = schema.tables[i];
83 dbTable.name = table.name;
84 dbTable.sharedTableName = table.sharedTableName;
85 for (auto &field : table.fields) {
86 DBField dbField;
87 dbField.colName = field.colName;
88 dbField.type = field.type;
89 dbField.primary = field.primary;
90 dbField.nullable = field.nullable;
91 dbTable.fields.push_back(std::move(dbField));
92 }
93 }
94 return schema;
95 }
GetDBPassword(const StoreMetaData & data)96 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
97 {
98 DBPassword dbPassword;
99 if (!data.isEncrypt) {
100 return dbPassword;
101 }
102
103 SecretKeyMetaData secretKey;
104 secretKey.storeType = data.storeType;
105 auto storeKey = data.GetSecretKey();
106 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
107 std::vector<uint8_t> password;
108 CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
109 dbPassword.SetValue(password.data(), password.size());
110 password.assign(password.size(), 0);
111 return dbPassword;
112 }
113
GetDBSecurity(int32_t secLevel)114 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
115 {
116 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
117 return { DistributedDB::NOT_SET, DistributedDB::ECE };
118 }
119 if (secLevel == SecurityLevel::S3) {
120 return { DistributedDB::S3, DistributedDB::SECE };
121 }
122 if (secLevel == SecurityLevel::S4) {
123 return { DistributedDB::S4, DistributedDB::ECE };
124 }
125 return { secLevel, DistributedDB::ECE };
126 }
127
GetDBOption(const StoreMetaData & data,const DBPassword & password)128 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
129 {
130 DBOption dbOption;
131 dbOption.createIfNecessary = false;
132 dbOption.isMemoryDb = false;
133 dbOption.isEncryptedDb = data.isEncrypt;
134 dbOption.isNeedCompressOnSync = data.isNeedCompress;
135 if (data.isEncrypt) {
136 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
137 dbOption.passwd = password;
138 }
139 StoreMetaDataLocal local;
140 MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
141 if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
142 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
143 } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
144 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
145 }
146 if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
147 dbOption.compressionRate = META_COMPRESS_RATE;
148 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
149 } else {
150 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
151 }
152 dbOption.schema = data.schema;
153 dbOption.createDirByStoreIdOnly = true;
154 dbOption.secOption = GetDBSecurity(data.securityLevel);
155 return dbOption;
156 }
157
KVDBGeneralStore(const StoreMetaData & meta)158 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
159 : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
160 meta.instanceId)
161 {
162 observer_.storeId_ = meta.storeId;
163 StoreMetaDataLocal local;
164 MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
165 isPublic_ = local.isPublic;
166 DBStatus status = DBStatus::NOT_FOUND;
167 manager_.SetKvStoreConfig({ meta.dataDir });
168 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
169 manager_.GetKvStore(
170 meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
171 status = dbStatus;
172 delegate_ = tmpStore;
173 });
174 if (delegate_ == nullptr || status != DBStatus::OK) {
175 ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
176 manager_.CloseKvStore(delegate_);
177 return;
178 }
179 SetDBPushDataInterceptor(meta.storeType);
180 SetDBReceiveDataInterceptor(meta.storeType);
181 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
182 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
183 if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
184 delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
185 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
186 });
187 }
188 if (meta.isAutoSync) {
189 bool param = true;
190 auto data = static_cast<DistributedDB::PragmaData>(¶m);
191 delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
192 }
193 storeInfo_.tokenId = meta.tokenId;
194 storeInfo_.bundleName = meta.bundleName;
195 storeInfo_.storeName = meta.storeId;
196 storeInfo_.instanceId = meta.instanceId;
197 storeInfo_.user = std::stoi(meta.user);
198 enableCloud_ = meta.enableCloud;
199 }
200
~KVDBGeneralStore()201 KVDBGeneralStore::~KVDBGeneralStore()
202 {
203 {
204 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
205 if (delegate_ != nullptr) {
206 delegate_->UnRegisterObserver(&observer_);
207 }
208 manager_.CloseKvStore(delegate_);
209 delegate_ = nullptr;
210 }
211 for (auto &bindInfo_ : bindInfos_) {
212 if (bindInfo_.db_ != nullptr) {
213 bindInfo_.db_->Close();
214 }
215 }
216 bindInfos_.clear();
217 dbClouds_.clear();
218 }
219
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)220 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
221 {
222 return GenErr::E_NOT_SUPPORT;
223 }
224
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)225 int32_t KVDBGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
226 const CloudConfig &config)
227 {
228 if (bindInfos.empty()) {
229 ZLOGW("No cloudDB!");
230 return GeneralError::E_OK;
231 }
232
233 std::map<std::string, DataBaseSchema> schemas;
234 auto dbSchema = GetDBSchema(database);
235 for (auto &[userId, bindInfo] : bindInfos) {
236 if (bindInfo.db_ == nullptr) {
237 return GeneralError::E_INVALID_ARGS;
238 }
239
240 if (isBound_.exchange(true)) {
241 return GeneralError::E_OK;
242 }
243
244 dbClouds_.insert({ std::to_string(userId), std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
245 bindInfos_.insert(std::move(bindInfo));
246 schemas.insert({ std::to_string(userId), dbSchema });
247 }
248 DistributedDB::CloudSyncConfig dbConfig;
249 dbConfig.maxUploadCount = config.maxNumber;
250 dbConfig.maxUploadSize = config.maxSize;
251 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
252 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
253 if (delegate_ == nullptr) {
254 return GeneralError::E_ALREADY_CLOSED;
255 }
256 delegate_->SetCloudDB(dbClouds_);
257 delegate_->SetCloudDbSchema(std::move(schemas));
258 delegate_->SetCloudSyncConfig(dbConfig);
259 return GeneralError::E_OK;
260 }
261
IsBound()262 bool KVDBGeneralStore::IsBound()
263 {
264 return isBound_;
265 }
266
Close(bool isForce)267 int32_t KVDBGeneralStore::Close(bool isForce)
268 {
269 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
270 if (!lock) {
271 return GeneralError::E_BUSY;
272 }
273
274 if (delegate_ == nullptr) {
275 return GeneralError::E_OK;
276 }
277 if (!isForce && delegate_->GetTaskCount() > 0) {
278 return GeneralError::E_BUSY;
279 }
280 if (delegate_ != nullptr) {
281 delegate_->UnRegisterObserver(&observer_);
282 }
283 auto status = manager_.CloseKvStore(delegate_);
284 if (status != DBStatus::OK) {
285 return status;
286 }
287 delegate_ = nullptr;
288 return GeneralError::E_OK;
289 }
290
Execute(const std::string & table,const std::string & sql)291 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
292 {
293 return GeneralError::E_NOT_SUPPORT;
294 }
295
Insert(const std::string & table,VBuckets && values)296 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
297 {
298 return GeneralError::E_NOT_SUPPORT;
299 }
300
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)301 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
302 const std::string &whereSql, Values &&conditions)
303 {
304 return GeneralError::E_NOT_SUPPORT;
305 }
306
Delete(const std::string & table,const std::string & sql,Values && args)307 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
308 {
309 return GeneralError::E_NOT_SUPPORT;
310 }
311
Replace(const std::string & table,VBucket && value)312 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
313 {
314 return GeneralError::E_NOT_SUPPORT;
315 }
316
Query(const std::string & table,const std::string & sql,Values && args)317 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
318 __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
319 {
320 return { GeneralError::E_NOT_SUPPORT, nullptr };
321 }
322
Query(const std::string & table,GenQuery & query)323 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
324 {
325 return { GeneralError::E_NOT_SUPPORT, nullptr };
326 }
327
MergeMigratedData(const std::string & tableName,VBuckets && values)328 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
329 {
330 return GeneralError::E_NOT_SUPPORT;
331 }
332
CleanTrackerData(const std::string & tableName,int64_t cursor)333 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
334 {
335 return GeneralError::E_NOT_SUPPORT;
336 }
337
GetDBSyncCompleteCB(DetailAsync async)338 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
339 {
340 if (!async) {
341 return [](auto &) {};
342 }
343 return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
344 GenDetails details;
345 for (auto &[key, dbStatus] : status) {
346 auto &value = details[key];
347 value.progress = FINISHED;
348 value.code = GeneralError::E_OK;
349 if (dbStatus != DBStatus::OK) {
350 value.code = dbStatus;
351 }
352 }
353 async(details);
354 };
355 }
356
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)357 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
358 int64_t wait, const std::string &prepareTraceId)
359 {
360 DistributedDB::CloudSyncOption syncOption;
361 syncOption.devices = devices;
362 syncOption.mode = cloudSyncMode;
363 syncOption.waitTime = wait;
364 syncOption.prepareTraceId = prepareTraceId;
365 syncOption.lockAction = DistributedDB::LockAction::NONE;
366 if (storeInfo_.user == 0) {
367 std::vector<int32_t> users;
368 AccountDelegate::GetInstance()->QueryUsers(users);
369 syncOption.users.push_back(std::to_string(users[0]));
370 } else {
371 syncOption.users.push_back(std::to_string(storeInfo_.user));
372 }
373 return delegate_->Sync(syncOption, GetDBProcessCB(async));
374 }
375
Sync(const Devices & devices,GenQuery & query,DetailAsync async,SyncParam & syncParam)376 int32_t KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParam)
377 {
378 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
379 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
380 if (delegate_ == nullptr) {
381 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
382 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
383 return GeneralError::E_ALREADY_CLOSED;
384 }
385 DBStatus dbStatus;
386 auto dbMode = DistributedDB::SyncMode(syncMode);
387 if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
388 if (!enableCloud_) {
389 return GeneralError::E_NOT_SUPPORT;
390 }
391 dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
392 } else {
393 if (devices.empty()) {
394 ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
395 return GeneralError::E_INVALID_ARGS;
396 }
397 KVDBQuery *kvQuery = nullptr;
398 auto ret = query.QueryInterface(kvQuery);
399 DistributedDB::Query dbQuery;
400 if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
401 dbQuery = kvQuery->GetDBQuery();
402 } else {
403 return GeneralError::E_INVALID_ARGS;
404 }
405 if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
406 dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
407 } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
408 dbStatus =
409 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
410 } else if (syncMode < NEARBY_END) {
411 if (kvQuery->IsEmpty()) {
412 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
413 } else {
414 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
415 }
416 } else {
417 ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
418 dbStatus = DistributedDB::INVALID_ARGS;
419 }
420 }
421 return ConvertStatus(dbStatus);
422 }
423
SetEqualIdentifier(const std::string & appId,const std::string & storeId)424 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId)
425 {
426 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
427 if (delegate_ == nullptr) {
428 ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
429 Anonymous::Change(storeId).c_str());
430 return;
431 }
432 std::vector<std::string> sameAccountDevs {};
433 std::vector<std::string> defaultAccountDevs {};
434 auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
435 GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
436 GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
437 if (!sameAccountDevs.empty()) {
438 auto accountId = AccountDelegate::GetInstance()->GetUnencryptedAccountId();
439 auto syncIdentifier = KvManager::GetKvStoreIdentifier(accountId, appId, storeId);
440 ZLOGI("same account set compatible identifier store:%{public}s, user:%{public}s, device:%{public}.10s",
441 Anonymous::Change(storeId).c_str(), Anonymous::Change(accountId).c_str(),
442 DistributedData::Serializable::Marshall(sameAccountDevs).c_str());
443 delegate_->SetEqualIdentifier(syncIdentifier, sameAccountDevs);
444 }
445 if (!defaultAccountDevs.empty()) {
446 auto syncIdentifier = KvManager::GetKvStoreIdentifier(defaultAccountId, appId, storeId);
447 ZLOGI("no account set compatible identifier store:%{public}s, device:%{public}.10s",
448 Anonymous::Change(storeId).c_str(),
449 DistributedData::Serializable::Marshall(defaultAccountDevs).c_str());
450 delegate_->SetEqualIdentifier(syncIdentifier, defaultAccountDevs);
451 }
452 }
453
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)454 void KVDBGeneralStore::GetIdentifierParams(std::vector<std::string> &devices,
455 const std::vector<std::string> &uuids, int32_t authType)
456 {
457 for (const auto &devId : uuids) {
458 if (DMAdapter::GetInstance().IsOHOSType(devId)) {
459 continue;
460 }
461 if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
462 continue;
463 }
464 devices.push_back(devId);
465 }
466 ZLOGI("devices size: %{public}zu", devices.size());
467 }
468
PreSharing(GenQuery & query)469 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
470 {
471 return { GeneralError::E_NOT_SUPPORT, nullptr };
472 }
473
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)474 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
475 {
476 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
477 return GeneralError::E_INVALID_ARGS;
478 }
479 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
480 if (delegate_ == nullptr) {
481 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
482 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
483 return GeneralError::E_ALREADY_CLOSED;
484 }
485 DBStatus status = OK;
486 switch (mode) {
487 case CLOUD_INFO:
488 status = delegate_->RemoveDeviceData(
489 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
490 break;
491 case CLOUD_DATA:
492 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
493 break;
494 case NEARBY_DATA:
495 if (devices.empty()) {
496 status = delegate_->RemoveDeviceData();
497 break;
498 }
499 for (auto device : devices) {
500 status = delegate_->RemoveDeviceData(device);
501 }
502 break;
503 default:
504 return GeneralError::E_ERROR;
505 }
506 return ConvertStatus(status);
507 }
508
Watch(int32_t origin,Watcher & watcher)509 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
510 {
511 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
512 return GeneralError::E_INVALID_ARGS;
513 }
514
515 observer_.watcher_ = &watcher;
516 return GeneralError::E_OK;
517 }
518
Unwatch(int32_t origin,Watcher & watcher)519 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
520 {
521 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
522 return GeneralError::E_INVALID_ARGS;
523 }
524
525 observer_.watcher_ = nullptr;
526 return GeneralError::E_OK;
527 }
528
Release()529 int32_t KVDBGeneralStore::Release()
530 {
531 auto ref = 1;
532 {
533 std::lock_guard<decltype(mutex_)> lock(mutex_);
534 if (ref_ == 0) {
535 return 0;
536 }
537 ref = --ref_;
538 }
539 ZLOGD("ref:%{public}d", ref);
540 if (ref == 0) {
541 delete this;
542 }
543 return ref;
544 }
545
AddRef()546 int32_t KVDBGeneralStore::AddRef()
547 {
548 std::lock_guard<decltype(mutex_)> lock(mutex_);
549 if (ref_ == 0) {
550 return 0;
551 }
552 return ++ref_;
553 }
554
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)555 int32_t KVDBGeneralStore::SetDistributedTables(
556 const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
557 {
558 return GeneralError::E_OK;
559 }
560
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)561 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
562 const std::string &extendColName, bool isForceUpgrade)
563 {
564 return GeneralError::E_OK;
565 }
566
ConvertStatus(DistributedDB::DBStatus status)567 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
568 {
569 auto it = dbStatusMap_.find(status);
570 if (it != dbStatusMap_.end()) {
571 return it->second;
572 }
573 ZLOGI("status:0x%{public}x", status);
574 return GenErr::E_ERROR;
575 }
576
IsValid()577 bool KVDBGeneralStore::IsValid()
578 {
579 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
580 return delegate_ != nullptr;
581 }
582
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)583 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
584 {
585 return GenErr::E_OK;
586 }
587
UnregisterDetailProgressObserver()588 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
589 {
590 return GenErr::E_OK;
591 }
592
GetWaterVersion(const std::string & deviceId)593 std::vector<std::string> KVDBGeneralStore::GetWaterVersion(const std::string &deviceId)
594 {
595 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
596 if (delegate_ == nullptr) {
597 ZLOGE("store already closed! deviceId:%{public}s", Anonymous::Change(deviceId).c_str());
598 return {};
599 }
600 auto [status, versions] = delegate_->GetCloudVersion(deviceId);
601 if (status != DBStatus::OK || versions.empty()) {
602 return {};
603 }
604 std::vector<std::string> res;
605 for (auto &[_, version] : versions) {
606 res.push_back(std::move(version));
607 }
608 return res;
609 }
610
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)611 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
612 {
613 if (!HasWatcher()) {
614 return;
615 }
616 GenOrigin genOrigin;
617 genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
618 genOrigin.id.push_back(originalId);
619 genOrigin.store = storeId_;
620 Watcher::ChangeInfo changeInfo;
621 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
622 auto &info = changeInfo[storeId_][i];
623 for (auto &priData : data.primaryData[i]) {
624 Watcher::PRIValue value;
625 Convert(std::move(*(priData.begin())), value);
626 info.push_back(std::move(value));
627 }
628 }
629 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
630 }
631
OnChange(const DistributedDB::KvStoreChangedData & data)632 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
633 {
634 if (!HasWatcher()) {
635 return;
636 }
637 const auto &inserts = data.GetEntriesInserted();
638 const auto &deletes = data.GetEntriesDeleted();
639 const auto &updates = data.GetEntriesUpdated();
640 Watcher::ChangeData changeData;
641 ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
642 ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
643 ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
644 GenOrigin genOrigin;
645 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
646 genOrigin.store = storeId_;
647
648 watcher_->OnChange(genOrigin, {}, std::move(changeData));
649 }
650
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)651 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
652 {
653 for (auto &entry : entries) {
654 auto value = std::vector<Value>{ entry.key, entry.value };
655 values.push_back(value);
656 }
657 }
658
GetDBProcessCB(DetailAsync async)659 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
660 {
661 return [async](const std::map<std::string, SyncProcess> &processes) {
662 if (!async) {
663 return;
664 }
665 DistributedData::GenDetails details;
666 for (auto &[id, process] : processes) {
667 auto &detail = details[id];
668 detail.progress = process.process;
669 detail.code = ConvertStatus(process.errCode);
670 detail.dbCode = DB_ERR_OFFSET + process.errCode;
671 for (auto [key, value] : process.tableProcess) {
672 auto &table = detail.details[key];
673 table.upload.total = value.upLoadInfo.total;
674 table.upload.success = value.upLoadInfo.successCount;
675 table.upload.failed = value.upLoadInfo.failCount;
676 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
677 table.download.total = value.downLoadInfo.total;
678 table.download.success = value.downLoadInfo.successCount;
679 table.download.failed = value.downLoadInfo.failCount;
680 table.download.untreated = table.download.total - table.download.success - table.download.failed;
681 detail.changeCount = (process.process == FINISHED)
682 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
683 value.downLoadInfo.deleteCount
684 : 0;
685 }
686 }
687 if (async) {
688 async(details);
689 }
690 };
691 }
692
SetDBPushDataInterceptor(int32_t storeType)693 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
694 {
695 delegate_->SetPushDataInterceptor(
696 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
697 int errCode = DBStatus::OK;
698 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
699 return errCode;
700 }
701 if (targetID.empty()) {
702 ZLOGE("targetID empty");
703 return static_cast<int>(DBStatus::DB_ERROR);
704 }
705 auto entries = data.GetEntries();
706 for (size_t i = 0; i < entries.size(); i++) {
707 if (entries[i].key.empty()) {
708 continue;
709 }
710 auto oriKey = entries[i].key;
711 auto newKey = GetNewKey(oriKey, sourceID);
712 errCode = data.ModifyKey(i, newKey);
713 if (errCode != DBStatus::OK) {
714 ZLOGE("ModifyKey err: %{public}d", errCode);
715 break;
716 }
717 }
718 return errCode;
719 }
720 );
721 }
722
SetDBReceiveDataInterceptor(int32_t storeType)723 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
724 {
725 delegate_->SetReceiveDataInterceptor(
726 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
727 int errCode = DBStatus::OK;
728 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
729 return errCode;
730 }
731 if (sourceID.empty()) {
732 ZLOGE("targetID empty");
733 return static_cast<int>(DBStatus::DB_ERROR);
734 }
735 auto entries = data.GetEntries();
736 for (size_t i = 0; i < entries.size(); i++) {
737 if (entries[i].key.empty()) {
738 continue;
739 }
740
741 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
742 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
743 if (encyptedUuid.empty()) {
744 ZLOGE("get encyptedUuid failed");
745 continue;
746 }
747
748 auto oriKey = entries[i].key;
749 auto newKey = GetNewKey(oriKey, encyptedUuid);
750 errCode = data.ModifyKey(i, newKey);
751 if (errCode != DBStatus::OK) {
752 ZLOGE("ModifyKey err: %{public}d", errCode);
753 break;
754 }
755 }
756 return errCode;
757 }
758 );
759 }
760
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)761 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
762 {
763 uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
764 remoteLen = le32toh(remoteLen);
765 uint32_t uuidLen = uuid.size();
766
767 std::vector<uint8_t> out;
768 std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
769 out.insert(out.end(), uuid.begin(), uuid.end());
770 out.insert(out.end(), oriKey.begin(), oriKey.end());
771 uuidLen = htole32(uuidLen);
772 uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
773 out.insert(out.end(), buf, buf + sizeof(uuidLen));
774 return out;
775 }
776
SetConfig(const GeneralStore::StoreConfig & storeConfig)777 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
778 {
779 enableCloud_ = storeConfig.enableCloud_;
780 }
781
LockCloudDB()782 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
783 {
784 return { GeneralError::E_NOT_SUPPORT, 0 };
785 }
786
UnLockCloudDB()787 int32_t KVDBGeneralStore::UnLockCloudDB()
788 {
789 return GeneralError::E_NOT_SUPPORT;
790 }
791
SetExecutor(std::shared_ptr<Executor> executor)792 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
793 {
794 return;
795 }
796 } // namespace OHOS::DistributedKv
797