1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17
18 #include <endian.h>
19
20 #include "app_id_mapping/app_id_mapping_config_manager.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_sync_finished_event.h"
24 #include "cloud/schema_meta.h"
25 #include "crypto/crypto_manager.h"
26 #include "device_manager_adapter.h"
27 #include "device_matrix.h"
28 #include "dfx/dfx_types.h"
29 #include "dfx/reporter.h"
30 #include "directory/directory_manager.h"
31 #include "eventcenter/event_center.h"
32 #include "kvdb_query.h"
33 #include "log_print.h"
34 #include "metadata/meta_data_manager.h"
35 #include "metadata/secret_key_meta_data.h"
36 #include "metadata/store_meta_data_local.h"
37 #include "query_helper.h"
38 #include "rdb_cloud.h"
39 #include "snapshot/bind_event.h"
40 #include "types.h"
41 #include "user_delegate.h"
42 #include "utils/anonymous.h"
43
44 namespace OHOS::DistributedKv {
45 using namespace DistributedData;
46 using namespace DistributedDB;
47 using namespace DistributedDataDfx;
48 using DBField = DistributedDB::Field;
49 using DBTable = DistributedDB::TableSchema;
50 using DBSchema = DistributedDB::DataBaseSchema;
51 using ClearMode = DistributedDB::ClearMode;
52 using DMAdapter = DistributedData::DeviceManagerAdapter;
53 using DBInterceptedData = DistributedDB::InterceptedData;
54 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
55 constexpr int UUID_WIDTH = 4;
56 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
57 { DBStatus::OK, GenErr::E_OK },
58 { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
59 { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
60 { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
61 { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
62 { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
63 { DBStatus::BUSY, GenErr::E_DB_ERROR },
64 { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
65 { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
66 { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
67 { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
68 { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
69 { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
70 { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
71 { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
72 { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
73 { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
74 { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
75 { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
76 { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
77 { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
78 };
79
80 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)81 static DBSchema GetDBSchema(const Database &database)
82 {
83 DBSchema schema;
84 schema.tables.resize(database.tables.size());
85 for (size_t i = 0; i < database.tables.size(); i++) {
86 const Table &table = database.tables[i];
87 DBTable &dbTable = schema.tables[i];
88 dbTable.name = table.name;
89 dbTable.sharedTableName = table.sharedTableName;
90 for (auto &field : table.fields) {
91 DBField dbField;
92 dbField.colName = field.colName;
93 dbField.type = field.type;
94 dbField.primary = field.primary;
95 dbField.nullable = field.nullable;
96 dbTable.fields.push_back(std::move(dbField));
97 }
98 }
99 return schema;
100 }
101
GetDBPassword(const StoreMetaData & data)102 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
103 {
104 DBPassword dbPassword;
105 if (!data.isEncrypt) {
106 return dbPassword;
107 }
108 SecretKeyMetaData secretKey;
109 auto metaKey = data.GetSecretKey();
110 if (!MetaDataManager::GetInstance().LoadMeta(metaKey, secretKey, true) || secretKey.sKey.empty()) {
111 return dbPassword;
112 }
113 CryptoManager::CryptoParams decryptParams = { .area = secretKey.area, .userId = data.user,
114 .nonce = secretKey.nonce };
115 auto password = CryptoManager::GetInstance().Decrypt(secretKey.sKey, decryptParams);
116 if (password.empty()) {
117 return dbPassword;
118 }
119 // update secret key of area or nonce
120 CryptoManager::GetInstance().UpdateSecretMeta(password, data, metaKey, secretKey);
121 dbPassword.SetValue(password.data(), password.size());
122 password.assign(password.size(), 0);
123 return dbPassword;
124 }
125
GetDBSecurity(int32_t secLevel)126 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
127 {
128 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
129 return { DistributedDB::NOT_SET, DistributedDB::ECE };
130 }
131 if (secLevel == SecurityLevel::S3) {
132 return { DistributedDB::S3, DistributedDB::SECE };
133 }
134 if (secLevel == SecurityLevel::S4) {
135 return { DistributedDB::S4, DistributedDB::ECE };
136 }
137 return { secLevel, DistributedDB::ECE };
138 }
139
GetDBOption(const StoreMetaData & data,const DBPassword & password)140 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
141 {
142 DBOption dbOption;
143 dbOption.createIfNecessary = false;
144 dbOption.isMemoryDb = false;
145 dbOption.isEncryptedDb = data.isEncrypt;
146 dbOption.isNeedCompressOnSync = data.isNeedCompress;
147 if (data.isEncrypt) {
148 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
149 dbOption.passwd = password;
150 }
151 StoreMetaDataLocal local;
152 MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
153 if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
154 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
155 } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
156 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
157 }
158 if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
159 dbOption.compressionRate = META_COMPRESS_RATE;
160 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
161 } else {
162 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
163 }
164 dbOption.schema = data.schema;
165 dbOption.createDirByStoreIdOnly = true;
166 dbOption.secOption = GetDBSecurity(data.securityLevel);
167 return dbOption;
168 }
169
KVDBGeneralStore(const StoreMetaData & meta)170 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
171 : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
172 meta.instanceId)
173 {
174 observer_.storeId_ = meta.storeId;
175 StoreMetaDataLocal local;
176 MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
177 isPublic_ = local.isPublic;
178 DBStatus status = DBStatus::NOT_FOUND;
179 manager_.SetKvStoreConfig({ meta.dataDir });
180 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
181 manager_.GetKvStore(
182 meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
183 status = dbStatus;
184 delegate_ = tmpStore;
185 });
186 if (delegate_ == nullptr || status != DBStatus::OK) {
187 ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
188 manager_.CloseKvStore(delegate_);
189 return;
190 }
191 SetDBPushDataInterceptor(meta.storeType);
192 SetDBReceiveDataInterceptor(meta.storeType);
193 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
194 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
195 if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
196 delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
197 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
198 });
199 }
200 if (meta.isAutoSync) {
201 bool param = true;
202 auto data = static_cast<DistributedDB::PragmaData>(¶m);
203 delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
204 }
205 storeInfo_.tokenId = meta.tokenId;
206 storeInfo_.bundleName = meta.bundleName;
207 storeInfo_.storeName = meta.storeId;
208 storeInfo_.instanceId = meta.instanceId;
209 storeInfo_.user = std::atoi(meta.user.c_str());
210 enableCloud_ = meta.enableCloud;
211 }
212
~KVDBGeneralStore()213 KVDBGeneralStore::~KVDBGeneralStore()
214 {
215 {
216 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
217 if (delegate_ != nullptr) {
218 delegate_->UnRegisterObserver(&observer_);
219 }
220 manager_.CloseKvStore(delegate_);
221 delegate_ = nullptr;
222 }
223 {
224 std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
225 for (auto &[userId, bindInfo] : bindInfos_) {
226 if (bindInfo.db_ != nullptr) {
227 bindInfo.db_->Close();
228 }
229 }
230 bindInfos_.clear();
231 }
232 }
233
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)234 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
235 {
236 return GenErr::E_NOT_SUPPORT;
237 }
238
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)239 int32_t KVDBGeneralStore::Bind(
240 const Database &database, const std::map<uint32_t, BindInfo> &bindInfos, const CloudConfig &config)
241 {
242 if (bindInfos.empty()) {
243 ZLOGW("No cloudDB!");
244 return GeneralError::E_OK;
245 }
246 std::map<std::string, DataBaseSchema> schemas;
247 std::map<std::string, std::shared_ptr<DistributedDB::ICloudDb>> dbClouds{};
248 auto dbSchema = GetDBSchema(database);
249 {
250 std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
251 for (auto &[userId, bindInfo] : bindInfos) {
252 if (bindInfo.db_ == nullptr) {
253 return GeneralError::E_INVALID_ARGS;
254 }
255 dbClouds.insert({ std::to_string(userId),
256 std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
257 bindInfos_.insert(std::make_pair(userId, std::move(bindInfo)));
258 schemas.insert({ std::to_string(userId), dbSchema });
259 }
260 }
261 DistributedDB::CloudSyncConfig dbConfig;
262 dbConfig.maxUploadCount = config.maxNumber;
263 dbConfig.maxUploadSize = config.maxSize;
264 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
265 dbConfig.isSupportEncrypt = config.isSupportEncrypt;
266 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
267 if (delegate_ == nullptr) {
268 return GeneralError::E_ALREADY_CLOSED;
269 }
270 delegate_->SetCloudDB(std::move(dbClouds));
271 delegate_->SetCloudDbSchema(std::move(schemas));
272 delegate_->SetCloudSyncConfig(dbConfig);
273 return GeneralError::E_OK;
274 }
275
IsBound(uint32_t user)276 bool KVDBGeneralStore::IsBound(uint32_t user)
277 {
278 std::shared_lock<std::shared_mutex> lock(bindMutex_);
279 return bindInfos_.find(user) != bindInfos_.end();
280 }
281
Close(bool isForce)282 int32_t KVDBGeneralStore::Close(bool isForce)
283 {
284 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
285 if (!lock) {
286 return GeneralError::E_BUSY;
287 }
288
289 if (delegate_ == nullptr) {
290 return GeneralError::E_OK;
291 }
292 if (!isForce && delegate_->GetTaskCount() > 0) {
293 return GeneralError::E_BUSY;
294 }
295 if (delegate_ != nullptr) {
296 delegate_->UnRegisterObserver(&observer_);
297 }
298 auto status = manager_.CloseKvStore(delegate_);
299 if (status != DBStatus::OK) {
300 return status;
301 }
302 delegate_ = nullptr;
303 return GeneralError::E_OK;
304 }
305
Execute(const std::string & table,const std::string & sql)306 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
307 {
308 return GeneralError::E_NOT_SUPPORT;
309 }
310
Insert(const std::string & table,VBuckets && values)311 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
312 {
313 return GeneralError::E_NOT_SUPPORT;
314 }
315
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)316 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
317 const std::string &whereSql, Values &&conditions)
318 {
319 return GeneralError::E_NOT_SUPPORT;
320 }
321
Delete(const std::string & table,const std::string & sql,Values && args)322 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
323 {
324 return GeneralError::E_NOT_SUPPORT;
325 }
326
Replace(const std::string & table,VBucket && value)327 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
328 {
329 return GeneralError::E_NOT_SUPPORT;
330 }
331
Query(const std::string & table,const std::string & sql,Values && args)332 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
333 __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
334 {
335 return { GeneralError::E_NOT_SUPPORT, nullptr };
336 }
337
Query(const std::string & table,GenQuery & query)338 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
339 {
340 return { GeneralError::E_NOT_SUPPORT, nullptr };
341 }
342
MergeMigratedData(const std::string & tableName,VBuckets && values)343 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
344 {
345 return GeneralError::E_NOT_SUPPORT;
346 }
347
CleanTrackerData(const std::string & tableName,int64_t cursor)348 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
349 {
350 return GeneralError::E_NOT_SUPPORT;
351 }
352
GetDBSyncCompleteCB(DetailAsync async)353 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
354 {
355 if (!async) {
356 return [](auto &) {};
357 }
358 return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
359 GenDetails details;
360 for (auto &[key, dbStatus] : status) {
361 auto &value = details[key];
362 value.progress = FINISHED;
363 value.code = GeneralError::E_OK;
364 if (dbStatus != DBStatus::OK) {
365 value.code = dbStatus;
366 }
367 }
368 async(details);
369 };
370 }
371
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)372 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
373 int64_t wait, const std::string &prepareTraceId)
374 {
375 DistributedDB::CloudSyncOption syncOption;
376 syncOption.devices = devices;
377 syncOption.mode = cloudSyncMode;
378 syncOption.waitTime = wait;
379 syncOption.prepareTraceId = prepareTraceId;
380 syncOption.lockAction = DistributedDB::LockAction::NONE;
381 if (storeInfo_.user == 0) {
382 std::vector<int32_t> users;
383 auto ret = AccountDelegate::GetInstance()->QueryUsers(users);
384 if (!ret || users.empty()) {
385 ZLOGE("failed to query os accounts, ret:%{public}d", ret);
386 return DBStatus::DB_ERROR;
387 }
388 syncOption.users.push_back(std::to_string(users[0]));
389 } else {
390 syncOption.users.push_back(std::to_string(storeInfo_.user));
391 }
392 return delegate_->Sync(syncOption, GetDBProcessCB(async));
393 }
394
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)395 void KVDBGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
396 {
397 ArkDataFaultMsg msg = { .faultType = faultType,
398 .bundleName = storeInfo_.bundleName,
399 .moduleName = ModuleName::KV_STORE,
400 .storeName = storeInfo_.storeName,
401 .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
402 .appendixMsg = appendix };
403 Reporter::GetInstance()->CloudSyncFault()->Report(msg);
404 }
405
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)406 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
407 const SyncParam &syncParam)
408 {
409 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
410 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
411 if (delegate_ == nullptr) {
412 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
413 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
414 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
415 }
416 DBStatus dbStatus;
417 auto dbMode = DistributedDB::SyncMode(syncMode);
418 if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
419 if (!enableCloud_) {
420 return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
421 }
422 dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
423 if (dbStatus != DBStatus::OK) {
424 Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
425 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
426 }
427 } else {
428 if (devices.empty()) {
429 ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
430 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
431 }
432 KVDBQuery *kvQuery = nullptr;
433 auto ret = query.QueryInterface(kvQuery);
434 DistributedDB::Query dbQuery;
435 if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
436 dbQuery = kvQuery->GetDBQuery();
437 } else {
438 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
439 }
440 if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
441 dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
442 } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
443 dbStatus =
444 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
445 } else if (syncMode < NEARBY_END) {
446 DeviceSyncOption syncOption = { .devices = devices, .mode = dbMode, .query = dbQuery, .isWait = false,
447 .isRetry = syncParam.isRetry };
448 dbStatus = delegate_->Sync(syncOption, GetDBSyncCompleteCB(std::move(async)));
449 } else {
450 ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
451 dbStatus = DistributedDB::INVALID_ARGS;
452 }
453 }
454 return { ConvertStatus(dbStatus), dbStatus };
455 }
456
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)457 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
458 {
459 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
460 if (delegate_ == nullptr) {
461 ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
462 Anonymous::Change(storeId).c_str());
463 return;
464 }
465 std::vector<std::string> sameAccountDevs{};
466 std::vector<std::string> defaultAccountDevs{};
467 auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
468 if (uuids.empty()) {
469 ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
470 return;
471 }
472 GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
473 GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
474 if (!sameAccountDevs.empty()) {
475 auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
476 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
477 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
478 ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}s, appId:%{public}s",
479 Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
480 Anonymous::Change(DistributedData::Serializable::Marshall(sameAccountDevs)).c_str(),
481 convertedIds.first.c_str());
482 delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
483 }
484 if (!defaultAccountDevs.empty()) {
485 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
486 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
487 ZLOGI("no account store:%{public}s, device:%{public}s, appId:%{public}s", Anonymous::Change(storeId).c_str(),
488 Anonymous::Change(DistributedData::Serializable::Marshall(defaultAccountDevs)).c_str(),
489 convertedIds.first.c_str());
490 delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
491 }
492 }
493
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)494 void KVDBGeneralStore::GetIdentifierParams(
495 std::vector<std::string> &devices, const std::vector<std::string> &uuids, int32_t authType)
496 {
497 for (const auto &devId : uuids) {
498 if (DMAdapter::GetInstance().IsOHOSType(devId)) {
499 continue;
500 }
501 if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
502 continue;
503 }
504 devices.push_back(devId);
505 }
506 ZLOGI("devices size: %{public}zu", devices.size());
507 }
508
PreSharing(GenQuery & query)509 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
510 {
511 return { GeneralError::E_NOT_SUPPORT, nullptr };
512 }
513
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)514 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
515 {
516 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
517 return GeneralError::E_INVALID_ARGS;
518 }
519 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
520 if (delegate_ == nullptr) {
521 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
522 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
523 return GeneralError::E_ALREADY_CLOSED;
524 }
525 DBStatus status = OK;
526 ClearKvMetaDataOption option;
527 switch (mode) {
528 case CLOUD_INFO:
529 status = delegate_->RemoveDeviceData(
530 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
531 break;
532 case CLOUD_DATA:
533 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
534 break;
535 case CLEAN_WATER:
536 option.type = ClearKvMetaOpType::CLEAN_CLOUD_WATERMARK;
537 status = delegate_->ClearMetaData(option);
538 break;
539 case NEARBY_DATA:
540 if (devices.empty()) {
541 status = delegate_->RemoveDeviceData();
542 break;
543 }
544 for (auto device : devices) {
545 status = delegate_->RemoveDeviceData(device);
546 }
547 break;
548 default:
549 return GeneralError::E_ERROR;
550 }
551 return ConvertStatus(status);
552 }
553
Watch(int32_t origin,Watcher & watcher)554 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
555 {
556 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
557 return GeneralError::E_INVALID_ARGS;
558 }
559
560 observer_.watcher_ = &watcher;
561 return GeneralError::E_OK;
562 }
563
Unwatch(int32_t origin,Watcher & watcher)564 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
565 {
566 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
567 return GeneralError::E_INVALID_ARGS;
568 }
569
570 observer_.watcher_ = nullptr;
571 return GeneralError::E_OK;
572 }
573
Release()574 int32_t KVDBGeneralStore::Release()
575 {
576 auto ref = 1;
577 {
578 std::lock_guard<decltype(mutex_)> lock(mutex_);
579 if (ref_ == 0) {
580 return 0;
581 }
582 ref = --ref_;
583 }
584 ZLOGD("ref:%{public}d", ref);
585 if (ref == 0) {
586 delete this;
587 }
588 return ref;
589 }
590
AddRef()591 int32_t KVDBGeneralStore::AddRef()
592 {
593 std::lock_guard<decltype(mutex_)> lock(mutex_);
594 if (ref_ == 0) {
595 return 0;
596 }
597 return ++ref_;
598 }
599
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)600 int32_t KVDBGeneralStore::SetDistributedTables(
601 const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
602 {
603 return GeneralError::E_OK;
604 }
605
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)606 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
607 const std::set<std::string> &extendColNames, bool isForceUpgrade)
608 {
609 return GeneralError::E_OK;
610 }
611
ConvertStatus(DistributedDB::DBStatus status)612 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
613 {
614 auto it = dbStatusMap_.find(status);
615 if (it != dbStatusMap_.end()) {
616 return it->second;
617 }
618 ZLOGI("status:0x%{public}x", status);
619 return GenErr::E_ERROR;
620 }
621
IsValid()622 bool KVDBGeneralStore::IsValid()
623 {
624 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
625 return delegate_ != nullptr;
626 }
627
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)628 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
629 {
630 return GenErr::E_OK;
631 }
632
UnregisterDetailProgressObserver()633 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
634 {
635 return GenErr::E_OK;
636 }
637
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)638 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
639 {
640 if (!HasWatcher()) {
641 return;
642 }
643 GenOrigin genOrigin;
644 genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
645 genOrigin.id.push_back(originalId);
646 genOrigin.store = storeId_;
647 Watcher::ChangeInfo changeInfo;
648 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
649 auto &info = changeInfo[storeId_][i];
650 for (auto &priData : data.primaryData[i]) {
651 Watcher::PRIValue value;
652 if (priData.empty()) {
653 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
654 Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
655 Anonymous::Change(originalId).c_str(), i);
656 continue;
657 }
658 Convert(std::move(*(priData.begin())), value);
659 info.push_back(std::move(value));
660 }
661 }
662 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
663 }
664
OnChange(const DistributedDB::KvStoreChangedData & data)665 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
666 {
667 if (!HasWatcher()) {
668 return;
669 }
670 const auto &inserts = data.GetEntriesInserted();
671 const auto &deletes = data.GetEntriesDeleted();
672 const auto &updates = data.GetEntriesUpdated();
673 Watcher::ChangeData changeData;
674 ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
675 ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
676 ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
677 GenOrigin genOrigin;
678 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
679 genOrigin.store = storeId_;
680
681 watcher_->OnChange(genOrigin, {}, std::move(changeData));
682 }
683
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)684 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
685 {
686 for (auto &entry : entries) {
687 auto value = std::vector<Value>{ entry.key, entry.value };
688 values.push_back(value);
689 }
690 }
691
GetDBProcessCB(DetailAsync async)692 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
693 {
694 return [async](const std::map<std::string, SyncProcess> &processes) {
695 if (!async) {
696 return;
697 }
698 DistributedData::GenDetails details;
699 for (auto &[id, process] : processes) {
700 auto &detail = details[id];
701 detail.progress = process.process;
702 detail.code = ConvertStatus(process.errCode);
703 detail.dbCode = process.errCode;
704 for (auto [key, value] : process.tableProcess) {
705 auto &table = detail.details[key];
706 table.upload.total = value.upLoadInfo.total;
707 table.upload.success = value.upLoadInfo.successCount;
708 table.upload.failed = value.upLoadInfo.failCount;
709 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
710 table.download.total = value.downLoadInfo.total;
711 table.download.success = value.downLoadInfo.successCount;
712 table.download.failed = value.downLoadInfo.failCount;
713 table.download.untreated = table.download.total - table.download.success - table.download.failed;
714 detail.changeCount = (process.process == FINISHED)
715 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
716 value.downLoadInfo.deleteCount
717 : 0;
718 }
719 }
720 if (async) {
721 async(details);
722 }
723 };
724 }
725
SetDBPushDataInterceptor(int32_t storeType)726 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
727 {
728 delegate_->SetPushDataInterceptor(
729 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
730 int errCode = DBStatus::OK;
731 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
732 return errCode;
733 }
734 if (targetID.empty()) {
735 ZLOGE("targetID empty");
736 return static_cast<int>(DBStatus::DB_ERROR);
737 }
738 auto entries = data.GetEntries();
739 for (size_t i = 0; i < entries.size(); i++) {
740 if (entries[i].key.empty()) {
741 continue;
742 }
743 auto oriKey = entries[i].key;
744 auto newKey = GetNewKey(oriKey, sourceID);
745 errCode = data.ModifyKey(i, newKey);
746 if (errCode != DBStatus::OK) {
747 ZLOGE("ModifyKey err: %{public}d", errCode);
748 break;
749 }
750 }
751 return errCode;
752 });
753 }
754
SetDBReceiveDataInterceptor(int32_t storeType)755 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
756 {
757 delegate_->SetReceiveDataInterceptor(
758 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
759 int errCode = DBStatus::OK;
760 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
761 return errCode;
762 }
763 if (sourceID.empty()) {
764 ZLOGE("sourceID empty");
765 return static_cast<int>(DBStatus::DB_ERROR);
766 }
767 auto entries = data.GetEntries();
768 for (size_t i = 0; i < entries.size(); i++) {
769 if (entries[i].key.empty()) {
770 continue;
771 }
772
773 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
774 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
775 if (encyptedUuid.empty()) {
776 ZLOGE("get encyptedUuid failed");
777 continue;
778 }
779
780 auto oriKey = entries[i].key;
781 auto newKey = GetNewKey(oriKey, encyptedUuid);
782 errCode = data.ModifyKey(i, newKey);
783 if (errCode != DBStatus::OK) {
784 ZLOGE("ModifyKey err: %{public}d", errCode);
785 break;
786 }
787 }
788 return errCode;
789 });
790 }
791
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)792 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
793 {
794 uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
795 remoteLen = le32toh(remoteLen);
796 uint32_t uuidLen = uuid.size();
797
798 std::vector<uint8_t> out;
799 std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
800 out.insert(out.end(), uuid.begin(), uuid.end());
801 out.insert(out.end(), oriKey.begin(), oriKey.end());
802 uuidLen = htole32(uuidLen);
803 uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
804 out.insert(out.end(), buf, buf + sizeof(uuidLen));
805 return out;
806 }
807
SetConfig(const GeneralStore::StoreConfig & storeConfig)808 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
809 {
810 enableCloud_ = storeConfig.enableCloud_;
811 }
812
LockCloudDB()813 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
814 {
815 return { GeneralError::E_NOT_SUPPORT, 0 };
816 }
817
UnLockCloudDB()818 int32_t KVDBGeneralStore::UnLockCloudDB()
819 {
820 return GeneralError::E_NOT_SUPPORT;
821 }
822
SetExecutor(std::shared_ptr<Executor> executor)823 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
824 {
825 return;
826 }
827 } // namespace OHOS::DistributedKv