• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbGeneralStore"
16 
17 #include "rdb_general_store.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "eventcenter/event_center.h"
35 #include "log_print.h"
36 #include "metadata/meta_data_manager.h"
37 #include "metadata/secret_key_meta_data.h"
38 #include "rdb_cursor.h"
39 #include "rdb_helper.h"
40 #include "rdb_query.h"
41 #include "rdb_result_set_impl.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 namespace OHOS::DistributedRdb {
47 using namespace DistributedData;
48 using namespace DistributedDB;
49 using namespace NativeRdb;
50 using namespace CloudData;
51 using namespace std::chrono;
52 using DBField = DistributedDB::Field;
53 using DBTable = DistributedDB::TableSchema;
54 using DBSchema = DistributedDB::DataBaseSchema;
55 using ClearMode = DistributedDB::ClearMode;
56 using DBStatus = DistributedDB::DBStatus;
57 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
58 
// SQL keyword fragments used to assemble the statements built in Insert() and Replace().
constexpr const char *INSERT = "INSERT INTO ";
constexpr const char *REPLACE = "REPLACE INTO ";
constexpr const char *VALUES = " VALUES ";
// NOTE(review): not referenced in the visible part of this file; presumably a marker used when
// cloud data is removed on logout - confirm against its users.
constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
// Bitwise OR of the INSERT, UPDATE, DELETE and DOWNLOAD lock actions; handed to the cloud sync option in Sync().
constexpr const LockAction LOCK_ACTION = static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) |
    static_cast<uint32_t>(LockAction::UPDATE) | static_cast<uint32_t>(LockAction::DELETE) |
    static_cast<uint32_t>(LockAction::DOWNLOAD));
// Bits OR-ed into syncNotifyFlag_: CLOUD_SYNC_FLAG once Bind() has configured the delegate,
// SEARCHABLE_FLAG when the store meta is marked searchable.
constexpr uint32_t CLOUD_SYNC_FLAG = 1;
constexpr uint32_t SEARCHABLE_FLAG = 2;
// Longest time a forced Close() waits to acquire the store lock.
constexpr uint32_t LOCK_TIMEOUT = 3600; // second
69 
GetDBSchema(const Database & database)70 static DBSchema GetDBSchema(const Database &database)
71 {
72     DBSchema schema;
73     schema.tables.resize(database.tables.size());
74     for (size_t i = 0; i < database.tables.size(); i++) {
75         const Table &table = database.tables[i];
76         DBTable &dbTable = schema.tables[i];
77         dbTable.name = table.name;
78         dbTable.sharedTableName = table.sharedTableName;
79         for (auto &field : table.fields) {
80             DBField dbField;
81             dbField.colName = field.colName;
82             dbField.type = field.type;
83             dbField.primary = field.primary;
84             dbField.nullable = field.nullable;
85             dbTable.fields.push_back(std::move(dbField));
86         }
87     }
88     return schema;
89 }
90 
RdbGeneralStore(const StoreMetaData & meta)91 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
92     : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
93 {
94     observer_.storeId_ = meta.storeId;
95     observer_.meta_ = meta;
96     RelationalStoreDelegate::Option option;
97     option.observer = &observer_;
98     if (meta.isEncrypt) {
99         std::string key = meta.GetSecretKey();
100         SecretKeyMetaData secretKeyMeta;
101         MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
102         std::vector<uint8_t> decryptKey;
103         CryptoManager::GetInstance().Decrypt(secretKeyMeta.sKey, decryptKey);
104         option.passwd.SetValue(decryptKey.data(), decryptKey.size());
105         std::fill(decryptKey.begin(), decryptKey.end(), 0);
106         option.isEncryptedDb = meta.isEncrypt;
107         option.cipher = CipherType::AES_256_GCM;
108         for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
109             option.iterateTimes = ITERS[i];
110             auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
111             if (ret == DBStatus::OK && delegate_ != nullptr) {
112                 break;
113             }
114             manager_.CloseStore(delegate_);
115             delegate_ = nullptr;
116         }
117     } else {
118         auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
119         if (ret != DBStatus::OK || delegate_ == nullptr) {
120             manager_.CloseStore(delegate_);
121             delegate_ = nullptr;
122         }
123     }
124 
125     storeInfo_.tokenId = meta.tokenId;
126     storeInfo_.bundleName = meta.bundleName;
127     storeInfo_.storeName = meta.storeId;
128     storeInfo_.instanceId = meta.instanceId;
129     storeInfo_.user = std::stoi(meta.user);
130     storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
131     if (meta.isSearchable) {
132         syncNotifyFlag_ |= SEARCHABLE_FLAG;
133     }
134 
135     if (delegate_ != nullptr && meta.isManualClean) {
136         PragmaData data =
137             static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
138         delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
139     }
140 }
141 
~RdbGeneralStore()142 RdbGeneralStore::~RdbGeneralStore()
143 {
144     manager_.CloseStore(delegate_);
145     delegate_ = nullptr;
146     bindInfo_.loader_ = nullptr;
147     if (bindInfo_.db_ != nullptr) {
148         bindInfo_.db_->Close();
149     }
150     bindInfo_.db_ = nullptr;
151     rdbCloud_ = nullptr;
152     rdbLoader_ = nullptr;
153     RemoveTasks();
154     tasks_ = nullptr;
155     executor_ = nullptr;
156 }
157 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)158 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
159 {
160     if (snapshots_.bindAssets == nullptr) {
161         snapshots_.bindAssets = bindAssets;
162     }
163     return GenErr::E_OK;
164 }
165 
Bind(Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)166 int32_t RdbGeneralStore::Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
167     const CloudConfig &config)
168 {
169     if (bindInfos.empty()) {
170         return GeneralError::E_OK;
171     }
172     auto bindInfo = bindInfos.begin()->second;
173     if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
174         return GeneralError::E_INVALID_ARGS;
175     }
176 
177     if (isBound_.exchange(true)) {
178         return GeneralError::E_OK;
179     }
180 
181     BindEvent::BindEventInfo eventInfo;
182     eventInfo.tokenId = storeInfo_.tokenId;
183     eventInfo.bundleName = storeInfo_.bundleName;
184     eventInfo.storeName = storeInfo_.storeName;
185     eventInfo.user = storeInfo_.user;
186     eventInfo.instanceId = storeInfo_.instanceId;
187 
188     auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
189     EventCenter::GetInstance().PostEvent(std::move(evt));
190     bindInfo_ = std::move(bindInfo);
191     {
192         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
193         rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
194         rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
195     }
196 
197     DistributedDB::CloudSyncConfig dbConfig;
198     dbConfig.maxUploadCount = config.maxNumber;
199     dbConfig.maxUploadSize = config.maxSize;
200     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
201     DBSchema schema = GetDBSchema(database);
202     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
203     if (delegate_ == nullptr) {
204         ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
205         return GeneralError::E_ALREADY_CLOSED;
206     }
207     delegate_->SetCloudDB(rdbCloud_);
208     delegate_->SetIAssetLoader(rdbLoader_);
209     delegate_->SetCloudDbSchema(std::move(schema));
210     delegate_->SetCloudSyncConfig(dbConfig);
211 
212     syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
213     return GeneralError::E_OK;
214 }
215 
IsBound()216 bool RdbGeneralStore::IsBound()
217 {
218     return isBound_;
219 }
220 
Close(bool isForce)221 int32_t RdbGeneralStore::Close(bool isForce)
222 {
223     {
224         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
225         if (!lock) {
226             return GeneralError::E_BUSY;
227         }
228 
229         if (delegate_ == nullptr) {
230             return GeneralError::E_OK;
231         }
232         if (!isForce && delegate_->GetCloudSyncTaskCount() > 0) {
233             return GeneralError::E_BUSY;
234         }
235         if (isForce && bindInfo_.loader_ != nullptr) {
236             bindInfo_.loader_->Cancel();
237         }
238         auto status = manager_.CloseStore(delegate_);
239         if (status != DBStatus::OK) {
240             return status;
241         }
242         delegate_ = nullptr;
243     }
244     RemoveTasks();
245     bindInfo_.loader_ = nullptr;
246     if (bindInfo_.db_ != nullptr) {
247         bindInfo_.db_->Close();
248     }
249     bindInfo_.db_ = nullptr;
250     {
251         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
252         rdbCloud_ = nullptr;
253         rdbLoader_ = nullptr;
254     }
255     return GeneralError::E_OK;
256 }
257 
Execute(const std::string & table,const std::string & sql)258 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
259 {
260     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
261     if (delegate_ == nullptr) {
262         ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
263             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
264             Anonymous::Change(sql).c_str());
265         return GeneralError::E_ERROR;
266     }
267     std::vector<DistributedDB::VBucket> changedData;
268     auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
269     if (status != DBStatus::OK) {
270         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
271               changedData.size());
272         return GeneralError::E_ERROR;
273     }
274     return GeneralError::E_OK;
275 }
276 
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)277 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
278 {
279     strColumnSql += " (";
280     strRowValueSql += " (";
281     auto columnSize = value.size();
282     for (auto column = value.begin(); column != value.end(); ++column) {
283         strRowValueSql += " ?,";
284         strColumnSql += " " + column->first + ",";
285     }
286     if (columnSize != 0) {
287         strColumnSql.pop_back();
288         strColumnSql += ")";
289         strRowValueSql.pop_back();
290         strRowValueSql += ")";
291     }
292     return columnSize;
293 }
294 
Insert(const std::string & table,VBuckets && values)295 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
296 {
297     if (table.empty() || values.size() == 0) {
298         ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
299         return GeneralError::E_INVALID_ARGS;
300     }
301 
302     std::string strColumnSql;
303     std::string strRowValueSql;
304     auto value = values.front();
305     size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
306     if (columnSize == 0) {
307         ZLOGE("Insert: columnSize error, can't be 0!");
308         return GeneralError::E_INVALID_ARGS;
309     }
310 
311     Values args;
312     std::string strValueSql;
313     for (auto &rowData : values) {
314         if (rowData.size() != columnSize) {
315             ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
316             return GeneralError::E_INVALID_ARGS;
317         }
318         for (auto column = rowData.begin(); column != rowData.end(); ++column) {
319             args.push_back(std::move(column->second));
320         }
321         strValueSql += strRowValueSql + ",";
322     }
323     strValueSql.pop_back();
324     std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
325 
326     std::vector<DistributedDB::VBucket> changedData;
327     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
328     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
329     if (delegate_ == nullptr) {
330         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
331             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
332         return GeneralError::E_ERROR;
333     }
334     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
335     if (status != DBStatus::OK) {
336         if (IsPrintLog(status)) {
337             auto time =
338                 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
339                 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
340                 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
341         }
342         return GeneralError::E_ERROR;
343     }
344     return GeneralError::E_OK;
345 }
346 
IsPrintLog(DBStatus status)347 bool RdbGeneralStore::IsPrintLog(DBStatus status)
348 {
349     bool isPrint = false;
350     if (status == lastError_) {
351         if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
352             isPrint = true;
353         }
354     } else {
355         isPrint = true;
356         lastErrCnt_ = 0;
357         lastError_ = status;
358     }
359     return isPrint;
360 }
361 
362 /**
363  * This function does not support batch updates in rdb, it only supports single data updates.
364  */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)365 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
366     const std::string &whereSql, Values &&conditions)
367 {
368     if (table.empty()) {
369         ZLOGE("Update: table can't be empty!");
370         return GeneralError::E_INVALID_ARGS;
371     }
372     if (setSql.empty() || values.size() == 0) {
373         ZLOGE("Update: setSql and values can't be empty!");
374         return GeneralError::E_INVALID_ARGS;
375     }
376     if (whereSql.empty() || conditions.size() == 0) {
377         ZLOGE("Update: whereSql and conditions can't be empty!");
378         return GeneralError::E_INVALID_ARGS;
379     }
380 
381     std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
382     Values args;
383     for (auto &value : values) {
384         args.push_back(std::move(value));
385     }
386     for (auto &condition : conditions) {
387         args.push_back(std::move(condition));
388     }
389 
390     std::vector<DistributedDB::VBucket> changedData;
391     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
392     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
393     if (delegate_ == nullptr) {
394         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
395             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
396         return GeneralError::E_ERROR;
397     }
398     auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
399     if (status != DBStatus::OK) {
400         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
401               changedData.size());
402         return GeneralError::E_ERROR;
403     }
404     return GeneralError::E_OK;
405 }
406 
Replace(const std::string & table,VBucket && value)407 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
408 {
409     if (table.empty() || value.size() == 0) {
410         return GeneralError::E_INVALID_ARGS;
411     }
412     std::string columnSql;
413     std::string valueSql;
414     SqlConcatenate(value, columnSql, valueSql);
415     std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
416 
417     Values args;
418     for (auto &[k, v] : value) {
419         args.emplace_back(std::move(v));
420     }
421     std::vector<DistributedDB::VBucket> changedData;
422     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
423     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
424     if (delegate_ == nullptr) {
425         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
426             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
427         return GeneralError::E_ERROR;
428     }
429     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
430     if (status != DBStatus::OK) {
431         ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
432             status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
433         return GeneralError::E_ERROR;
434     }
435     return GeneralError::E_OK;
436 }
437 
/**
 * Placeholder: no statement is built or executed and all arguments are ignored.
 *
 * @return always 0.
 */
int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
{
    return 0;
}
442 
Query(const std::string & table,const std::string & sql,Values && args)443 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
444     const std::string &sql, Values &&args)
445 {
446     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
447     if (delegate_ == nullptr) {
448         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
449         return { GeneralError::E_ALREADY_CLOSED, nullptr };
450     }
451     auto [errCode, records] = QuerySql(sql, std::move(args));
452     return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
453 }
454 
Query(const std::string & table,GenQuery & query)455 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
456 {
457     RdbQuery *rdbQuery = nullptr;
458     auto ret = query.QueryInterface(rdbQuery);
459     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
460         ZLOGE("not RdbQuery!");
461         return { GeneralError::E_INVALID_ARGS, nullptr };
462     }
463     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
464     if (delegate_ == nullptr) {
465         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
466             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
467         return { GeneralError::E_ALREADY_CLOSED, nullptr };
468     }
469     if (rdbQuery->IsRemoteQuery()) {
470         if (rdbQuery->GetDevices().size() != 1) {
471             ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
472             return { GeneralError::E_ERROR, nullptr };
473         }
474         auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
475         return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
476     }
477     return { GeneralError::E_ERROR, nullptr };
478 }
479 
MergeMigratedData(const std::string & tableName,VBuckets && values)480 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
481 {
482     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
483     if (delegate_ == nullptr) {
484         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
485             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
486         return GeneralError::E_ERROR;
487     }
488 
489     auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
490     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
491 }
492 
CleanTrackerData(const std::string & tableName,int64_t cursor)493 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
494 {
495     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
496     if (delegate_ == nullptr) {
497         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
498               Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
499         return GeneralError::E_ERROR;
500     }
501 
502     auto status = delegate_->CleanTrackerData(tableName, cursor);
503     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
504 }
505 
/**
 * Starts a synchronisation of the store, either with nearby devices or with the cloud.
 *
 * The kind of sync is selected by the sync mode derived from syncParam.mode: modes below
 * NEARBY_END use the device sync of the delegate, modes between NEARBY_END and CLOUD_END use the
 * cloud sync. For any other mode no sync is started and the converted INVALID_ARGS status is
 * returned.
 *
 * @param devices   devices passed to the delegate.
 * @param query     RdbQuery, or a generic query whose tables are intersected with GetTables().
 * @param async     callback receiving the sync details.
 * @param syncParam mode, wait time, compensation flag and trace id of the request.
 * @return E_ALREADY_CLOSED when the store is closed, otherwise the converted delegate status.
 */
int32_t RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async, SyncParam &syncParam)
{
    DistributedDB::Query dbQuery;
    RdbQuery *rdbQuery = nullptr;
    bool isPriority = false;
    auto ret = query.QueryInterface(rdbQuery);
    if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
        // Not an RdbQuery: sync the tables named by the query that this store also reports.
        dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
    } else {
        dbQuery = rdbQuery->GetQuery();
        isPriority = rdbQuery->IsPriority();
    }
    auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
    auto dbMode = DistributedDB::SyncMode(syncMode);
    std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
    if (delegate_ == nullptr) {
        ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
              "wait:%{public}d", devices.size(),
              devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
        return GeneralError::E_ALREADY_CLOSED;
    }
    auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
    SyncId syncId = ++syncTaskId_;
    // Stays INVALID_ARGS when the mode matches neither branch below.
    auto dbStatus = DistributedDB::INVALID_ARGS;
    if (syncMode < NEARBY_END) {
        // Device sync. NOTE(review): the last argument presumably selects a blocking sync - confirm.
        dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
    } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
        // Cloud sync: register a finish task scheduled INTERVAL minutes ahead before starting the sync.
        auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
        if (executor_ != nullptr && tasks_ != nullptr) {
            auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
            tasks_->Insert(syncId, { id, callback });
        }
        dbStatus = delegate_->Sync({ devices, dbMode, dbQuery, syncParam.wait,
                                       (isPriority || highMode == MANUAL_SYNC_MODE), syncParam.isCompensation, {},
                                       highMode == AUTO_SYNC_MODE, LOCK_ACTION, syncParam.prepareTraceId },
            tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
        if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
            return ConvertStatus(dbStatus);
        }
        // The sync did not start: cancel the scheduled finish task.
        // NOTE(review): returning false presumably removes the entry from tasks_ - confirm.
        tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
            if (executor != nullptr) {
                executor->Remove(task.taskId);
            }
            return false;
        });
    }
    return ConvertStatus(dbStatus);
}
554 
PreSharing(GenQuery & query)555 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
556 {
557     RdbQuery *rdbQuery = nullptr;
558     auto ret = query.QueryInterface(rdbQuery);
559     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
560         ZLOGE("not RdbQuery!");
561         return { GeneralError::E_INVALID_ARGS, nullptr };
562     }
563     auto tables = rdbQuery->GetTables();
564     auto statement = rdbQuery->GetStatement();
565     if (statement.empty() || tables.empty()) {
566         ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
567         return { GeneralError::E_INVALID_ARGS, nullptr };
568     }
569     std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
570     VBuckets values;
571     {
572         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
573         if (delegate_ == nullptr) {
574             ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
575             return { GeneralError::E_ALREADY_CLOSED, nullptr };
576         }
577         auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
578         values = std::move(ret);
579     }
580     auto rdbCloud = GetRdbCloud();
581     if (rdbCloud == nullptr || values.empty()) {
582         ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
583             values.size());
584         return { GeneralError::E_CLOUD_DISABLED, nullptr };
585     }
586     VBuckets extends = ExtractExtend(values);
587     rdbCloud->PreSharing(*tables.begin(), extends);
588     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
589          ++value, ++extend) {
590         value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
591         value->erase(CLOUD_GID);
592     }
593     return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
594 }
595 
ExtractExtend(VBuckets & values) const596 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
597 {
598     VBuckets extends(values.size());
599     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
600          ++value, ++extend) {
601         auto it = value->find(CLOUD_GID);
602         if (it == value->end()) {
603             continue;
604         }
605         auto gid = std::get_if<std::string>(&(it->second));
606         if (gid == nullptr || gid->empty()) {
607             continue;
608         }
609         extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
610     }
611     return extends;
612 }
613 
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const614 std::string RdbGeneralStore::BuildSql(
615     const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
616 {
617     std::string sql = "select ";
618     sql.append(CLOUD_GID);
619     std::string sqlNode = "select rowid";
620     for (auto &column : columns) {
621         sql.append(", ").append(column);
622         sqlNode.append(", ").append(column);
623     }
624     sqlNode.append(" from ").append(table).append(statement);
625     auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
626     sql.append(" from ").append(logTable).append(", (").append(sqlNode);
627     sql.append(") where ").append(DATE_KEY).append(" = rowid");
628     return sql;
629 }
630 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)631 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
632 {
633     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
634         return GeneralError::E_INVALID_ARGS;
635     }
636     DBStatus status = DistributedDB::DB_ERROR;
637     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
638     if (delegate_ == nullptr) {
639         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
640               "tableName:%{public}s",
641             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
642             Anonymous::Change(tableName).c_str());
643         return GeneralError::E_ALREADY_CLOSED;
644     }
645     switch (mode) {
646         case CLOUD_INFO:
647             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
648             if (status == DistributedDB::OK) {
649                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
650                 break;
651             }
652             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
653             break;
654         case CLOUD_DATA:
655             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
656             if (status == DistributedDB::OK) {
657                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
658                 break;
659             }
660             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
661             break;
662         case NEARBY_DATA:
663             if (devices.empty()) {
664                 status = delegate_->RemoveDeviceData();
665                 break;
666             }
667             for (auto device : devices) {
668                 status = delegate_->RemoveDeviceData(device, tableName);
669             }
670             break;
671         default:
672             return GeneralError::E_ERROR;
673     }
674     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
675 }
676 
Watch(int32_t origin,Watcher & watcher)677 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
678 {
679     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
680         return GeneralError::E_INVALID_ARGS;
681     }
682 
683     observer_.watcher_ = &watcher;
684     return GeneralError::E_OK;
685 }
686 
Unwatch(int32_t origin,Watcher & watcher)687 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
688 {
689     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
690         return GeneralError::E_INVALID_ARGS;
691     }
692 
693     observer_.watcher_ = nullptr;
694     return GeneralError::E_OK;
695 }
696 
GetDBBriefCB(DetailAsync async)697 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
698 {
699     if (!async) {
700         return [](auto &) {};
701     }
702     return [async = std::move(async)](
703         const std::map<std::string, std::vector<TableStatus>> &result) {
704         DistributedData::GenDetails details;
705         for (auto &[key, tables] : result) {
706             auto &value = details[key];
707             value.progress = FINISHED;
708             value.code = GeneralError::E_OK;
709             for (auto &table : tables) {
710                 if (table.status != DBStatus::OK) {
711                     value.code = GeneralError::E_ERROR;
712                 }
713             }
714         }
715         async(details);
716     };
717 }
718 
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)719 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
720     uint32_t highMode)
721 {
722     std::shared_lock<std::shared_mutex> lock(asyncMutex_);
723     return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
724         rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
725         DistributedData::GenDetails details;
726         for (auto &[id, process] : processes) {
727             bool isDownload = false;
728             auto &detail = details[id];
729             detail.progress = process.process;
730             detail.code = ConvertStatus(process.errCode);
731             detail.dbCode = DB_ERR_OFFSET + process.errCode;
732             uint32_t totalCount = 0;
733             for (auto [key, value] : process.tableProcess) {
734                 auto &table = detail.details[key];
735                 table.upload.total = value.upLoadInfo.total;
736                 table.upload.success = value.upLoadInfo.successCount;
737                 table.upload.failed = value.upLoadInfo.failCount;
738                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
739                 totalCount += table.upload.total;
740                 isDownload = table.download.total > 0;
741                 table.download.total = value.downLoadInfo.total;
742                 table.download.success = value.downLoadInfo.successCount;
743                 table.download.failed = value.downLoadInfo.failCount;
744                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
745                 detail.changeCount = (process.process == FINISHED)
746                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
747                                               value.downLoadInfo.deleteCount
748                                         : 0;
749                 totalCount += table.download.total;
750             }
751             if (process.process == FINISHED) {
752                 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
753             } else {
754                 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
755             }
756 
757             if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
758                 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
759                 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
760             }
761         }
762         if (async) {
763             async(details);
764         }
765 
766         if (highMode == AUTO_SYNC_MODE && autoAsync
767             && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
768             autoAsync(details);
769         }
770     };
771 }
772 
Release()773 int32_t RdbGeneralStore::Release()
774 {
775     auto ref = 1;
776     {
777         std::lock_guard<decltype(mutex_)> lock(mutex_);
778         if (ref_ == 0) {
779             return 0;
780         }
781         ref = --ref_;
782     }
783     ZLOGD("ref:%{public}d", ref);
784     if (ref == 0) {
785         delete this;
786     }
787     return ref;
788 }
789 
AddRef()790 int32_t RdbGeneralStore::AddRef()
791 {
792     std::lock_guard<decltype(mutex_)> lock(mutex_);
793     if (ref_ == 0) {
794         return 0;
795     }
796     return ++ref_;
797 }
798 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)799 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
800     const std::vector<Reference> &references)
801 {
802     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
803     if (delegate_ == nullptr) {
804         ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
805             Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
806         return GeneralError::E_ALREADY_CLOSED;
807     }
808     for (const auto &table : tables) {
809         ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
810         auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
811         if (dBStatus != DistributedDB::DBStatus::OK) {
812             ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
813                 Anonymous::Change(table).c_str(), dBStatus);
814             return GeneralError::E_ERROR;
815         }
816     }
817     std::vector<DistributedDB::TableReferenceProperty> properties;
818     for (const auto &reference : references) {
819         properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
820     }
821     auto status = delegate_->SetReference(properties);
822     if (status != DistributedDB::DBStatus::OK) {
823         ZLOGE("distributed table set reference failed, err:%{public}d", status);
824         return GeneralError::E_ERROR;
825     }
826     return GeneralError::E_OK;
827 }
828 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::string & extendColName,bool isForceUpgrade)829 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
830     const std::string &extendColName, bool isForceUpgrade)
831 {
832     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
833     if (delegate_ == nullptr) {
834         ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
835             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
836         return GeneralError::E_ALREADY_CLOSED;
837     }
838     auto status = delegate_->SetTrackerTable({ tableName, extendColName, trackerColNames, isForceUpgrade });
839     if (status == DBStatus::WITH_INVENTORY_DATA) {
840         ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
841             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
842         return GeneralError::E_WITH_INVENTORY_DATA;
843     }
844     if (status != DBStatus::OK) {
845         ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
846             status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
847         return GeneralError::E_ERROR;
848     }
849     return GeneralError::E_OK;
850 }
851 
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)852 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
853     const DistributedDB::RemoteCondition &remoteCondition)
854 {
855     std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
856     DistributedDB::DBStatus status =
857         delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
858     if (status != DistributedDB::DBStatus::OK) {
859         ZLOGE("DistributedDB remote query failed, device:%{public}s, status is  %{public}d.",
860             Anonymous::Change(device).c_str(), status);
861         return nullptr;
862     }
863     return std::make_shared<RdbCursor>(dbResultSet);
864 }
865 
ConvertStatus(DistributedDB::DBStatus status)866 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
867 {
868     switch (status) {
869         case DBStatus::OK:
870             return GenErr::E_OK;
871         case DBStatus::CLOUD_NETWORK_ERROR:
872             return GenErr::E_NETWORK_ERROR;
873         case DBStatus::CLOUD_LOCK_ERROR:
874             return GenErr::E_LOCKED_BY_OTHERS;
875         case DBStatus::CLOUD_FULL_RECORDS:
876             return GenErr::E_RECODE_LIMIT_EXCEEDED;
877         case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
878             return GenErr::E_NO_SPACE_FOR_ASSET;
879         case DBStatus::BUSY:
880             return GenErr::E_BUSY;
881         case DBStatus::CLOUD_SYNC_TASK_MERGED:
882             return GenErr::E_SYNC_TASK_MERGED;
883         default:
884             ZLOGI("status:0x%{public}x", status);
885             break;
886     }
887     return GenErr::E_ERROR;
888 }
889 
IsValid()890 bool RdbGeneralStore::IsValid()
891 {
892     return delegate_ != nullptr;
893 }
894 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)895 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
896 {
897     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
898     async_ = std::move(async);
899     return GenErr::E_OK;
900 }
901 
UnregisterDetailProgressObserver()902 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
903 {
904     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
905     async_ = nullptr;
906     return GenErr::E_OK;
907 }
908 
QuerySql(const std::string & sql,Values && args)909 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
910 {
911     std::vector<DistributedDB::VBucket> changedData;
912     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
913     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
914     if (status != DBStatus::OK) {
915         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sql).c_str(),
916             changedData.size());
917         return { GenErr::E_ERROR, {} };
918     }
919     return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
920 }
921 
GetWaterVersion(const std::string & deviceId)922 std::vector<std::string> RdbGeneralStore::GetWaterVersion(const std::string &deviceId)
923 {
924     return {};
925 }
926 
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)927 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
928     uint32_t syncCount)
929 {
930     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
931     if (requiredFlag != (requiredFlag & flag)) {
932         return;
933     }
934     StoreInfo info = storeInfo;
935     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
936         traceId, syncCount);
937     EventCenter::GetInstance().PostEvent(std::move(evt));
938 }
939 
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)940 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
941 {
942     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
943     if (requiredFlag != (requiredFlag & flag)) {
944         return;
945     }
946     StoreInfo info = storeInfo;
947     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
948         traceId);
949     EventCenter::GetInstance().PostEvent(std::move(evt));
950 }
951 
GetTables()952 std::set<std::string> RdbGeneralStore::GetTables()
953 {
954     std::set<std::string> tables;
955     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
956     if (delegate_ == nullptr) {
957         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
958         return tables;
959     }
960     auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
961     if (errCode != GenErr::E_OK) {
962         return tables;
963     }
964     for (auto &table : res) {
965         auto it = table.find("name");
966         if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
967             ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
968             continue;
969         }
970         tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
971     }
972     return tables;
973 }
974 
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)975 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
976     const std::set<std::string> &localTables)
977 {
978     std::vector<std::string> res;
979     for (auto &it : syncTables) {
980         if (localTables.count(it)) {
981             res.push_back(std::move(it));
982         }
983     }
984     return res;
985 }
986 
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)987 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
988     const std::vector<std::string> &tables, ChangeType type)
989 {
990     RemoteChangeEvent::DataInfo info;
991     info.userId = meta.user;
992     info.storeId = meta.storeId;
993     info.deviceId = meta.deviceId;
994     info.bundleName = meta.bundleName;
995     info.tables = tables;
996     info.changeType = type;
997     auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
998     EventCenter::GetInstance().PostEvent(std::move(evt));
999 }
1000 
OnChange(const DBChangedIF & data)1001 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1002 {
1003     if (!HasWatcher()) {
1004         return;
1005     }
1006     std::string device = data.GetDataChangeDevice();
1007     auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1008     ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1009         Anonymous::Change(device).c_str());
1010     GenOrigin genOrigin;
1011     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1012     genOrigin.dataType = GenOrigin::BASIC_DATA;
1013     DistributedDB::StoreProperty property;
1014     data.GetStoreProperty(property);
1015     genOrigin.id.push_back(networkId);
1016     genOrigin.store = storeId_;
1017     GeneralWatcher::ChangeInfo changeInfo{};
1018     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1019     return;
1020 }
1021 
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1022 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1023 {
1024     if (!HasWatcher()) {
1025         return;
1026     }
1027     ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1028         Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1029     GenOrigin genOrigin;
1030     genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1031                            ? GenOrigin::ORIGIN_LOCAL
1032                            : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1033     genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1034     genOrigin.id.push_back(originalId);
1035     genOrigin.store = storeId_;
1036     Watcher::PRIFields fields;
1037     Watcher::ChangeInfo changeInfo;
1038     bool notifyFlag = false;
1039     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1040         auto &info = changeInfo[data.tableName][i];
1041         for (auto &priData : data.primaryData[i]) {
1042             Watcher::PRIValue value;
1043             Convert(std::move(*(priData.begin())), value);
1044             if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1045                 info.push_back(std::move(value));
1046                 continue;
1047             }
1048             auto deleteKey = std::get_if<std::string>(&value);
1049             if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1050                 // notify to start app
1051                 notifyFlag = true;
1052             }
1053             info.push_back(std::move(value));
1054         }
1055     }
1056     if (notifyFlag) {
1057         PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1058     }
1059     if (!data.field.empty()) {
1060         fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1061     }
1062     watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1063 }
1064 
LockCloudDB()1065 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1066 {
1067     auto rdbCloud = GetRdbCloud();
1068     if (rdbCloud == nullptr) {
1069         return { GeneralError::E_ERROR, 0 };
1070     }
1071     return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1072 }
1073 
UnLockCloudDB()1074 int32_t RdbGeneralStore::UnLockCloudDB()
1075 {
1076     auto rdbCloud = GetRdbCloud();
1077     if (rdbCloud == nullptr) {
1078         return GeneralError::E_ERROR;
1079     }
1080     return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1081 }
1082 
GetRdbCloud() const1083 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1084 {
1085     std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1086     return rdbCloud_;
1087 }
1088 
IsFinished(SyncId syncId) const1089 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1090 {
1091     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1092     if (delegate_ == nullptr) {
1093         ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1094         return true;
1095     }
1096     return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1097 }
1098 
GetFinishTask(SyncId syncId)1099 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1100 {
1101     return [this, executor = executor_, task = tasks_, syncId]() {
1102         auto [exist, finishTask] = task->Find(syncId);
1103         if (!exist || finishTask.cb == nullptr) {
1104             task->Erase(syncId);
1105             return;
1106         }
1107         if (!IsFinished(syncId)) {
1108             task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1109                 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1110                 return true;
1111             });
1112             return;
1113         }
1114         DBProcessCB cb;
1115         task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1116             cb = task.cb;
1117             return false;
1118         });
1119         if (cb != nullptr) {
1120             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1121                   Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1122             std::map<std::string, SyncProcess> result;
1123             result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1124             cb(result);
1125         }
1126     };
1127 }
1128 
SetExecutor(std::shared_ptr<Executor> executor)1129 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1130 {
1131     if (executor_ == nullptr) {
1132         executor_ = executor;
1133     }
1134 }
1135 
RemoveTasks()1136 void RdbGeneralStore::RemoveTasks()
1137 {
1138     if (tasks_ == nullptr) {
1139         return;
1140     }
1141     std::list<DBProcessCB> cbs;
1142     std::list<TaskId> taskIds;
1143     tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1144         if (task.cb != nullptr) {
1145             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1146                   syncId);
1147         }
1148         cbs.push_back(std::move(task.cb));
1149         taskIds.push_back(task.taskId);
1150         return true;
1151     });
1152     if (executor_ != nullptr) {
1153         for (auto taskId : taskIds) {
1154             executor_->Remove(taskId, true);
1155         }
1156     }
1157     std::map<std::string, SyncProcess> result;
1158     result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1159     for (auto &cb : cbs) {
1160         if (cb != nullptr) {
1161             cb(result);
1162         }
1163     }
1164 }
1165 
GetCB(SyncId syncId)1166 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1167 {
1168     return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1169         if (task == nullptr) {
1170             return;
1171         }
1172         DBProcessCB cb;
1173         task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1174             cb = finishTask.cb;
1175             bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1176             if (isFinished) {
1177                 finishTask.cb = nullptr;
1178             }
1179             return true;
1180         });
1181         if (cb != nullptr) {
1182             cb(progress);
1183         }
1184         return;
1185     };
1186 }
1187 } // namespace OHOS::DistributedRdb