1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17
18 #include <endian.h>
19
20 #include "app_id_mapping/app_id_mapping_config_manager.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_sync_finished_event.h"
24 #include "cloud/schema_meta.h"
25 #include "crypto_manager.h"
26 #include "device_manager_adapter.h"
27 #include "device_matrix.h"
28 #include "dfx_types.h"
29 #include "directory/directory_manager.h"
30 #include "eventcenter/event_center.h"
31 #include "kvdb_query.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "metadata/secret_key_meta_data.h"
35 #include "metadata/store_meta_data_local.h"
36 #include "query_helper.h"
37 #include "rdb_cloud.h"
38 #include "reporter.h"
39 #include "snapshot/bind_event.h"
40 #include "types.h"
41 #include "user_delegate.h"
42 #include "utils/anonymous.h"
43
44 namespace OHOS::DistributedKv {
45 using namespace DistributedData;
46 using namespace DistributedDB;
47 using namespace DistributedDataDfx;
48 using DBField = DistributedDB::Field;
49 using DBTable = DistributedDB::TableSchema;
50 using DBSchema = DistributedDB::DataBaseSchema;
51 using ClearMode = DistributedDB::ClearMode;
52 using DMAdapter = DistributedData::DeviceManagerAdapter;
53 using DBInterceptedData = DistributedDB::InterceptedData;
54 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
55 constexpr int UUID_WIDTH = 4;
56 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
57 { DBStatus::OK, GenErr::E_OK },
58 { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
59 { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
60 { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
61 { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
62 { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
63 { DBStatus::BUSY, GenErr::E_DB_ERROR },
64 { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
65 { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
66 { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
67 { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
68 { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
69 { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
70 { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
71 { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
72 { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
73 { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
74 { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
75 { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
76 { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
77 { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
78 };
79
80 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)81 static DBSchema GetDBSchema(const Database &database)
82 {
83 DBSchema schema;
84 schema.tables.resize(database.tables.size());
85 for (size_t i = 0; i < database.tables.size(); i++) {
86 const Table &table = database.tables[i];
87 DBTable &dbTable = schema.tables[i];
88 dbTable.name = table.name;
89 dbTable.sharedTableName = table.sharedTableName;
90 for (auto &field : table.fields) {
91 DBField dbField;
92 dbField.colName = field.colName;
93 dbField.type = field.type;
94 dbField.primary = field.primary;
95 dbField.nullable = field.nullable;
96 dbTable.fields.push_back(std::move(dbField));
97 }
98 }
99 return schema;
100 }
GetDBPassword(const StoreMetaData & data)101 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
102 {
103 DBPassword dbPassword;
104 if (!data.isEncrypt) {
105 return dbPassword;
106 }
107
108 SecretKeyMetaData secretKey;
109 secretKey.storeType = data.storeType;
110 auto storeKey = data.GetSecretKey();
111 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
112 std::vector<uint8_t> password;
113 StoreMetaData metaData;
114 MetaDataManager::GetInstance().LoadMeta(data.GetKey(), metaData, true);
115 CryptoManager::GetInstance().Decrypt(metaData, secretKey, password);
116 dbPassword.SetValue(password.data(), password.size());
117 password.assign(password.size(), 0);
118 return dbPassword;
119 }
120
GetDBSecurity(int32_t secLevel)121 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
122 {
123 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
124 return { DistributedDB::NOT_SET, DistributedDB::ECE };
125 }
126 if (secLevel == SecurityLevel::S3) {
127 return { DistributedDB::S3, DistributedDB::SECE };
128 }
129 if (secLevel == SecurityLevel::S4) {
130 return { DistributedDB::S4, DistributedDB::ECE };
131 }
132 return { secLevel, DistributedDB::ECE };
133 }
134
GetDBOption(const StoreMetaData & data,const DBPassword & password)135 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
136 {
137 DBOption dbOption;
138 dbOption.createIfNecessary = false;
139 dbOption.isMemoryDb = false;
140 dbOption.isEncryptedDb = data.isEncrypt;
141 dbOption.isNeedCompressOnSync = data.isNeedCompress;
142 if (data.isEncrypt) {
143 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
144 dbOption.passwd = password;
145 }
146 StoreMetaDataLocal local;
147 MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
148 if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
149 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
150 } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
151 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
152 }
153 if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
154 dbOption.compressionRate = META_COMPRESS_RATE;
155 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
156 } else {
157 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
158 }
159 dbOption.schema = data.schema;
160 dbOption.createDirByStoreIdOnly = true;
161 dbOption.secOption = GetDBSecurity(data.securityLevel);
162 return dbOption;
163 }
164
KVDBGeneralStore(const StoreMetaData & meta)165 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
166 : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
167 meta.instanceId)
168 {
169 observer_.storeId_ = meta.storeId;
170 StoreMetaDataLocal local;
171 MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
172 isPublic_ = local.isPublic;
173 DBStatus status = DBStatus::NOT_FOUND;
174 manager_.SetKvStoreConfig({ meta.dataDir });
175 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
176 manager_.GetKvStore(
177 meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
178 status = dbStatus;
179 delegate_ = tmpStore;
180 });
181 if (delegate_ == nullptr || status != DBStatus::OK) {
182 ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
183 manager_.CloseKvStore(delegate_);
184 return;
185 }
186 SetDBPushDataInterceptor(meta.storeType);
187 SetDBReceiveDataInterceptor(meta.storeType);
188 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
189 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
190 if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
191 delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
192 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
193 });
194 }
195 if (meta.isAutoSync) {
196 bool param = true;
197 auto data = static_cast<DistributedDB::PragmaData>(¶m);
198 delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
199 }
200 storeInfo_.tokenId = meta.tokenId;
201 storeInfo_.bundleName = meta.bundleName;
202 storeInfo_.storeName = meta.storeId;
203 storeInfo_.instanceId = meta.instanceId;
204 storeInfo_.user = std::atoi(meta.user.c_str());
205 enableCloud_ = meta.enableCloud;
206 }
207
~KVDBGeneralStore()208 KVDBGeneralStore::~KVDBGeneralStore()
209 {
210 {
211 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
212 if (delegate_ != nullptr) {
213 delegate_->UnRegisterObserver(&observer_);
214 }
215 manager_.CloseKvStore(delegate_);
216 delegate_ = nullptr;
217 }
218 {
219 std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
220 for (auto &[userId, bindInfo] : bindInfos_) {
221 if (bindInfo.db_ != nullptr) {
222 bindInfo.db_->Close();
223 }
224 }
225 bindInfos_.clear();
226 }
227 }
228
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)229 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
230 {
231 return GenErr::E_NOT_SUPPORT;
232 }
233
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)234 int32_t KVDBGeneralStore::Bind(
235 const Database &database, const std::map<uint32_t, BindInfo> &bindInfos, const CloudConfig &config)
236 {
237 if (bindInfos.empty()) {
238 ZLOGW("No cloudDB!");
239 return GeneralError::E_OK;
240 }
241 std::map<std::string, DataBaseSchema> schemas;
242 std::map<std::string, std::shared_ptr<DistributedDB::ICloudDb>> dbClouds{};
243 auto dbSchema = GetDBSchema(database);
244 {
245 std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
246 for (auto &[userId, bindInfo] : bindInfos) {
247 if (bindInfo.db_ == nullptr) {
248 return GeneralError::E_INVALID_ARGS;
249 }
250 dbClouds.insert({ std::to_string(userId),
251 std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
252 bindInfos_.insert(std::make_pair(userId, std::move(bindInfo)));
253 schemas.insert({ std::to_string(userId), dbSchema });
254 }
255 }
256 DistributedDB::CloudSyncConfig dbConfig;
257 dbConfig.maxUploadCount = config.maxNumber;
258 dbConfig.maxUploadSize = config.maxSize;
259 dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
260 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
261 if (delegate_ == nullptr) {
262 return GeneralError::E_ALREADY_CLOSED;
263 }
264 delegate_->SetCloudDB(std::move(dbClouds));
265 delegate_->SetCloudDbSchema(std::move(schemas));
266 delegate_->SetCloudSyncConfig(dbConfig);
267 return GeneralError::E_OK;
268 }
269
IsBound(uint32_t user)270 bool KVDBGeneralStore::IsBound(uint32_t user)
271 {
272 std::shared_lock<std::shared_mutex> lock(bindMutex_);
273 return bindInfos_.find(user) != bindInfos_.end();
274 }
275
Close(bool isForce)276 int32_t KVDBGeneralStore::Close(bool isForce)
277 {
278 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
279 if (!lock) {
280 return GeneralError::E_BUSY;
281 }
282
283 if (delegate_ == nullptr) {
284 return GeneralError::E_OK;
285 }
286 if (!isForce && delegate_->GetTaskCount() > 0) {
287 return GeneralError::E_BUSY;
288 }
289 if (delegate_ != nullptr) {
290 delegate_->UnRegisterObserver(&observer_);
291 }
292 auto status = manager_.CloseKvStore(delegate_);
293 if (status != DBStatus::OK) {
294 return status;
295 }
296 delegate_ = nullptr;
297 return GeneralError::E_OK;
298 }
299
Execute(const std::string & table,const std::string & sql)300 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
301 {
302 return GeneralError::E_NOT_SUPPORT;
303 }
304
Insert(const std::string & table,VBuckets && values)305 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
306 {
307 return GeneralError::E_NOT_SUPPORT;
308 }
309
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)310 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
311 const std::string &whereSql, Values &&conditions)
312 {
313 return GeneralError::E_NOT_SUPPORT;
314 }
315
Delete(const std::string & table,const std::string & sql,Values && args)316 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
317 {
318 return GeneralError::E_NOT_SUPPORT;
319 }
320
Replace(const std::string & table,VBucket && value)321 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
322 {
323 return GeneralError::E_NOT_SUPPORT;
324 }
325
Query(const std::string & table,const std::string & sql,Values && args)326 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
327 __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
328 {
329 return { GeneralError::E_NOT_SUPPORT, nullptr };
330 }
331
Query(const std::string & table,GenQuery & query)332 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
333 {
334 return { GeneralError::E_NOT_SUPPORT, nullptr };
335 }
336
MergeMigratedData(const std::string & tableName,VBuckets && values)337 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
338 {
339 return GeneralError::E_NOT_SUPPORT;
340 }
341
CleanTrackerData(const std::string & tableName,int64_t cursor)342 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
343 {
344 return GeneralError::E_NOT_SUPPORT;
345 }
346
GetDBSyncCompleteCB(DetailAsync async)347 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
348 {
349 if (!async) {
350 return [](auto &) {};
351 }
352 return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
353 GenDetails details;
354 for (auto &[key, dbStatus] : status) {
355 auto &value = details[key];
356 value.progress = FINISHED;
357 value.code = GeneralError::E_OK;
358 if (dbStatus != DBStatus::OK) {
359 value.code = dbStatus;
360 }
361 }
362 async(details);
363 };
364 }
365
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)366 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
367 int64_t wait, const std::string &prepareTraceId)
368 {
369 DistributedDB::CloudSyncOption syncOption;
370 syncOption.devices = devices;
371 syncOption.mode = cloudSyncMode;
372 syncOption.waitTime = wait;
373 syncOption.prepareTraceId = prepareTraceId;
374 syncOption.lockAction = DistributedDB::LockAction::NONE;
375 if (storeInfo_.user == 0) {
376 std::vector<int32_t> users;
377 AccountDelegate::GetInstance()->QueryUsers(users);
378 syncOption.users.push_back(std::to_string(users[0]));
379 } else {
380 syncOption.users.push_back(std::to_string(storeInfo_.user));
381 }
382 return delegate_->Sync(syncOption, GetDBProcessCB(async));
383 }
384
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)385 void KVDBGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
386 {
387 ArkDataFaultMsg msg = { .faultType = faultType,
388 .bundleName = storeInfo_.bundleName,
389 .moduleName = ModuleName::KV_STORE,
390 .storeName = storeInfo_.storeName,
391 .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
392 .appendixMsg = appendix };
393 Reporter::GetInstance()->CloudSyncFault()->Report(msg);
394 }
395
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)396 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
397 const SyncParam &syncParam)
398 {
399 auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
400 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
401 if (delegate_ == nullptr) {
402 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
403 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
404 return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
405 }
406 DBStatus dbStatus;
407 auto dbMode = DistributedDB::SyncMode(syncMode);
408 if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
409 if (!enableCloud_) {
410 return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
411 }
412 dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
413 if (dbStatus != DBStatus::OK) {
414 Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
415 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
416 }
417 } else {
418 if (devices.empty()) {
419 ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
420 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
421 }
422 KVDBQuery *kvQuery = nullptr;
423 auto ret = query.QueryInterface(kvQuery);
424 DistributedDB::Query dbQuery;
425 if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
426 dbQuery = kvQuery->GetDBQuery();
427 } else {
428 return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
429 }
430 if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
431 dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
432 } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
433 dbStatus =
434 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
435 } else if (syncMode < NEARBY_END) {
436 if (kvQuery->IsEmpty()) {
437 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
438 } else {
439 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
440 }
441 } else {
442 ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
443 dbStatus = DistributedDB::INVALID_ARGS;
444 }
445 }
446 return { ConvertStatus(dbStatus), dbStatus };
447 }
448
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)449 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
450 {
451 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
452 if (delegate_ == nullptr) {
453 ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
454 Anonymous::Change(storeId).c_str());
455 return;
456 }
457 std::vector<std::string> sameAccountDevs{};
458 std::vector<std::string> defaultAccountDevs{};
459 auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
460 if (uuids.empty()) {
461 ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
462 return;
463 }
464 GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
465 GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
466 if (!sameAccountDevs.empty()) {
467 auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
468 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
469 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
470 ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}.10s, appId:%{public}s",
471 Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
472 DistributedData::Serializable::Marshall(sameAccountDevs).c_str(), convertedIds.first.c_str());
473 delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
474 }
475 if (!defaultAccountDevs.empty()) {
476 auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
477 auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
478 ZLOGI("no account store:%{public}s, device:%{public}.10s, appId:%{public}s", Anonymous::Change(storeId).c_str(),
479 DistributedData::Serializable::Marshall(defaultAccountDevs).c_str(), convertedIds.first.c_str());
480 delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
481 }
482 }
483
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)484 void KVDBGeneralStore::GetIdentifierParams(
485 std::vector<std::string> &devices, const std::vector<std::string> &uuids, int32_t authType)
486 {
487 for (const auto &devId : uuids) {
488 if (DMAdapter::GetInstance().IsOHOSType(devId)) {
489 continue;
490 }
491 if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
492 continue;
493 }
494 devices.push_back(devId);
495 }
496 ZLOGI("devices size: %{public}zu", devices.size());
497 }
498
PreSharing(GenQuery & query)499 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
500 {
501 return { GeneralError::E_NOT_SUPPORT, nullptr };
502 }
503
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)504 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
505 {
506 if (mode < 0 || mode > CLEAN_MODE_BUTT) {
507 return GeneralError::E_INVALID_ARGS;
508 }
509 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
510 if (delegate_ == nullptr) {
511 ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
512 devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
513 return GeneralError::E_ALREADY_CLOSED;
514 }
515 DBStatus status = OK;
516 switch (mode) {
517 case CLOUD_INFO:
518 status = delegate_->RemoveDeviceData(
519 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
520 break;
521 case CLOUD_DATA:
522 status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
523 break;
524 case NEARBY_DATA:
525 if (devices.empty()) {
526 status = delegate_->RemoveDeviceData();
527 break;
528 }
529 for (auto device : devices) {
530 status = delegate_->RemoveDeviceData(device);
531 }
532 break;
533 default:
534 return GeneralError::E_ERROR;
535 }
536 return ConvertStatus(status);
537 }
538
Watch(int32_t origin,Watcher & watcher)539 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
540 {
541 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
542 return GeneralError::E_INVALID_ARGS;
543 }
544
545 observer_.watcher_ = &watcher;
546 return GeneralError::E_OK;
547 }
548
Unwatch(int32_t origin,Watcher & watcher)549 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
550 {
551 if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
552 return GeneralError::E_INVALID_ARGS;
553 }
554
555 observer_.watcher_ = nullptr;
556 return GeneralError::E_OK;
557 }
558
Release()559 int32_t KVDBGeneralStore::Release()
560 {
561 auto ref = 1;
562 {
563 std::lock_guard<decltype(mutex_)> lock(mutex_);
564 if (ref_ == 0) {
565 return 0;
566 }
567 ref = --ref_;
568 }
569 ZLOGD("ref:%{public}d", ref);
570 if (ref == 0) {
571 delete this;
572 }
573 return ref;
574 }
575
AddRef()576 int32_t KVDBGeneralStore::AddRef()
577 {
578 std::lock_guard<decltype(mutex_)> lock(mutex_);
579 if (ref_ == 0) {
580 return 0;
581 }
582 return ++ref_;
583 }
584
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)585 int32_t KVDBGeneralStore::SetDistributedTables(
586 const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
587 {
588 return GeneralError::E_OK;
589 }
590
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)591 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
592 const std::set<std::string> &extendColNames, bool isForceUpgrade)
593 {
594 return GeneralError::E_OK;
595 }
596
ConvertStatus(DistributedDB::DBStatus status)597 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
598 {
599 auto it = dbStatusMap_.find(status);
600 if (it != dbStatusMap_.end()) {
601 return it->second;
602 }
603 ZLOGI("status:0x%{public}x", status);
604 return GenErr::E_ERROR;
605 }
606
IsValid()607 bool KVDBGeneralStore::IsValid()
608 {
609 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
610 return delegate_ != nullptr;
611 }
612
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)613 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
614 {
615 return GenErr::E_OK;
616 }
617
UnregisterDetailProgressObserver()618 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
619 {
620 return GenErr::E_OK;
621 }
622
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)623 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
624 {
625 if (!HasWatcher()) {
626 return;
627 }
628 GenOrigin genOrigin;
629 genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
630 genOrigin.id.push_back(originalId);
631 genOrigin.store = storeId_;
632 Watcher::ChangeInfo changeInfo;
633 for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
634 auto &info = changeInfo[storeId_][i];
635 for (auto &priData : data.primaryData[i]) {
636 Watcher::PRIValue value;
637 if (priData.empty()) {
638 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
639 Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
640 Anonymous::Change(originalId).c_str(), i);
641 continue;
642 }
643 Convert(std::move(*(priData.begin())), value);
644 info.push_back(std::move(value));
645 }
646 }
647 watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
648 }
649
OnChange(const DistributedDB::KvStoreChangedData & data)650 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
651 {
652 if (!HasWatcher()) {
653 return;
654 }
655 const auto &inserts = data.GetEntriesInserted();
656 const auto &deletes = data.GetEntriesDeleted();
657 const auto &updates = data.GetEntriesUpdated();
658 Watcher::ChangeData changeData;
659 ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
660 ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
661 ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
662 GenOrigin genOrigin;
663 genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
664 genOrigin.store = storeId_;
665
666 watcher_->OnChange(genOrigin, {}, std::move(changeData));
667 }
668
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)669 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
670 {
671 for (auto &entry : entries) {
672 auto value = std::vector<Value>{ entry.key, entry.value };
673 values.push_back(value);
674 }
675 }
676
GetDBProcessCB(DetailAsync async)677 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
678 {
679 return [async](const std::map<std::string, SyncProcess> &processes) {
680 if (!async) {
681 return;
682 }
683 DistributedData::GenDetails details;
684 for (auto &[id, process] : processes) {
685 auto &detail = details[id];
686 detail.progress = process.process;
687 detail.code = ConvertStatus(process.errCode);
688 detail.dbCode = process.errCode;
689 for (auto [key, value] : process.tableProcess) {
690 auto &table = detail.details[key];
691 table.upload.total = value.upLoadInfo.total;
692 table.upload.success = value.upLoadInfo.successCount;
693 table.upload.failed = value.upLoadInfo.failCount;
694 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
695 table.download.total = value.downLoadInfo.total;
696 table.download.success = value.downLoadInfo.successCount;
697 table.download.failed = value.downLoadInfo.failCount;
698 table.download.untreated = table.download.total - table.download.success - table.download.failed;
699 detail.changeCount = (process.process == FINISHED)
700 ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
701 value.downLoadInfo.deleteCount
702 : 0;
703 }
704 }
705 if (async) {
706 async(details);
707 }
708 };
709 }
710
SetDBPushDataInterceptor(int32_t storeType)711 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
712 {
713 delegate_->SetPushDataInterceptor(
714 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
715 int errCode = DBStatus::OK;
716 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
717 return errCode;
718 }
719 if (targetID.empty()) {
720 ZLOGE("targetID empty");
721 return static_cast<int>(DBStatus::DB_ERROR);
722 }
723 auto entries = data.GetEntries();
724 for (size_t i = 0; i < entries.size(); i++) {
725 if (entries[i].key.empty()) {
726 continue;
727 }
728 auto oriKey = entries[i].key;
729 auto newKey = GetNewKey(oriKey, sourceID);
730 errCode = data.ModifyKey(i, newKey);
731 if (errCode != DBStatus::OK) {
732 ZLOGE("ModifyKey err: %{public}d", errCode);
733 break;
734 }
735 }
736 return errCode;
737 });
738 }
739
SetDBReceiveDataInterceptor(int32_t storeType)740 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
741 {
742 delegate_->SetReceiveDataInterceptor(
743 [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
744 int errCode = DBStatus::OK;
745 if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
746 return errCode;
747 }
748 if (sourceID.empty()) {
749 ZLOGE("sourceID empty");
750 return static_cast<int>(DBStatus::DB_ERROR);
751 }
752 auto entries = data.GetEntries();
753 for (size_t i = 0; i < entries.size(); i++) {
754 if (entries[i].key.empty()) {
755 continue;
756 }
757
758 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
759 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
760 if (encyptedUuid.empty()) {
761 ZLOGE("get encyptedUuid failed");
762 continue;
763 }
764
765 auto oriKey = entries[i].key;
766 auto newKey = GetNewKey(oriKey, encyptedUuid);
767 errCode = data.ModifyKey(i, newKey);
768 if (errCode != DBStatus::OK) {
769 ZLOGE("ModifyKey err: %{public}d", errCode);
770 break;
771 }
772 }
773 return errCode;
774 });
775 }
776
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)777 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
778 {
779 uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
780 remoteLen = le32toh(remoteLen);
781 uint32_t uuidLen = uuid.size();
782
783 std::vector<uint8_t> out;
784 std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
785 out.insert(out.end(), uuid.begin(), uuid.end());
786 out.insert(out.end(), oriKey.begin(), oriKey.end());
787 uuidLen = htole32(uuidLen);
788 uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
789 out.insert(out.end(), buf, buf + sizeof(uuidLen));
790 return out;
791 }
792
SetConfig(const GeneralStore::StoreConfig & storeConfig)793 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
794 {
795 enableCloud_ = storeConfig.enableCloud_;
796 }
797
LockCloudDB()798 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
799 {
800 return { GeneralError::E_NOT_SUPPORT, 0 };
801 }
802
UnLockCloudDB()803 int32_t KVDBGeneralStore::UnLockCloudDB()
804 {
805 return GeneralError::E_NOT_SUPPORT;
806 }
807
SetExecutor(std::shared_ptr<Executor> executor)808 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
809 {
810 return;
811 }
812 } // namespace OHOS::DistributedKv