• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17 
18 #include <endian.h>
19 
20 #include "app_id_mapping/app_id_mapping_config_manager.h"
21 #include "bootstrap.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_sync_finished_event.h"
24 #include "cloud/schema_meta.h"
25 #include "crypto_manager.h"
26 #include "device_manager_adapter.h"
27 #include "device_matrix.h"
28 #include "dfx_types.h"
29 #include "directory/directory_manager.h"
30 #include "eventcenter/event_center.h"
31 #include "kvdb_query.h"
32 #include "log_print.h"
33 #include "metadata/meta_data_manager.h"
34 #include "metadata/secret_key_meta_data.h"
35 #include "metadata/store_meta_data_local.h"
36 #include "query_helper.h"
37 #include "rdb_cloud.h"
38 #include "reporter.h"
39 #include "snapshot/bind_event.h"
40 #include "types.h"
41 #include "user_delegate.h"
42 #include "utils/anonymous.h"
43 
44 namespace OHOS::DistributedKv {
45 using namespace DistributedData;
46 using namespace DistributedDB;
47 using namespace DistributedDataDfx;
48 using DBField = DistributedDB::Field;
49 using DBTable = DistributedDB::TableSchema;
50 using DBSchema = DistributedDB::DataBaseSchema;
51 using ClearMode = DistributedDB::ClearMode;
52 using DMAdapter = DistributedData::DeviceManagerAdapter;
53 using DBInterceptedData = DistributedDB::InterceptedData;
54 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
55 constexpr int UUID_WIDTH = 4;
56 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
57     { DBStatus::OK, GenErr::E_OK },
58     { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
59     { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
60     { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
61     { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
62     { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
63     { DBStatus::BUSY, GenErr::E_DB_ERROR },
64     { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
65     { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
66     { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
67     { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
68     { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
69     { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
70     { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
71     { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
72     { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
73     { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
74     { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
75     { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
76     { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
77     { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
78 };
79 
80 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)81 static DBSchema GetDBSchema(const Database &database)
82 {
83     DBSchema schema;
84     schema.tables.resize(database.tables.size());
85     for (size_t i = 0; i < database.tables.size(); i++) {
86         const Table &table = database.tables[i];
87         DBTable &dbTable = schema.tables[i];
88         dbTable.name = table.name;
89         dbTable.sharedTableName = table.sharedTableName;
90         for (auto &field : table.fields) {
91             DBField dbField;
92             dbField.colName = field.colName;
93             dbField.type = field.type;
94             dbField.primary = field.primary;
95             dbField.nullable = field.nullable;
96             dbTable.fields.push_back(std::move(dbField));
97         }
98     }
99     return schema;
100 }
GetDBPassword(const StoreMetaData & data)101 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
102 {
103     DBPassword dbPassword;
104     if (!data.isEncrypt) {
105         return dbPassword;
106     }
107 
108     SecretKeyMetaData secretKey;
109     secretKey.storeType = data.storeType;
110     auto storeKey = data.GetSecretKey();
111     MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
112     std::vector<uint8_t> password;
113     StoreMetaData metaData;
114     MetaDataManager::GetInstance().LoadMeta(data.GetKey(), metaData, true);
115     CryptoManager::GetInstance().Decrypt(metaData, secretKey, password);
116     dbPassword.SetValue(password.data(), password.size());
117     password.assign(password.size(), 0);
118     return dbPassword;
119 }
120 
GetDBSecurity(int32_t secLevel)121 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
122 {
123     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
124         return { DistributedDB::NOT_SET, DistributedDB::ECE };
125     }
126     if (secLevel == SecurityLevel::S3) {
127         return { DistributedDB::S3, DistributedDB::SECE };
128     }
129     if (secLevel == SecurityLevel::S4) {
130         return { DistributedDB::S4, DistributedDB::ECE };
131     }
132     return { secLevel, DistributedDB::ECE };
133 }
134 
GetDBOption(const StoreMetaData & data,const DBPassword & password)135 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
136 {
137     DBOption dbOption;
138     dbOption.createIfNecessary = false;
139     dbOption.isMemoryDb = false;
140     dbOption.isEncryptedDb = data.isEncrypt;
141     dbOption.isNeedCompressOnSync = data.isNeedCompress;
142     if (data.isEncrypt) {
143         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
144         dbOption.passwd = password;
145     }
146     StoreMetaDataLocal local;
147     MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
148     if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
149         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
150     } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
151         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
152     }
153     if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
154         dbOption.compressionRate = META_COMPRESS_RATE;
155         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
156     } else {
157         dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
158     }
159     dbOption.schema = data.schema;
160     dbOption.createDirByStoreIdOnly = true;
161     dbOption.secOption = GetDBSecurity(data.securityLevel);
162     return dbOption;
163 }
164 
KVDBGeneralStore(const StoreMetaData & meta)165 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
166     : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
167           meta.instanceId)
168 {
169     observer_.storeId_ = meta.storeId;
170     StoreMetaDataLocal local;
171     MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
172     isPublic_ = local.isPublic;
173     DBStatus status = DBStatus::NOT_FOUND;
174     manager_.SetKvStoreConfig({ meta.dataDir });
175     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
176     manager_.GetKvStore(
177         meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
178             status = dbStatus;
179             delegate_ = tmpStore;
180         });
181     if (delegate_ == nullptr || status != DBStatus::OK) {
182         ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
183         manager_.CloseKvStore(delegate_);
184         return;
185     }
186     SetDBPushDataInterceptor(meta.storeType);
187     SetDBReceiveDataInterceptor(meta.storeType);
188     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
189     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
190     if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
191         delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
192             DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
193         });
194     }
195     if (meta.isAutoSync) {
196         bool param = true;
197         auto data = static_cast<DistributedDB::PragmaData>(&param);
198         delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
199     }
200     storeInfo_.tokenId = meta.tokenId;
201     storeInfo_.bundleName = meta.bundleName;
202     storeInfo_.storeName = meta.storeId;
203     storeInfo_.instanceId = meta.instanceId;
204     storeInfo_.user = std::atoi(meta.user.c_str());
205     enableCloud_ = meta.enableCloud;
206 }
207 
~KVDBGeneralStore()208 KVDBGeneralStore::~KVDBGeneralStore()
209 {
210     {
211         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
212         if (delegate_ != nullptr) {
213             delegate_->UnRegisterObserver(&observer_);
214         }
215         manager_.CloseKvStore(delegate_);
216         delegate_ = nullptr;
217     }
218     {
219         std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
220         for (auto &[userId, bindInfo] : bindInfos_) {
221             if (bindInfo.db_ != nullptr) {
222                 bindInfo.db_->Close();
223             }
224         }
225         bindInfos_.clear();
226     }
227 }
228 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)229 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
230 {
231     return GenErr::E_NOT_SUPPORT;
232 }
233 
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)234 int32_t KVDBGeneralStore::Bind(
235     const Database &database, const std::map<uint32_t, BindInfo> &bindInfos, const CloudConfig &config)
236 {
237     if (bindInfos.empty()) {
238         ZLOGW("No cloudDB!");
239         return GeneralError::E_OK;
240     }
241     std::map<std::string, DataBaseSchema> schemas;
242     std::map<std::string, std::shared_ptr<DistributedDB::ICloudDb>> dbClouds{};
243     auto dbSchema = GetDBSchema(database);
244     {
245         std::unique_lock<decltype(bindMutex_)> lock(bindMutex_);
246         for (auto &[userId, bindInfo] : bindInfos) {
247             if (bindInfo.db_ == nullptr) {
248                 return GeneralError::E_INVALID_ARGS;
249             }
250             dbClouds.insert({ std::to_string(userId),
251                 std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
252             bindInfos_.insert(std::make_pair(userId, std::move(bindInfo)));
253             schemas.insert({ std::to_string(userId), dbSchema });
254         }
255     }
256     DistributedDB::CloudSyncConfig dbConfig;
257     dbConfig.maxUploadCount = config.maxNumber;
258     dbConfig.maxUploadSize = config.maxSize;
259     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
260     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
261     if (delegate_ == nullptr) {
262         return GeneralError::E_ALREADY_CLOSED;
263     }
264     delegate_->SetCloudDB(std::move(dbClouds));
265     delegate_->SetCloudDbSchema(std::move(schemas));
266     delegate_->SetCloudSyncConfig(dbConfig);
267     return GeneralError::E_OK;
268 }
269 
IsBound(uint32_t user)270 bool KVDBGeneralStore::IsBound(uint32_t user)
271 {
272     std::shared_lock<std::shared_mutex> lock(bindMutex_);
273     return bindInfos_.find(user) != bindInfos_.end();
274 }
275 
Close(bool isForce)276 int32_t KVDBGeneralStore::Close(bool isForce)
277 {
278     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
279     if (!lock) {
280         return GeneralError::E_BUSY;
281     }
282 
283     if (delegate_ == nullptr) {
284         return GeneralError::E_OK;
285     }
286     if (!isForce && delegate_->GetTaskCount() > 0) {
287         return GeneralError::E_BUSY;
288     }
289     if (delegate_ != nullptr) {
290         delegate_->UnRegisterObserver(&observer_);
291     }
292     auto status = manager_.CloseKvStore(delegate_);
293     if (status != DBStatus::OK) {
294         return status;
295     }
296     delegate_ = nullptr;
297     return GeneralError::E_OK;
298 }
299 
Execute(const std::string & table,const std::string & sql)300 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
301 {
302     return GeneralError::E_NOT_SUPPORT;
303 }
304 
Insert(const std::string & table,VBuckets && values)305 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
306 {
307     return GeneralError::E_NOT_SUPPORT;
308 }
309 
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)310 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
311     const std::string &whereSql, Values &&conditions)
312 {
313     return GeneralError::E_NOT_SUPPORT;
314 }
315 
Delete(const std::string & table,const std::string & sql,Values && args)316 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
317 {
318     return GeneralError::E_NOT_SUPPORT;
319 }
320 
Replace(const std::string & table,VBucket && value)321 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
322 {
323     return GeneralError::E_NOT_SUPPORT;
324 }
325 
Query(const std::string & table,const std::string & sql,Values && args)326 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
327     __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
328 {
329     return { GeneralError::E_NOT_SUPPORT, nullptr };
330 }
331 
Query(const std::string & table,GenQuery & query)332 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
333 {
334     return { GeneralError::E_NOT_SUPPORT, nullptr };
335 }
336 
MergeMigratedData(const std::string & tableName,VBuckets && values)337 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
338 {
339     return GeneralError::E_NOT_SUPPORT;
340 }
341 
CleanTrackerData(const std::string & tableName,int64_t cursor)342 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
343 {
344     return GeneralError::E_NOT_SUPPORT;
345 }
346 
GetDBSyncCompleteCB(DetailAsync async)347 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
348 {
349     if (!async) {
350         return [](auto &) {};
351     }
352     return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
353         GenDetails details;
354         for (auto &[key, dbStatus] : status) {
355             auto &value = details[key];
356             value.progress = FINISHED;
357             value.code = GeneralError::E_OK;
358             if (dbStatus != DBStatus::OK) {
359                 value.code = dbStatus;
360             }
361         }
362         async(details);
363     };
364 }
365 
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)366 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
367     int64_t wait, const std::string &prepareTraceId)
368 {
369     DistributedDB::CloudSyncOption syncOption;
370     syncOption.devices = devices;
371     syncOption.mode = cloudSyncMode;
372     syncOption.waitTime = wait;
373     syncOption.prepareTraceId = prepareTraceId;
374     syncOption.lockAction = DistributedDB::LockAction::NONE;
375     if (storeInfo_.user == 0) {
376         std::vector<int32_t> users;
377         AccountDelegate::GetInstance()->QueryUsers(users);
378         syncOption.users.push_back(std::to_string(users[0]));
379     } else {
380         syncOption.users.push_back(std::to_string(storeInfo_.user));
381     }
382     return delegate_->Sync(syncOption, GetDBProcessCB(async));
383 }
384 
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)385 void KVDBGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
386 {
387     ArkDataFaultMsg msg = { .faultType = faultType,
388         .bundleName = storeInfo_.bundleName,
389         .moduleName = ModuleName::KV_STORE,
390         .storeName = storeInfo_.storeName,
391         .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
392         .appendixMsg = appendix };
393     Reporter::GetInstance()->CloudSyncFault()->Report(msg);
394 }
395 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)396 std::pair<int32_t, int32_t> KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
397     const SyncParam &syncParam)
398 {
399     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
400     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
401     if (delegate_ == nullptr) {
402         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
403             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
404         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
405     }
406     DBStatus dbStatus;
407     auto dbMode = DistributedDB::SyncMode(syncMode);
408     if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
409         if (!enableCloud_) {
410             return { GeneralError::E_NOT_SUPPORT, DBStatus::OK };
411         }
412         dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
413         if (dbStatus != DBStatus::OK) {
414             Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
415                 "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
416         }
417     } else {
418         if (devices.empty()) {
419             ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
420             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
421         }
422         KVDBQuery *kvQuery = nullptr;
423         auto ret = query.QueryInterface(kvQuery);
424         DistributedDB::Query dbQuery;
425         if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
426             dbQuery = kvQuery->GetDBQuery();
427         } else {
428             return { GeneralError::E_INVALID_ARGS, DBStatus::OK };
429         }
430         if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
431             dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
432         } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
433             dbStatus =
434                 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
435         } else if (syncMode < NEARBY_END) {
436             if (kvQuery->IsEmpty()) {
437                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
438             } else {
439                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
440             }
441         } else {
442             ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
443             dbStatus = DistributedDB::INVALID_ARGS;
444         }
445     }
446     return { ConvertStatus(dbStatus), dbStatus };
447 }
448 
SetEqualIdentifier(const std::string & appId,const std::string & storeId,std::string account)449 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId, std::string account)
450 {
451     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
452     if (delegate_ == nullptr) {
453         ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
454             Anonymous::Change(storeId).c_str());
455         return;
456     }
457     std::vector<std::string> sameAccountDevs{};
458     std::vector<std::string> defaultAccountDevs{};
459     auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
460     if (uuids.empty()) {
461         ZLOGI("no remote device to sync.appId:%{public}s", appId.c_str());
462         return;
463     }
464     GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
465     GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
466     if (!sameAccountDevs.empty()) {
467         auto accountId = account.empty() ? AccountDelegate::GetInstance()->GetUnencryptedAccountId() : account;
468         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, accountId);
469         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
470         ZLOGI("same account store:%{public}s, user:%{public}s, device:%{public}.10s, appId:%{public}s",
471             Anonymous::Change(storeId).c_str(), Anonymous::Change(convertedIds.second).c_str(),
472             DistributedData::Serializable::Marshall(sameAccountDevs).c_str(), convertedIds.first.c_str());
473         delegate_->SetEqualIdentifier(identifier, sameAccountDevs);
474     }
475     if (!defaultAccountDevs.empty()) {
476         auto convertedIds = AppIdMappingConfigManager::GetInstance().Convert(appId, defaultAccountId);
477         auto identifier = KvManager::GetKvStoreIdentifier(convertedIds.second, convertedIds.first, storeId);
478         ZLOGI("no account store:%{public}s, device:%{public}.10s, appId:%{public}s", Anonymous::Change(storeId).c_str(),
479             DistributedData::Serializable::Marshall(defaultAccountDevs).c_str(), convertedIds.first.c_str());
480         delegate_->SetEqualIdentifier(identifier, defaultAccountDevs);
481     }
482 }
483 
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)484 void KVDBGeneralStore::GetIdentifierParams(
485     std::vector<std::string> &devices, const std::vector<std::string> &uuids, int32_t authType)
486 {
487     for (const auto &devId : uuids) {
488         if (DMAdapter::GetInstance().IsOHOSType(devId)) {
489             continue;
490         }
491         if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
492             continue;
493         }
494         devices.push_back(devId);
495     }
496     ZLOGI("devices size: %{public}zu", devices.size());
497 }
498 
PreSharing(GenQuery & query)499 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
500 {
501     return { GeneralError::E_NOT_SUPPORT, nullptr };
502 }
503 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)504 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
505 {
506     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
507         return GeneralError::E_INVALID_ARGS;
508     }
509     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
510     if (delegate_ == nullptr) {
511         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
512             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
513         return GeneralError::E_ALREADY_CLOSED;
514     }
515     DBStatus status = OK;
516     switch (mode) {
517         case CLOUD_INFO:
518             status = delegate_->RemoveDeviceData(
519                 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
520             break;
521         case CLOUD_DATA:
522             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
523             break;
524         case NEARBY_DATA:
525             if (devices.empty()) {
526                 status = delegate_->RemoveDeviceData();
527                 break;
528             }
529             for (auto device : devices) {
530                 status = delegate_->RemoveDeviceData(device);
531             }
532             break;
533         default:
534             return GeneralError::E_ERROR;
535     }
536     return ConvertStatus(status);
537 }
538 
Watch(int32_t origin,Watcher & watcher)539 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
540 {
541     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
542         return GeneralError::E_INVALID_ARGS;
543     }
544 
545     observer_.watcher_ = &watcher;
546     return GeneralError::E_OK;
547 }
548 
Unwatch(int32_t origin,Watcher & watcher)549 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
550 {
551     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
552         return GeneralError::E_INVALID_ARGS;
553     }
554 
555     observer_.watcher_ = nullptr;
556     return GeneralError::E_OK;
557 }
558 
Release()559 int32_t KVDBGeneralStore::Release()
560 {
561     auto ref = 1;
562     {
563         std::lock_guard<decltype(mutex_)> lock(mutex_);
564         if (ref_ == 0) {
565             return 0;
566         }
567         ref = --ref_;
568     }
569     ZLOGD("ref:%{public}d", ref);
570     if (ref == 0) {
571         delete this;
572     }
573     return ref;
574 }
575 
AddRef()576 int32_t KVDBGeneralStore::AddRef()
577 {
578     std::lock_guard<decltype(mutex_)> lock(mutex_);
579     if (ref_ == 0) {
580         return 0;
581     }
582     return ++ref_;
583 }
584 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)585 int32_t KVDBGeneralStore::SetDistributedTables(
586     const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
587 {
588     return GeneralError::E_OK;
589 }
590 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)591 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
592     const std::set<std::string> &extendColNames, bool isForceUpgrade)
593 {
594     return GeneralError::E_OK;
595 }
596 
ConvertStatus(DistributedDB::DBStatus status)597 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
598 {
599     auto it = dbStatusMap_.find(status);
600     if (it != dbStatusMap_.end()) {
601         return it->second;
602     }
603     ZLOGI("status:0x%{public}x", status);
604     return GenErr::E_ERROR;
605 }
606 
IsValid()607 bool KVDBGeneralStore::IsValid()
608 {
609     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
610     return delegate_ != nullptr;
611 }
612 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)613 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
614 {
615     return GenErr::E_OK;
616 }
617 
UnregisterDetailProgressObserver()618 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
619 {
620     return GenErr::E_OK;
621 }
622 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)623 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
624 {
625     if (!HasWatcher()) {
626         return;
627     }
628     GenOrigin genOrigin;
629     genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
630     genOrigin.id.push_back(originalId);
631     genOrigin.store = storeId_;
632     Watcher::ChangeInfo changeInfo;
633     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
634         auto &info = changeInfo[storeId_][i];
635         for (auto &priData : data.primaryData[i]) {
636             Watcher::PRIValue value;
637             if (priData.empty()) {
638                 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
639                     Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
640                     Anonymous::Change(originalId).c_str(), i);
641                 continue;
642             }
643             Convert(std::move(*(priData.begin())), value);
644             info.push_back(std::move(value));
645         }
646     }
647     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
648 }
649 
OnChange(const DistributedDB::KvStoreChangedData & data)650 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
651 {
652     if (!HasWatcher()) {
653         return;
654     }
655     const auto &inserts = data.GetEntriesInserted();
656     const auto &deletes = data.GetEntriesDeleted();
657     const auto &updates = data.GetEntriesUpdated();
658     Watcher::ChangeData changeData;
659     ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
660     ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
661     ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
662     GenOrigin genOrigin;
663     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
664     genOrigin.store = storeId_;
665 
666     watcher_->OnChange(genOrigin, {}, std::move(changeData));
667 }
668 
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)669 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
670 {
671     for (auto &entry : entries) {
672         auto value = std::vector<Value>{ entry.key, entry.value };
673         values.push_back(value);
674     }
675 }
676 
GetDBProcessCB(DetailAsync async)677 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
678 {
679     return [async](const std::map<std::string, SyncProcess> &processes) {
680         if (!async) {
681             return;
682         }
683         DistributedData::GenDetails details;
684         for (auto &[id, process] : processes) {
685             auto &detail = details[id];
686             detail.progress = process.process;
687             detail.code = ConvertStatus(process.errCode);
688             detail.dbCode = process.errCode;
689             for (auto [key, value] : process.tableProcess) {
690                 auto &table = detail.details[key];
691                 table.upload.total = value.upLoadInfo.total;
692                 table.upload.success = value.upLoadInfo.successCount;
693                 table.upload.failed = value.upLoadInfo.failCount;
694                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
695                 table.download.total = value.downLoadInfo.total;
696                 table.download.success = value.downLoadInfo.successCount;
697                 table.download.failed = value.downLoadInfo.failCount;
698                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
699                 detail.changeCount = (process.process == FINISHED)
700                                          ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
701                                                value.downLoadInfo.deleteCount
702                                          : 0;
703             }
704         }
705         if (async) {
706             async(details);
707         }
708     };
709 }
710 
SetDBPushDataInterceptor(int32_t storeType)711 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
712 {
713     delegate_->SetPushDataInterceptor(
714         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
715             int errCode = DBStatus::OK;
716             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
717                 return errCode;
718             }
719             if (targetID.empty()) {
720                 ZLOGE("targetID empty");
721                 return static_cast<int>(DBStatus::DB_ERROR);
722             }
723             auto entries = data.GetEntries();
724             for (size_t i = 0; i < entries.size(); i++) {
725                 if (entries[i].key.empty()) {
726                     continue;
727                 }
728                 auto oriKey = entries[i].key;
729                 auto newKey = GetNewKey(oriKey, sourceID);
730                 errCode = data.ModifyKey(i, newKey);
731                 if (errCode != DBStatus::OK) {
732                     ZLOGE("ModifyKey err: %{public}d", errCode);
733                     break;
734                 }
735             }
736             return errCode;
737         });
738 }
739 
SetDBReceiveDataInterceptor(int32_t storeType)740 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
741 {
742     delegate_->SetReceiveDataInterceptor(
743         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
744             int errCode = DBStatus::OK;
745             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
746                 return errCode;
747             }
748             if (sourceID.empty()) {
749                 ZLOGE("sourceID empty");
750                 return static_cast<int>(DBStatus::DB_ERROR);
751             }
752             auto entries = data.GetEntries();
753             for (size_t i = 0; i < entries.size(); i++) {
754                 if (entries[i].key.empty()) {
755                     continue;
756                 }
757 
758                 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
759                 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
760                 if (encyptedUuid.empty()) {
761                     ZLOGE("get encyptedUuid failed");
762                     continue;
763                 }
764 
765                 auto oriKey = entries[i].key;
766                 auto newKey = GetNewKey(oriKey, encyptedUuid);
767                 errCode = data.ModifyKey(i, newKey);
768                 if (errCode != DBStatus::OK) {
769                     ZLOGE("ModifyKey err: %{public}d", errCode);
770                     break;
771                 }
772             }
773             return errCode;
774         });
775 }
776 
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)777 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
778 {
779     uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
780     remoteLen = le32toh(remoteLen);
781     uint32_t uuidLen = uuid.size();
782 
783     std::vector<uint8_t> out;
784     std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
785     out.insert(out.end(), uuid.begin(), uuid.end());
786     out.insert(out.end(), oriKey.begin(), oriKey.end());
787     uuidLen = htole32(uuidLen);
788     uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
789     out.insert(out.end(), buf, buf + sizeof(uuidLen));
790     return out;
791 }
792 
SetConfig(const GeneralStore::StoreConfig & storeConfig)793 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
794 {
795     enableCloud_ = storeConfig.enableCloud_;
796 }
797 
LockCloudDB()798 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
799 {
800     return { GeneralError::E_NOT_SUPPORT, 0 };
801 }
802 
UnLockCloudDB()803 int32_t KVDBGeneralStore::UnLockCloudDB()
804 {
805     return GeneralError::E_NOT_SUPPORT;
806 }
807 
SetExecutor(std::shared_ptr<Executor> executor)808 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
809 {
810     return;
811 }
812 } // namespace OHOS::DistributedKv