• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17 
18 #include <endian.h>
19 
20 #include "app_id_mapping/app_id_mapping_config_manager.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_sync_finished_event.h"
24 #include "cloud/schema_meta.h"
25 #include "crypto/crypto_manager.h"
26 #include "device_manager_adapter.h"
27 #include "device_matrix.h"
28 #include "dfx/dfx_types.h"
29 #include "dfx/reporter.h"
30 #include "directory/directory_manager.h"
31 #include "eventcenter/event_center.h"
32 #include "kvdb_query.h"
33 #include "log_print.h"
34 #include "metadata/meta_data_manager.h"
35 #include "metadata/secret_key_meta_data.h"
36 #include "metadata/store_meta_data_local.h"
37 #include "query_helper.h"
38 #include "rdb_cloud.h"
39 #include "snapshot/bind_event.h"
40 #include "types.h"
41 #include "user_delegate.h"
42 #include "utils/anonymous.h"
43 
44 namespace OHOS::DistributedKv {
45 using namespace DistributedData;
46 using namespace DistributedDB;
47 using namespace DistributedDataDfx;
48 using DBField = DistributedDB::Field;
49 using DBTable = DistributedDB::TableSchema;
50 using DBSchema = DistributedDB::DataBaseSchema;
51 using ClearMode = DistributedDB::ClearMode;
52 using DMAdapter = DistributedData::DeviceManagerAdapter;
53 using DBInterceptedData = DistributedDB::InterceptedData;
54 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
55 constexpr int UUID_WIDTH = 4;
56 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
57     { DBStatus::OK, GenErr::E_OK },
58     { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
59     { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
60     { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
61     { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
62     { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
63     { DBStatus::BUSY, GenErr::E_DB_ERROR },
64     { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
65     { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
66     { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
67     { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
68     { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
69     { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
70     { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
71     { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
72     { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
73     { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
74     { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
75     { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
76     { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
77     { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
78 };
79 
// Maximum time, in seconds, that a forced Close() waits to acquire rwMutex_.
constexpr uint32_t LOCK_TIMEOUT = 3600;
GetDBSchema(const Database & database)81 static DBSchema GetDBSchema(const Database &database)
82 {
83     DBSchema schema;
84     schema.tables.resize(database.tables.size());
85     for (size_t i = 0; i < database.tables.size(); i++) {
86         const Table &table = database.tables[i];
87         DBTable &dbTable = schema.tables[i];
88         dbTable.name = table.name;
89         dbTable.sharedTableName = table.sharedTableName;
90         for (auto &field : table.fields) {
91             DBField dbField;
92             dbField.colName = field.colName;
93             dbField.type = field.type;
94             dbField.primary = field.primary;
95             dbField.nullable = field.nullable;
96             dbTable.fields.push_back(std::move(dbField));
97         }
98     }
99     return schema;
100 }
101 
GetDBPassword(const StoreMetaData & data)102 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
103 {
104     DBPassword dbPassword;
105     if (!data.isEncrypt) {
106         return dbPassword;
107     }
108     SecretKeyMetaData secretKey;
109     auto metaKey = data.GetSecretKey();
110     if (!MetaDataManager::GetInstance().LoadMeta(metaKey, secretKey, true) || secretKey.sKey.empty()) {
111         return dbPassword;
112     }
113     CryptoManager::CryptoParams decryptParams = { .area = secretKey.area, .userId = data.user,
114         .nonce = secretKey.nonce };
115     auto password = CryptoManager::GetInstance().Decrypt(secretKey.sKey, decryptParams);
116     if (password.empty()) {
117         return dbPassword;
118     }
119     // update secret key of area or nonce
120     CryptoManager::GetInstance().UpdateSecretMeta(password, data, metaKey, secretKey);
121     dbPassword.SetValue(password.data(), password.size());
122     password.assign(password.size(), 0);
123     return dbPassword;
124 }
125 
GetDBSecurity(int32_t secLevel)126 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
127 {
128     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
129         return { DistributedDB::NOT_SET, DistributedDB::ECE };
130     }
131     if (secLevel == SecurityLevel::S3) {
132         return { DistributedDB::S3, DistributedDB::SECE };
133     }
134     if (secLevel == SecurityLevel::S4) {
135         return { DistributedDB::S4, DistributedDB::ECE };
136     }
137     return { secLevel, DistributedDB::ECE };
138 }
139 
GetDBOption(const StoreMetaData & data,const DBPassword & password)140 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
141 {
142     DBOption dbOption;
143     dbOption.createIfNecessary = false;
144     dbOption.isMemoryDb = false;
145     dbOption.isEncryptedDb = data.isEncrypt;
146     dbOption.isNeedCompressOnSync = data.isNeedCompress;
147     if (data.isEncrypt) {
148         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
149         dbOption.passwd = password;
150     }
151     StoreMetaDataLocal local;
152     MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
153     if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
154         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
155     } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
156         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
157     }
158     if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
159         dbOption.compressionRate = META_COMPRESS_RATE;
160         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
161     } else {
162         dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
163     }
164     dbOption.schema = data.schema;
165     dbOption.createDirByStoreIdOnly = true;
166     dbOption.secOption = GetDBSecurity(data.securityLevel);
167     return dbOption;
168 }
169 
KVDBGeneralStore(const StoreMetaData & meta)170 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
171     : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
172           meta.instanceId)
173 {
174     observer_.storeId_ = meta.storeId;
175     StoreMetaDataLocal local;
176     MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
177     isPublic_ = local.isPublic;
178     DBStatus status = DBStatus::NOT_FOUND;
179     manager_.SetKvStoreConfig({ meta.dataDir });
180     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
181     manager_.GetKvStore(
182         meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
183             status = dbStatus;
184             delegate_ = tmpStore;
185         });
186     if (delegate_ == nullptr || status != DBStatus::OK) {
187         ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
188         manager_.CloseKvStore(delegate_);
189         return;
190     }
191     SetDBPushDataInterceptor(meta.storeType);
192     SetDBReceiveDataInterceptor(meta.storeType);
193     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
194     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
195     if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
196         delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
197             DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
198         });
199     }
200     if (meta.isAutoSync) {
201         bool param = true;
202         auto data = static_cast<DistributedDB::PragmaData>(&param);
203         delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
204     }
205     storeInfo_.tokenId = meta.tokenId;
206     storeInfo_.bundleName = meta.bundleName;
207     storeInfo_.storeName = meta.storeId;
208     storeInfo_.instanceId = meta.instanceId;
209     storeInfo_.user = std::atoi(meta.user.c_str());
210     enableCloud_ = meta.enableCloud;
211 }
212 
~KVDBGeneralStore()213 KVDBGeneralStore::~KVDBGeneralStore()
214 {
215     {
216         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
217         if (delegate_ != nullptr) {
218             delegate_->UnRegisterObserver(&observer_);
219         }
220         manager_.CloseKvStore(delegate_);
221         delegate_ = nullptr;
222     }
223     {
224         std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
225         for (auto &[userId, bindInfo] : bindInfos_) {
226             if (bindInfo.db_ != nullptr) {
227                 bindInfo.db_->Close();
228             }
229         }
230         bindInfos_.clear();
231     }
232 }
233 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)234 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
235 {
236     return GenErr::E_NOT_SUPPORT;
237 }
238 
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)239 int32_t KVDBGeneralStore::Bind(
240     const Database &database, const std::map<uint32_t, BindInfo> &bindInfos, const CloudConfig &config)
241 {
242     if (bindInfos.empty()) {
243         ZLOGW("No cloudDB!");
244         return GeneralError::E_OK;
245     }
246     std::map<std::string, DataBaseSchema> schemas;
247     std::map<std::string, std::shared_ptr<DistributedDB::ICloudDb>> dbClouds{};
248     auto dbSchema = GetDBSchema(database);
249     {
250         std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
251         for (auto &[userId, bindInfo] : bindInfos) {
252             if (bindInfo.db_ == nullptr) {
253                 return GeneralError::E_INVALID_ARGS;
254             }
255             dbClouds.insert({ std::to_string(userId),
256                 std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
257             bindInfos_.insert(std::make_pair(userId, std::move(bindInfo)));
258             schemas.insert({ std::to_string(userId), dbSchema });
259         }
260     }
261     DistributedDB::CloudSyncConfig dbConfig;
262     dbConfig.maxUploadCount = config.maxNumber;
263     dbConfig.maxUploadSize = config.maxSize;
264     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
265     dbConfig.isSupportEncrypt = config.isSupportEncrypt;
266     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
267     if (delegate_ == nullptr) {
268         return GeneralError::E_ALREADY_CLOSED;
269     }
270     delegate_->SetCloudDB(std::move(dbClouds));
271     delegate_->SetCloudDbSchema(std::move(schemas));
272     delegate_->SetCloudSyncConfig(dbConfig);
273     return GeneralError::E_OK;
274 }
275 
IsBound(uint32_t user)276 bool KVDBGeneralStore::IsBound(uint32_t user)
277 {
278     std::shared_lock<std::shared_mutex> lock(bindMutex_);
279     return bindInfos_.find(user) != bindInfos_.end();
280 }
281 
Close(bool isForce)282 int32_t KVDBGeneralStore::Close(bool isForce)
283 {
284     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
285     if (!lock) {
286         return GeneralError::E_BUSY;
287     }
288 
289     if (delegate_ == nullptr) {
290         return GeneralError::E_OK;
291     }
292     if (!isForce && delegate_->GetTaskCount() > 0) {
293         return GeneralError::E_BUSY;
294     }
295     if (delegate_ != nullptr) {
296         delegate_->UnRegisterObserver(&observer_);
297     }
298     auto status = manager_.CloseKvStore(delegate_);
299     if (status != DBStatus::OK) {
300         return status;
301     }
302     delegate_ = nullptr;
303     return GeneralError::E_OK;
304 }
305 
Execute(const std::string & table,const std::string & sql)306 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
307 {
308     return GeneralError::E_NOT_SUPPORT;
309 }
310 
Insert(const std::string & table,VBuckets && values)311 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
312 {
313     return GeneralError::E_NOT_SUPPORT;
314 }
315 
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)316 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
317     const std::string &whereSql, Values &&conditions)
318 {
319     return GeneralError::E_NOT_SUPPORT;
320 }
321 
Delete(const std::string & table,const std::string & sql,Values && args)322 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
323 {
324     return GeneralError::E_NOT_SUPPORT;
325 }
326 
Replace(const std::string & table,VBucket && value)327 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
328 {
329     return GeneralError::E_NOT_SUPPORT;
330 }
331 
Query(const std::string & table,const std::string & sql,Values && args)332 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
333     __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
334 {
335     return { GeneralError::E_NOT_SUPPORT, nullptr };
336 }
337 
Query(const std::string & table,GenQuery & query)338 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
339 {
340     return { GeneralError::E_NOT_SUPPORT, nullptr };
341 }
342 
MergeMigratedData(const std::string & tableName,VBuckets && values)343 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
344 {
345     return GeneralError::E_NOT_SUPPORT;
346 }
347 
CleanTrackerData(const std::string & tableName,int64_t cursor)348 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
349 {
350     return GeneralError::E_NOT_SUPPORT;
351 }
352 
GetDBSyncCompleteCB(DetailAsync async)353 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
354 {
355     if (!async) {
356         return [](auto &) {};
357     }
358     return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
359         GenDetails details;
360         for (auto &[key, dbStatus] : status) {
361             auto &value = details[key];
362             value.progress = FINISHED;
363             value.code = GeneralError::E_OK;
364             if (dbStatus != DBStatus::OK) {
365                 value.code = dbStatus;
366             }
367         }
368         async(details);
369     };
370 }
371 
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)372 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
373     int64_t wait, const std::string &prepareTraceId)
374 {
375     DistributedDB::CloudSyncOption syncOption;
376     syncOption.devices = devices;
377     syncOption.mode = cloudSyncMode;
378     syncOption.waitTime = wait;
379     syncOption.prepareTraceId = prepareTraceId;
380     syncOption.lockAction = DistributedDB::LockAction::NONE;
381     if (storeInfo_.user == 0) {
382         std::vector<int32_t> users;
383         auto ret = AccountDelegate::GetInstance()->QueryUsers(users);
384         if (!ret || users.empty()) {
385             ZLOGE("failed to query os accounts, ret:%{public}d", ret);
386             return DBStatus::DB_ERROR;
387         }
388         syncOption.users.push_back(std::to_string(users[0]));
389     } else {
390         syncOption.users.push_back(std::to_string(storeInfo_.user));
391     }
392     return delegate_->Sync(syncOption, GetDBProcessCB(async));
393 }
394 
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)395 void KVDBGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
396 {
397     ArkDataFaultMsg msg = { .faultType = faultType,
398         .bundleName = storeInfo_.bundleName,
399         .moduleName = ModuleName::KV_STORE,
400         .storeName = storeInfo_.storeName,
401         .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
402         .appendixMsg = appendix };
403     Reporter::GetInstance()->CloudSyncFault()->Report(msg);
404 }
405 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)406 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
407     const SyncParam &syncParam)
408 {
409     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
410     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
411     if (delegate_ == nullptr) {
412         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
413             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
414         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
415     }
416     DBStatus dbStatus;
417     auto dbMode = DistributedDB::SyncMode(syncMode);
418     if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
419         if (!enableCloud_) {
420             return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
421         }
422         dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
423         if (dbStatus != DBStatus::OK) {
424             Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
425                 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
426         }
427     } else {
428         if (devices.empty()) {
429             ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
430             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
431         }
432         KVDBQuery *kvQuery = nullptr;
433         auto ret = query.QueryInterface(kvQuery);
434         DistributedDB::Query dbQuery;
435         if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
436             dbQuery = kvQuery->GetDBQuery();
437         } else {
438             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
439         }
440         if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
441             dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
442         } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
443             dbStatus =
444                 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
445         } else if (syncMode < NEARBY_END) {
446             DeviceSyncOption syncOption = { .devices = devices, .mode = dbMode, .query = dbQuery, .isWait = false,
447                 .isRetry = syncParam.isRetry };
448             dbStatus = delegate_->Sync(syncOption, GetDBSyncCompleteCB(std::move(async)));
449         } else {
450             ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
451             dbStatus = DistributedDB::INVALID_ARGS;
452         }
453     }
454     return { ConvertStatus(dbStatus), dbStatus };
455 }
456 
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)457 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
458 {
459     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
460     if (delegate_ == nullptr) {
461         ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
462             Anonymous::Change(storeId).c_str());
463         return;
464     }
465     std::vector<std::string> sameAccountDevs{};
466     std::vector<std::string> defaultAccountDevs{};
467     auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
468     if (uuids.empty()) {
469         ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
470         return;
471     }
472     GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
473     GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
474     if (!sameAccountDevs.empty()) {
475         auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
476         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
477         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
478         ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}s, appId:%{public}s",
479             Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
480             Anonymous::Change(DistributedData::Serializable::Marshall(sameAccountDevs)).c_str(),
481             convertedIds.first.c_str());
482         delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
483     }
484     if (!defaultAccountDevs.empty()) {
485         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
486         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
487         ZLOGI("no account store:%{public}s, device:%{public}s, appId:%{public}s", Anonymous::Change(storeId).c_str(),
488             Anonymous::Change(DistributedData::Serializable::Marshall(defaultAccountDevs)).c_str(),
489             convertedIds.first.c_str());
490         delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
491     }
492 }
493 
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)494 void KVDBGeneralStore::GetIdentifierParams(
495     std::vector<std::string> &devices, const std::vector<std::string> &uuids, int32_t authType)
496 {
497     for (const auto &devId : uuids) {
498         if (DMAdapter::GetInstance().IsOHOSType(devId)) {
499             continue;
500         }
501         if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
502             continue;
503         }
504         devices.push_back(devId);
505     }
506     ZLOGI("devices size: %{public}zu", devices.size());
507 }
508 
PreSharing(GenQuery & query)509 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
510 {
511     return { GeneralError::E_NOT_SUPPORT, nullptr };
512 }
513 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)514 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
515 {
516     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
517         return GeneralError::E_INVALID_ARGS;
518     }
519     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
520     if (delegate_ == nullptr) {
521         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
522             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
523         return GeneralError::E_ALREADY_CLOSED;
524     }
525     DBStatus status = OK;
526     ClearKvMetaDataOption option;
527     switch (mode) {
528         case CLOUD_INFO:
529             status = delegate_->RemoveDeviceData(
530                 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
531             break;
532         case CLOUD_DATA:
533             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
534             break;
535         case CLEAN_WATER:
536             option.type = ClearKvMetaOpType::CLEAN_CLOUD_WATERMARK;
537             status = delegate_->ClearMetaData(option);
538             break;
539         case NEARBY_DATA:
540             if (devices.empty()) {
541                 status = delegate_->RemoveDeviceData();
542                 break;
543             }
544             for (auto device : devices) {
545                 status = delegate_->RemoveDeviceData(device);
546             }
547             break;
548         default:
549             return GeneralError::E_ERROR;
550     }
551     return ConvertStatus(status);
552 }
553 
Watch(int32_t origin,Watcher & watcher)554 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
555 {
556     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
557         return GeneralError::E_INVALID_ARGS;
558     }
559 
560     observer_.watcher_ = &watcher;
561     return GeneralError::E_OK;
562 }
563 
Unwatch(int32_t origin,Watcher & watcher)564 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
565 {
566     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
567         return GeneralError::E_INVALID_ARGS;
568     }
569 
570     observer_.watcher_ = nullptr;
571     return GeneralError::E_OK;
572 }
573 
Release()574 int32_t KVDBGeneralStore::Release()
575 {
576     auto ref = 1;
577     {
578         std::lock_guard<decltype(mutex_)> lock(mutex_);
579         if (ref_ == 0) {
580             return 0;
581         }
582         ref = --ref_;
583     }
584     ZLOGD("ref:%{public}d", ref);
585     if (ref == 0) {
586         delete this;
587     }
588     return ref;
589 }
590 
AddRef()591 int32_t KVDBGeneralStore::AddRef()
592 {
593     std::lock_guard<decltype(mutex_)> lock(mutex_);
594     if (ref_ == 0) {
595         return 0;
596     }
597     return ++ref_;
598 }
599 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)600 int32_t KVDBGeneralStore::SetDistributedTables(
601     const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
602 {
603     return GeneralError::E_OK;
604 }
605 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)606 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
607     const std::set<std::string> &extendColNames, bool isForceUpgrade)
608 {
609     return GeneralError::E_OK;
610 }
611 
ConvertStatus(DistributedDB::DBStatus status)612 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
613 {
614     auto it = dbStatusMap_.find(status);
615     if (it != dbStatusMap_.end()) {
616         return it->second;
617     }
618     ZLOGI("status:0x%{public}x", status);
619     return GenErr::E_ERROR;
620 }
621 
IsValid()622 bool KVDBGeneralStore::IsValid()
623 {
624     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
625     return delegate_ != nullptr;
626 }
627 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)628 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
629 {
630     return GenErr::E_OK;
631 }
632 
UnregisterDetailProgressObserver()633 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
634 {
635     return GenErr::E_OK;
636 }
637 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)638 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
639 {
640     if (!HasWatcher()) {
641         return;
642     }
643     GenOrigin genOrigin;
644     genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
645     genOrigin.id.push_back(originalId);
646     genOrigin.store = storeId_;
647     Watcher::ChangeInfo changeInfo;
648     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
649         auto &info = changeInfo[storeId_][i];
650         for (auto &priData : data.primaryData[i]) {
651             Watcher::PRIValue value;
652             if (priData.empty()) {
653                 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
654                     Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
655                     Anonymous::Change(originalId).c_str(), i);
656                 continue;
657             }
658             Convert(std::move(*(priData.begin())), value);
659             info.push_back(std::move(value));
660         }
661     }
662     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
663 }
664 
OnChange(const DistributedDB::KvStoreChangedData & data)665 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
666 {
667     if (!HasWatcher()) {
668         return;
669     }
670     const auto &inserts = data.GetEntriesInserted();
671     const auto &deletes = data.GetEntriesDeleted();
672     const auto &updates = data.GetEntriesUpdated();
673     Watcher::ChangeData changeData;
674     ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
675     ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
676     ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
677     GenOrigin genOrigin;
678     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
679     genOrigin.store = storeId_;
680 
681     watcher_->OnChange(genOrigin, {}, std::move(changeData));
682 }
683 
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)684 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
685 {
686     for (auto &entry : entries) {
687         auto value = std::vector<Value>{ entry.key, entry.value };
688         values.push_back(value);
689     }
690 }
691 
GetDBProcessCB(DetailAsync async)692 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
693 {
694     return [async](const std::map<std::string, SyncProcess> &processes) {
695         if (!async) {
696             return;
697         }
698         DistributedData::GenDetails details;
699         for (auto &[id, process] : processes) {
700             auto &detail = details[id];
701             detail.progress = process.process;
702             detail.code = ConvertStatus(process.errCode);
703             detail.dbCode = process.errCode;
704             for (auto [key, value] : process.tableProcess) {
705                 auto &table = detail.details[key];
706                 table.upload.total = value.upLoadInfo.total;
707                 table.upload.success = value.upLoadInfo.successCount;
708                 table.upload.failed = value.upLoadInfo.failCount;
709                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
710                 table.download.total = value.downLoadInfo.total;
711                 table.download.success = value.downLoadInfo.successCount;
712                 table.download.failed = value.downLoadInfo.failCount;
713                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
714                 detail.changeCount = (process.process == FINISHED)
715                                          ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
716                                                value.downLoadInfo.deleteCount
717                                          : 0;
718             }
719         }
720         if (async) {
721             async(details);
722         }
723     };
724 }
725 
SetDBPushDataInterceptor(int32_t storeType)726 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
727 {
728     delegate_->SetPushDataInterceptor(
729         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
730             int errCode = DBStatus::OK;
731             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
732                 return errCode;
733             }
734             if (targetID.empty()) {
735                 ZLOGE("targetID empty");
736                 return static_cast<int>(DBStatus::DB_ERROR);
737             }
738             auto entries = data.GetEntries();
739             for (size_t i = 0; i < entries.size(); i++) {
740                 if (entries[i].key.empty()) {
741                     continue;
742                 }
743                 auto oriKey = entries[i].key;
744                 auto newKey = GetNewKey(oriKey, sourceID);
745                 errCode = data.ModifyKey(i, newKey);
746                 if (errCode != DBStatus::OK) {
747                     ZLOGE("ModifyKey err: %{public}d", errCode);
748                     break;
749                 }
750             }
751             return errCode;
752         });
753 }
754 
SetDBReceiveDataInterceptor(int32_t storeType)755 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
756 {
757     delegate_->SetReceiveDataInterceptor(
758         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
759             int errCode = DBStatus::OK;
760             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
761                 return errCode;
762             }
763             if (sourceID.empty()) {
764                 ZLOGE("sourceID empty");
765                 return static_cast<int>(DBStatus::DB_ERROR);
766             }
767             auto entries = data.GetEntries();
768             for (size_t i = 0; i < entries.size(); i++) {
769                 if (entries[i].key.empty()) {
770                     continue;
771                 }
772 
773                 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
774                 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
775                 if (encyptedUuid.empty()) {
776                     ZLOGE("get encyptedUuid failed");
777                     continue;
778                 }
779 
780                 auto oriKey = entries[i].key;
781                 auto newKey = GetNewKey(oriKey, encyptedUuid);
782                 errCode = data.ModifyKey(i, newKey);
783                 if (errCode != DBStatus::OK) {
784                     ZLOGE("ModifyKey err: %{public}d", errCode);
785                     break;
786                 }
787             }
788             return errCode;
789         });
790 }
791 
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)792 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
793 {
794     uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
795     remoteLen = le32toh(remoteLen);
796     uint32_t uuidLen = uuid.size();
797 
798     std::vector<uint8_t> out;
799     std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
800     out.insert(out.end(), uuid.begin(), uuid.end());
801     out.insert(out.end(), oriKey.begin(), oriKey.end());
802     uuidLen = htole32(uuidLen);
803     uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
804     out.insert(out.end(), buf, buf + sizeof(uuidLen));
805     return out;
806 }
807 
SetConfig(const GeneralStore::StoreConfig & storeConfig)808 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
809 {
810     enableCloud_ = storeConfig.enableCloud_;
811 }
812 
LockCloudDB()813 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
814 {
815     return { GeneralError::E_NOT_SUPPORT, 0 };
816 }
817 
UnLockCloudDB()818 int32_t KVDBGeneralStore::UnLockCloudDB()
819 {
820     return GeneralError::E_NOT_SUPPORT;
821 }
822 
SetExecutor(std::shared_ptr<Executor> executor)823 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
824 {
825     return;
826 }
827 } // namespace OHOS::DistributedKv