• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "KVDBGeneralStore"
16 #include "kvdb_general_store.h"
17 
18 #include <endian.h>
19 #include "bootstrap.h"
20 #include "checker/checker_manager.h"
21 #include "cloud/cloud_sync_finished_event.h"
22 #include "cloud/schema_meta.h"
23 #include "crypto_manager.h"
24 #include "device_matrix.h"
25 #include "directory/directory_manager.h"
26 #include "eventcenter/event_center.h"
27 #include "kvdb_query.h"
28 #include "log_print.h"
29 #include "metadata/meta_data_manager.h"
30 #include "metadata/secret_key_meta_data.h"
31 #include "metadata/store_meta_data_local.h"
32 #include "query_helper.h"
33 #include "rdb_cloud.h"
34 #include "snapshot/bind_event.h"
35 #include "types.h"
36 #include "user_delegate.h"
37 #include "utils/anonymous.h"
38 #include "water_version_manager.h"
39 #include "device_manager_adapter.h"
40 
41 namespace OHOS::DistributedKv {
42 using namespace DistributedData;
43 using namespace DistributedDB;
44 using DBField = DistributedDB::Field;
45 using DBTable = DistributedDB::TableSchema;
46 using DBSchema = DistributedDB::DataBaseSchema;
47 using ClearMode = DistributedDB::ClearMode;
48 using DMAdapter = DistributedData::DeviceManagerAdapter;
49 using DBInterceptedData = DistributedDB::InterceptedData;
50 constexpr int UUID_WIDTH = 4;
51 const std::map<DBStatus, KVDBGeneralStore::GenErr> KVDBGeneralStore::dbStatusMap_ = {
52     { DBStatus::OK, GenErr::E_OK },
53     { DBStatus::CLOUD_NETWORK_ERROR, GenErr::E_NETWORK_ERROR },
54     { DBStatus::CLOUD_LOCK_ERROR, GenErr::E_LOCKED_BY_OTHERS },
55     { DBStatus::CLOUD_FULL_RECORDS, GenErr::E_RECODE_LIMIT_EXCEEDED },
56     { DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT, GenErr::E_NO_SPACE_FOR_ASSET },
57     { DBStatus::CLOUD_SYNC_TASK_MERGED, GenErr::E_SYNC_TASK_MERGED },
58     { DBStatus::BUSY, GenErr::E_DB_ERROR },
59     { DBStatus::DB_ERROR, GenErr::E_DB_ERROR },
60     { DBStatus::INVALID_ARGS, GenErr::E_INVALID_ARGS },
61     { DBStatus::NOT_FOUND, GenErr::E_RECORD_NOT_FOUND },
62     { DBStatus::INVALID_VALUE_FIELDS, GenErr::E_INVALID_VALUE_FIELDS },
63     { DBStatus::INVALID_FIELD_TYPE, GenErr::E_INVALID_FIELD_TYPE },
64     { DBStatus::CONSTRAIN_VIOLATION, GenErr::E_CONSTRAIN_VIOLATION },
65     { DBStatus::INVALID_FORMAT, GenErr::E_INVALID_FORMAT },
66     { DBStatus::INVALID_QUERY_FORMAT, GenErr::E_INVALID_QUERY_FORMAT },
67     { DBStatus::INVALID_QUERY_FIELD, GenErr::E_INVALID_QUERY_FIELD },
68     { DBStatus::NOT_SUPPORT, GenErr::E_NOT_SUPPORT },
69     { DBStatus::TIME_OUT, GenErr::E_TIME_OUT },
70     { DBStatus::OVER_MAX_LIMITS, GenErr::E_OVER_MAX_LIMITS },
71     { DBStatus::EKEYREVOKED_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
72     { DBStatus::SECURITY_OPTION_CHECK_ERROR, GenErr::E_SECURITY_LEVEL_ERROR },
73 };
74 
75 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
GetDBSchema(const Database & database)76 static DBSchema GetDBSchema(const Database &database)
77 {
78     DBSchema schema;
79     schema.tables.resize(database.tables.size());
80     for (size_t i = 0; i < database.tables.size(); i++) {
81         const Table &table = database.tables[i];
82         DBTable &dbTable = schema.tables[i];
83         dbTable.name = table.name;
84         dbTable.sharedTableName = table.sharedTableName;
85         for (auto &field : table.fields) {
86             DBField dbField;
87             dbField.colName = field.colName;
88             dbField.type = field.type;
89             dbField.primary = field.primary;
90             dbField.nullable = field.nullable;
91             dbTable.fields.push_back(std::move(dbField));
92         }
93     }
94     return schema;
95 }
GetDBPassword(const StoreMetaData & data)96 KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data)
97 {
98     DBPassword dbPassword;
99     if (!data.isEncrypt) {
100         return dbPassword;
101     }
102 
103     SecretKeyMetaData secretKey;
104     secretKey.storeType = data.storeType;
105     auto storeKey = data.GetSecretKey();
106     MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
107     std::vector<uint8_t> password;
108     CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
109     dbPassword.SetValue(password.data(), password.size());
110     password.assign(password.size(), 0);
111     return dbPassword;
112 }
113 
GetDBSecurity(int32_t secLevel)114 KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel)
115 {
116     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
117         return { DistributedDB::NOT_SET, DistributedDB::ECE };
118     }
119     if (secLevel == SecurityLevel::S3) {
120         return { DistributedDB::S3, DistributedDB::SECE };
121     }
122     if (secLevel == SecurityLevel::S4) {
123         return { DistributedDB::S4, DistributedDB::ECE };
124     }
125     return { secLevel, DistributedDB::ECE };
126 }
127 
GetDBOption(const StoreMetaData & data,const DBPassword & password)128 KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password)
129 {
130     DBOption dbOption;
131     dbOption.createIfNecessary = false;
132     dbOption.isMemoryDb = false;
133     dbOption.isEncryptedDb = data.isEncrypt;
134     dbOption.isNeedCompressOnSync = data.isNeedCompress;
135     if (data.isEncrypt) {
136         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
137         dbOption.passwd = password;
138     }
139     StoreMetaDataLocal local;
140     MetaDataManager::GetInstance().LoadMeta(data.GetKeyLocal(), local, true);
141     if (local.isPublic || data.storeType == KvStoreType::DEVICE_COLLABORATION) {
142         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
143     } else if (data.storeType == KvStoreType::SINGLE_VERSION) {
144         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
145     }
146     if (data.appId == Bootstrap::GetInstance().GetProcessLabel()) {
147         dbOption.compressionRate = META_COMPRESS_RATE;
148         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
149     } else {
150         dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
151     }
152     dbOption.schema = data.schema;
153     dbOption.createDirByStoreIdOnly = true;
154     dbOption.secOption = GetDBSecurity(data.securityLevel);
155     return dbOption;
156 }
157 
KVDBGeneralStore(const StoreMetaData & meta)158 KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta)
159     : manager_(meta.appId, meta.appId == Bootstrap::GetInstance().GetProcessLabel() ? defaultAccountId : meta.user,
160           meta.instanceId)
161 {
162     observer_.storeId_ = meta.storeId;
163     StoreMetaDataLocal local;
164     MetaDataManager::GetInstance().LoadMeta(meta.GetKeyLocal(), local, true);
165     isPublic_ = local.isPublic;
166     DBStatus status = DBStatus::NOT_FOUND;
167     manager_.SetKvStoreConfig({ meta.dataDir });
168     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
169     manager_.GetKvStore(
170         meta.storeId, GetDBOption(meta, GetDBPassword(meta)), [&status, this](auto dbStatus, auto *tmpStore) {
171             status = dbStatus;
172             delegate_ = tmpStore;
173         });
174     if (delegate_ == nullptr || status != DBStatus::OK) {
175         ZLOGE("GetKvStore failed. delegate is null?[%{public}d], status = %{public}d", delegate_ == nullptr, status);
176         manager_.CloseKvStore(delegate_);
177         return;
178     }
179     SetDBPushDataInterceptor(meta.storeType);
180     SetDBReceiveDataInterceptor(meta.storeType);
181     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &observer_);
182     delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_CLOUD, &observer_);
183     if (DeviceMatrix::GetInstance().IsDynamic(meta) || DeviceMatrix::GetInstance().IsStatics(meta)) {
184         delegate_->SetRemotePushFinishedNotify([meta](const DistributedDB::RemotePushNotifyInfo &info) {
185             DeviceMatrix::GetInstance().OnExchanged(info.deviceId, meta, DeviceMatrix::ChangeType::CHANGE_REMOTE);
186         });
187     }
188     if (meta.isAutoSync) {
189         bool param = true;
190         auto data = static_cast<DistributedDB::PragmaData>(&param);
191         delegate_->Pragma(DistributedDB::SET_SYNC_RETRY, data);
192     }
193     storeInfo_.tokenId = meta.tokenId;
194     storeInfo_.bundleName = meta.bundleName;
195     storeInfo_.storeName = meta.storeId;
196     storeInfo_.instanceId = meta.instanceId;
197     storeInfo_.user = std::stoi(meta.user);
198     enableCloud_ = meta.enableCloud;
199 }
200 
~KVDBGeneralStore()201 KVDBGeneralStore::~KVDBGeneralStore()
202 {
203     {
204         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
205         if (delegate_ != nullptr) {
206             delegate_->UnRegisterObserver(&observer_);
207         }
208         manager_.CloseKvStore(delegate_);
209         delegate_ = nullptr;
210     }
211     for (auto &bindInfo_ : bindInfos_) {
212         if (bindInfo_.db_ != nullptr) {
213             bindInfo_.db_->Close();
214         }
215     }
216     bindInfos_.clear();
217     dbClouds_.clear();
218 }
219 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)220 int32_t KVDBGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
221 {
222     return GenErr::E_NOT_SUPPORT;
223 }
224 
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)225 int32_t KVDBGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
226     const CloudConfig &config)
227 {
228     if (bindInfos.empty()) {
229         ZLOGW("No cloudDB!");
230         return GeneralError::E_OK;
231     }
232 
233     std::map<std::string, DataBaseSchema> schemas;
234     auto dbSchema = GetDBSchema(database);
235     for (auto &[userId, bindInfo] : bindInfos) {
236         if (bindInfo.db_ == nullptr) {
237             return GeneralError::E_INVALID_ARGS;
238         }
239 
240         if (isBound_.exchange(true)) {
241             return GeneralError::E_OK;
242         }
243 
244         dbClouds_.insert({ std::to_string(userId), std::make_shared<DistributedRdb::RdbCloud>(bindInfo.db_, nullptr) });
245         bindInfos_.insert(std::move(bindInfo));
246         schemas.insert({ std::to_string(userId), dbSchema });
247     }
248     DistributedDB::CloudSyncConfig dbConfig;
249     dbConfig.maxUploadCount = config.maxNumber;
250     dbConfig.maxUploadSize = config.maxSize;
251     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
252     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
253     if (delegate_ == nullptr) {
254         return GeneralError::E_ALREADY_CLOSED;
255     }
256     delegate_->SetCloudDB(dbClouds_);
257     delegate_->SetCloudDbSchema(std::move(schemas));
258     delegate_->SetCloudSyncConfig(dbConfig);
259     return GeneralError::E_OK;
260 }
261 
IsBound()262 bool KVDBGeneralStore::IsBound()
263 {
264     return isBound_;
265 }
266 
Close(bool isForce)267 int32_t KVDBGeneralStore::Close(bool isForce)
268 {
269     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
270     if (!lock) {
271         return GeneralError::E_BUSY;
272     }
273 
274     if (delegate_ == nullptr) {
275         return GeneralError::E_OK;
276     }
277     if (!isForce && delegate_->GetTaskCount() > 0) {
278         return GeneralError::E_BUSY;
279     }
280     if (delegate_ != nullptr) {
281         delegate_->UnRegisterObserver(&observer_);
282     }
283     auto status = manager_.CloseKvStore(delegate_);
284     if (status != DBStatus::OK) {
285         return status;
286     }
287     delegate_ = nullptr;
288     return GeneralError::E_OK;
289 }
290 
Execute(const std::string & table,const std::string & sql)291 int32_t KVDBGeneralStore::Execute(const std::string &table, const std::string &sql)
292 {
293     return GeneralError::E_NOT_SUPPORT;
294 }
295 
Insert(const std::string & table,VBuckets && values)296 int32_t KVDBGeneralStore::Insert(const std::string &table, VBuckets &&values)
297 {
298     return GeneralError::E_NOT_SUPPORT;
299 }
300 
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)301 int32_t KVDBGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
302     const std::string &whereSql, Values &&conditions)
303 {
304     return GeneralError::E_NOT_SUPPORT;
305 }
306 
Delete(const std::string & table,const std::string & sql,Values && args)307 int32_t KVDBGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
308 {
309     return GeneralError::E_NOT_SUPPORT;
310 }
311 
Replace(const std::string & table,VBucket && value)312 int32_t KVDBGeneralStore::Replace(const std::string &table, VBucket &&value)
313 {
314     return GeneralError::E_NOT_SUPPORT;
315 }
316 
Query(const std::string & table,const std::string & sql,Values && args)317 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(
318     __attribute__((unused)) const std::string &table, const std::string &sql, Values &&args)
319 {
320     return { GeneralError::E_NOT_SUPPORT, nullptr };
321 }
322 
Query(const std::string & table,GenQuery & query)323 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::Query(const std::string &table, GenQuery &query)
324 {
325     return { GeneralError::E_NOT_SUPPORT, nullptr };
326 }
327 
MergeMigratedData(const std::string & tableName,VBuckets && values)328 int32_t KVDBGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
329 {
330     return GeneralError::E_NOT_SUPPORT;
331 }
332 
CleanTrackerData(const std::string & tableName,int64_t cursor)333 int32_t KVDBGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
334 {
335     return GeneralError::E_NOT_SUPPORT;
336 }
337 
GetDBSyncCompleteCB(DetailAsync async)338 KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsync async)
339 {
340     if (!async) {
341         return [](auto &) {};
342     }
343     return [async = std::move(async)](const std::map<std::string, DBStatus> &status) {
344         GenDetails details;
345         for (auto &[key, dbStatus] : status) {
346             auto &value = details[key];
347             value.progress = FINISHED;
348             value.code = GeneralError::E_OK;
349             if (dbStatus != DBStatus::OK) {
350                 value.code = dbStatus;
351             }
352         }
353         async(details);
354     };
355 }
356 
CloudSync(const Devices & devices,DistributedDB::SyncMode cloudSyncMode,DetailAsync async,int64_t wait,const std::string & prepareTraceId)357 DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async,
358     int64_t wait, const std::string &prepareTraceId)
359 {
360     DistributedDB::CloudSyncOption syncOption;
361     syncOption.devices = devices;
362     syncOption.mode = cloudSyncMode;
363     syncOption.waitTime = wait;
364     syncOption.prepareTraceId = prepareTraceId;
365     syncOption.lockAction = DistributedDB::LockAction::NONE;
366     if (storeInfo_.user == 0) {
367         std::vector<int32_t> users;
368         AccountDelegate::GetInstance()->QueryUsers(users);
369         syncOption.users.push_back(std::to_string(users[0]));
370     } else {
371         syncOption.users.push_back(std::to_string(storeInfo_.user));
372     }
373     return delegate_->Sync(syncOption, GetDBProcessCB(async));
374 }
375 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,SyncParam & syncParam)376 int32_t KVDBGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParam)
377 {
378     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
379     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
380     if (delegate_ == nullptr) {
381         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
382             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode);
383         return GeneralError::E_ALREADY_CLOSED;
384     }
385     DBStatus dbStatus;
386     auto dbMode = DistributedDB::SyncMode(syncMode);
387     if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
388         if (!enableCloud_) {
389             return GeneralError::E_NOT_SUPPORT;
390         }
391         dbStatus = CloudSync(devices, dbMode, async, syncParam.wait, syncParam.prepareTraceId);
392     } else {
393         if (devices.empty()) {
394             ZLOGE("Devices is empty! mode:%{public}d", syncParam.mode);
395             return GeneralError::E_INVALID_ARGS;
396         }
397         KVDBQuery *kvQuery = nullptr;
398         auto ret = query.QueryInterface(kvQuery);
399         DistributedDB::Query dbQuery;
400         if (ret == GeneralError::E_OK && kvQuery != nullptr && kvQuery->IsValidQuery()) {
401             dbQuery = kvQuery->GetDBQuery();
402         } else {
403             return GeneralError::E_INVALID_ARGS;
404         }
405         if (syncMode == NEARBY_SUBSCRIBE_REMOTE) {
406             dbStatus = delegate_->SubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
407         } else if (syncMode == NEARBY_UNSUBSCRIBE_REMOTE) {
408             dbStatus =
409                 delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
410         } else if (syncMode < NEARBY_END) {
411             if (kvQuery->IsEmpty()) {
412                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false);
413             } else {
414                 dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false);
415             }
416         } else {
417             ZLOGE("Err sync mode! sync mode:%{public}d", syncMode);
418             dbStatus = DistributedDB::INVALID_ARGS;
419         }
420     }
421     return ConvertStatus(dbStatus);
422 }
423 
SetEqualIdentifier(const std::string & appId,const std::string & storeId)424 void KVDBGeneralStore::SetEqualIdentifier(const std::string &appId, const std::string &storeId)
425 {
426     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
427     if (delegate_ == nullptr) {
428         ZLOGE("store already closed! appId:%{public}s storeId:%{public}s", appId.c_str(),
429             Anonymous::Change(storeId).c_str());
430         return;
431     }
432     std::vector<std::string> sameAccountDevs {};
433     std::vector<std::string> defaultAccountDevs {};
434     auto uuids = DMAdapter::ToUUID(DMAdapter::GetInstance().GetRemoteDevices());
435     GetIdentifierParams(sameAccountDevs, uuids, IDENTICAL_ACCOUNT);
436     GetIdentifierParams(defaultAccountDevs, uuids, NO_ACCOUNT);
437     if (!sameAccountDevs.empty()) {
438         auto accountId = AccountDelegate::GetInstance()->GetUnencryptedAccountId();
439         auto syncIdentifier = KvManager::GetKvStoreIdentifier(accountId, appId, storeId);
440         ZLOGI("same account set compatible identifier store:%{public}s, user:%{public}s, device:%{public}.10s",
441             Anonymous::Change(storeId).c_str(), Anonymous::Change(accountId).c_str(),
442             DistributedData::Serializable::Marshall(sameAccountDevs).c_str());
443         delegate_->SetEqualIdentifier(syncIdentifier, sameAccountDevs);
444     }
445     if (!defaultAccountDevs.empty()) {
446         auto syncIdentifier = KvManager::GetKvStoreIdentifier(defaultAccountId, appId, storeId);
447         ZLOGI("no account set compatible identifier store:%{public}s, device:%{public}.10s",
448             Anonymous::Change(storeId).c_str(),
449             DistributedData::Serializable::Marshall(defaultAccountDevs).c_str());
450         delegate_->SetEqualIdentifier(syncIdentifier, defaultAccountDevs);
451     }
452 }
453 
GetIdentifierParams(std::vector<std::string> & devices,const std::vector<std::string> & uuids,int32_t authType)454 void KVDBGeneralStore::GetIdentifierParams(std::vector<std::string> &devices,
455     const std::vector<std::string> &uuids, int32_t authType)
456 {
457     for (const auto &devId : uuids) {
458         if (DMAdapter::GetInstance().IsOHOSType(devId)) {
459             continue;
460         }
461         if (DMAdapter::GetInstance().GetAuthType(devId) != authType) {
462             continue;
463         }
464         devices.push_back(devId);
465     }
466     ZLOGI("devices size: %{public}zu", devices.size());
467 }
468 
PreSharing(GenQuery & query)469 std::pair<int32_t, std::shared_ptr<Cursor>> KVDBGeneralStore::PreSharing(GenQuery &query)
470 {
471     return { GeneralError::E_NOT_SUPPORT, nullptr };
472 }
473 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)474 int32_t KVDBGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
475 {
476     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
477         return GeneralError::E_INVALID_ARGS;
478     }
479     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
480     if (delegate_ == nullptr) {
481         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d", devices.size(),
482             devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode);
483         return GeneralError::E_ALREADY_CLOSED;
484     }
485     DBStatus status = OK;
486     switch (mode) {
487         case CLOUD_INFO:
488             status = delegate_->RemoveDeviceData(
489                 "", isPublic_ ? static_cast<ClearMode>(CLOUD_DATA) : static_cast<ClearMode>(CLOUD_INFO));
490             break;
491         case CLOUD_DATA:
492             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
493             break;
494         case NEARBY_DATA:
495             if (devices.empty()) {
496                 status = delegate_->RemoveDeviceData();
497                 break;
498             }
499             for (auto device : devices) {
500                 status = delegate_->RemoveDeviceData(device);
501             }
502             break;
503         default:
504             return GeneralError::E_ERROR;
505     }
506     return ConvertStatus(status);
507 }
508 
Watch(int32_t origin,Watcher & watcher)509 int32_t KVDBGeneralStore::Watch(int32_t origin, Watcher &watcher)
510 {
511     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
512         return GeneralError::E_INVALID_ARGS;
513     }
514 
515     observer_.watcher_ = &watcher;
516     return GeneralError::E_OK;
517 }
518 
Unwatch(int32_t origin,Watcher & watcher)519 int32_t KVDBGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
520 {
521     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
522         return GeneralError::E_INVALID_ARGS;
523     }
524 
525     observer_.watcher_ = nullptr;
526     return GeneralError::E_OK;
527 }
528 
Release()529 int32_t KVDBGeneralStore::Release()
530 {
531     auto ref = 1;
532     {
533         std::lock_guard<decltype(mutex_)> lock(mutex_);
534         if (ref_ == 0) {
535             return 0;
536         }
537         ref = --ref_;
538     }
539     ZLOGD("ref:%{public}d", ref);
540     if (ref == 0) {
541         delete this;
542     }
543     return ref;
544 }
545 
AddRef()546 int32_t KVDBGeneralStore::AddRef()
547 {
548     std::lock_guard<decltype(mutex_)> lock(mutex_);
549     if (ref_ == 0) {
550         return 0;
551     }
552     return ++ref_;
553 }
554 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)555 int32_t KVDBGeneralStore::SetDistributedTables(
556     const std::vector<std::string> &tables, int32_t type, const std::vector<Reference> &references)
557 {
558     return GeneralError::E_OK;
559 }
560 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)561 int32_t KVDBGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
562     const std::string &extendColName, bool isForceUpgrade)
563 {
564     return GeneralError::E_OK;
565 }
566 
ConvertStatus(DistributedDB::DBStatus status)567 KVDBGeneralStore::GenErr KVDBGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
568 {
569     auto it = dbStatusMap_.find(status);
570     if (it != dbStatusMap_.end()) {
571         return it->second;
572     }
573     ZLOGI("status:0x%{public}x", status);
574     return GenErr::E_ERROR;
575 }
576 
IsValid()577 bool KVDBGeneralStore::IsValid()
578 {
579     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
580     return delegate_ != nullptr;
581 }
582 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)583 int32_t KVDBGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
584 {
585     return GenErr::E_OK;
586 }
587 
UnregisterDetailProgressObserver()588 int32_t KVDBGeneralStore::UnregisterDetailProgressObserver()
589 {
590     return GenErr::E_OK;
591 }
592 
GetWaterVersion(const std::string & deviceId)593 std::vector<std::string> KVDBGeneralStore::GetWaterVersion(const std::string &deviceId)
594 {
595     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
596     if (delegate_ == nullptr) {
597         ZLOGE("store already closed! deviceId:%{public}s", Anonymous::Change(deviceId).c_str());
598         return {};
599     }
600     auto [status, versions] = delegate_->GetCloudVersion(deviceId);
601     if (status != DBStatus::OK || versions.empty()) {
602         return {};
603     }
604     std::vector<std::string> res;
605     for (auto &[_, version] : versions) {
606         res.push_back(std::move(version));
607     }
608     return res;
609 }
610 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)611 void KVDBGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
612 {
613     if (!HasWatcher()) {
614         return;
615     }
616     GenOrigin genOrigin;
617     genOrigin.origin = (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
618     genOrigin.id.push_back(originalId);
619     genOrigin.store = storeId_;
620     Watcher::ChangeInfo changeInfo;
621     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
622         auto &info = changeInfo[storeId_][i];
623         for (auto &priData : data.primaryData[i]) {
624             Watcher::PRIValue value;
625             Convert(std::move(*(priData.begin())), value);
626             info.push_back(std::move(value));
627         }
628     }
629     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
630 }
631 
OnChange(const DistributedDB::KvStoreChangedData & data)632 void KVDBGeneralStore::ObserverProxy::OnChange(const DistributedDB::KvStoreChangedData &data)
633 {
634     if (!HasWatcher()) {
635         return;
636     }
637     const auto &inserts = data.GetEntriesInserted();
638     const auto &deletes = data.GetEntriesDeleted();
639     const auto &updates = data.GetEntriesUpdated();
640     Watcher::ChangeData changeData;
641     ConvertChangeData(inserts, changeData[storeId_][DistributedDB::OP_INSERT]);
642     ConvertChangeData(deletes, changeData[storeId_][DistributedDB::OP_DELETE]);
643     ConvertChangeData(updates, changeData[storeId_][DistributedDB::OP_UPDATE]);
644     GenOrigin genOrigin;
645     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
646     genOrigin.store = storeId_;
647 
648     watcher_->OnChange(genOrigin, {}, std::move(changeData));
649 }
650 
ConvertChangeData(const std::list<DBEntry> & entries,std::vector<Values> & values)651 void KVDBGeneralStore::ObserverProxy::ConvertChangeData(const std::list<DBEntry> &entries, std::vector<Values> &values)
652 {
653     for (auto &entry : entries) {
654         auto value = std::vector<Value>{ entry.key, entry.value };
655         values.push_back(value);
656     }
657 }
658 
GetDBProcessCB(DetailAsync async)659 KVDBGeneralStore::DBProcessCB KVDBGeneralStore::GetDBProcessCB(DetailAsync async)
660 {
661     return [async](const std::map<std::string, SyncProcess> &processes) {
662         if (!async) {
663             return;
664         }
665         DistributedData::GenDetails details;
666         for (auto &[id, process] : processes) {
667             auto &detail = details[id];
668             detail.progress = process.process;
669             detail.code = ConvertStatus(process.errCode);
670             detail.dbCode = DB_ERR_OFFSET + process.errCode;
671             for (auto [key, value] : process.tableProcess) {
672                 auto &table = detail.details[key];
673                 table.upload.total = value.upLoadInfo.total;
674                 table.upload.success = value.upLoadInfo.successCount;
675                 table.upload.failed = value.upLoadInfo.failCount;
676                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
677                 table.download.total = value.downLoadInfo.total;
678                 table.download.success = value.downLoadInfo.successCount;
679                 table.download.failed = value.downLoadInfo.failCount;
680                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
681                 detail.changeCount = (process.process == FINISHED)
682                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
683                                               value.downLoadInfo.deleteCount
684                                         : 0;
685             }
686         }
687         if (async) {
688             async(details);
689         }
690     };
691 }
692 
SetDBPushDataInterceptor(int32_t storeType)693 void KVDBGeneralStore::SetDBPushDataInterceptor(int32_t storeType)
694 {
695     delegate_->SetPushDataInterceptor(
696         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
697             int errCode = DBStatus::OK;
698             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(targetID)) {
699                 return errCode;
700             }
701             if (targetID.empty()) {
702                 ZLOGE("targetID empty");
703                 return static_cast<int>(DBStatus::DB_ERROR);
704             }
705             auto entries = data.GetEntries();
706             for (size_t i = 0; i < entries.size(); i++) {
707                 if (entries[i].key.empty()) {
708                     continue;
709                 }
710                 auto oriKey = entries[i].key;
711                 auto newKey = GetNewKey(oriKey, sourceID);
712                 errCode = data.ModifyKey(i, newKey);
713                 if (errCode != DBStatus::OK) {
714                     ZLOGE("ModifyKey err: %{public}d", errCode);
715                     break;
716                 }
717             }
718             return errCode;
719         }
720     );
721 }
722 
SetDBReceiveDataInterceptor(int32_t storeType)723 void KVDBGeneralStore::SetDBReceiveDataInterceptor(int32_t storeType)
724 {
725     delegate_->SetReceiveDataInterceptor(
726         [this, storeType](DBInterceptedData &data, const std::string &sourceID, const std::string &targetID) {
727             int errCode = DBStatus::OK;
728             if (storeType != KvStoreType::DEVICE_COLLABORATION || DMAdapter::GetInstance().IsOHOSType(sourceID)) {
729                 return errCode;
730             }
731             if (sourceID.empty()) {
732                 ZLOGE("targetID empty");
733                 return static_cast<int>(DBStatus::DB_ERROR);
734             }
735             auto entries = data.GetEntries();
736             for (size_t i = 0; i < entries.size(); i++) {
737                 if (entries[i].key.empty()) {
738                     continue;
739                 }
740 
741                 auto networkId = DMAdapter::GetInstance().ToNetworkID(sourceID);
742                 auto encyptedUuid = DMAdapter::GetInstance().GetEncryptedUuidByNetworkId(networkId);
743                 if (encyptedUuid.empty()) {
744                     ZLOGE("get encyptedUuid failed");
745                     continue;
746                 }
747 
748                 auto oriKey = entries[i].key;
749                 auto newKey = GetNewKey(oriKey, encyptedUuid);
750                 errCode = data.ModifyKey(i, newKey);
751                 if (errCode != DBStatus::OK) {
752                     ZLOGE("ModifyKey err: %{public}d", errCode);
753                     break;
754                 }
755             }
756             return errCode;
757         }
758     );
759 }
760 
GetNewKey(std::vector<uint8_t> & key,const std::string & uuid)761 std::vector<uint8_t> KVDBGeneralStore::GetNewKey(std::vector<uint8_t> &key, const std::string &uuid)
762 {
763     uint32_t remoteLen = *(reinterpret_cast<uint32_t *>(&(*(key.end() - sizeof(uint32_t)))));
764     remoteLen = le32toh(remoteLen);
765     uint32_t uuidLen = uuid.size();
766 
767     std::vector<uint8_t> out;
768     std::vector<uint8_t> oriKey(key.begin() + remoteLen, key.end() - UUID_WIDTH);
769     out.insert(out.end(), uuid.begin(), uuid.end());
770     out.insert(out.end(), oriKey.begin(), oriKey.end());
771     uuidLen = htole32(uuidLen);
772     uint8_t *buf = reinterpret_cast<uint8_t *>(&uuidLen);
773     out.insert(out.end(), buf, buf + sizeof(uuidLen));
774     return out;
775 }
776 
SetConfig(const GeneralStore::StoreConfig & storeConfig)777 void KVDBGeneralStore::SetConfig(const GeneralStore::StoreConfig &storeConfig)
778 {
779     enableCloud_ = storeConfig.enableCloud_;
780 }
781 
LockCloudDB()782 std::pair<int32_t, uint32_t> KVDBGeneralStore::LockCloudDB()
783 {
784     return { GeneralError::E_NOT_SUPPORT, 0 };
785 }
786 
UnLockCloudDB()787 int32_t KVDBGeneralStore::UnLockCloudDB()
788 {
789     return GeneralError::E_NOT_SUPPORT;
790 }
791 
SetExecutor(std::shared_ptr<Executor> executor)792 void KVDBGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
793 {
794     return;
795 }
796 } // namespace OHOS::DistributedKv
797