• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
#include "relational_sync_able_storage.h"

#include <cinttypes>
#include <utility>

#include "cloud/cloud_db_constant.h"
#include "cloud/cloud_storage_utils.h"
#include "concurrent_adapter.h"
#include "data_compression.h"
#include "db_common.h"
#include "db_dfx_adapter.h"
#include "generic_single_ver_kv_entry.h"
#include "platform_specific.h"
#include "query_utils.h"
#include "relational_remote_query_continue_token.h"
#include "relational_sync_data_inserter.h"
#include "res_finalizer.h"
#include "runtime_context.h"
#include "time_helper.h"
34 
35 namespace DistributedDB {
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)36 int RelationalSyncAbleStorage::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
37     const DownloadData &downloadData, const std::set<std::string> &gidFilters)
38 {
39     if (transactionHandle_ == nullptr) {
40         LOGE("[RelationalSyncAbleStorage] the transaction has not been started, tableName:%s, length:%zu",
41             DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
42         return -E_INVALID_DB;
43     }
44     int errCode = transactionHandle_->MarkFlagAsAssetAsyncDownload(tableName, downloadData, gidFilters);
45     if (errCode != E_OK) {
46         LOGE("[RelationalSyncAbleStorage] mark flag as asset async download failed.%d, tableName:%s, length:%zu",
47             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
48     }
49     return errCode;
50 }
51 
GetDownloadAssetTable()52 std::pair<int, std::vector<std::string>> RelationalSyncAbleStorage::GetDownloadAssetTable()
53 {
54     int errCode = E_OK;
55     auto *handle = GetHandle(false, errCode);
56     if (handle == nullptr || errCode != E_OK) {
57         LOGE("[RelationalSyncAbleStorage] Get handle failed when get downloading asset table: %d", errCode);
58         return {errCode, {}};
59     }
60     std::vector<std::string> tableNames;
61     auto allTableNames = storageEngine_->GetSchema().GetTableNames();
62     for (const auto &it : allTableNames) {
63         int32_t count = 0;
64         errCode = handle->GetDownloadingCount(it, count);
65         if (errCode != E_OK) {
66             LOGE("[RelationalSyncAbleStorage] Get downloading asset count failed: %d", errCode);
67             ReleaseHandle(handle);
68             return {errCode, tableNames};
69         }
70         if (count > 0) {
71             tableNames.push_back(it);
72         }
73     }
74     ReleaseHandle(handle);
75     return {errCode, tableNames};
76 }
77 
GetDownloadAssetRecords(const std::string & tableName,int64_t beginTime)78 std::pair<int, std::vector<std::string>> RelationalSyncAbleStorage::GetDownloadAssetRecords(
79     const std::string &tableName, int64_t beginTime)
80 {
81     TableSchema schema;
82     int errCode = GetCloudTableSchema(tableName, schema);
83     if (errCode != E_OK) {
84         LOGE("[RelationalSyncAbleStorage] Get schema when get asset records failed:%d, tableName:%s, length:%zu",
85             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
86         return {errCode, {}};
87     }
88     auto *handle = GetHandle(false, errCode);
89     if (handle == nullptr || errCode != E_OK) {
90         LOGE("[RelationalSyncAbleStorage] Get handle when get asset records failed:%d, tableName:%s, length:%zu",
91             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
92         return {errCode, {}};
93     }
94     std::vector<std::string> gids;
95     errCode = handle->GetDownloadAssetRecordsInner(schema, beginTime, gids);
96     if (errCode != E_OK) {
97         LOGE("[RelationalSyncAbleStorage] Get downloading asset records failed:%d, tableName:%s, length:%zu",
98             errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
99     }
100     ReleaseHandle(handle);
101     return {errCode, gids};
102 }
103 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,bool useTransaction,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)104 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
105     bool useTransaction, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
106 {
107     if (useTransaction && transactionHandle_ == nullptr) {
108         LOGE(" the transaction has not been started");
109         return -E_INVALID_DB;
110     }
111     SQLiteSingleVerRelationalStorageExecutor *handle;
112     int errCode = E_OK;
113     if (useTransaction) {
114         handle = transactionHandle_;
115     } else {
116         errCode = E_OK;
117         handle = GetHandle(false, errCode);
118         if (errCode != E_OK) {
119             return errCode;
120         }
121     }
122     errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, vBucket, dataInfoWithLog, assetInfo);
123     if (!useTransaction) {
124         ReleaseHandle(handle);
125     }
126     return errCode;
127 }
128 
UpdateAssetStatusForAssetOnly(const std::string & tableName,VBucket & asset)129 int RelationalSyncAbleStorage::UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset)
130 {
131     if (transactionHandle_ == nullptr) {
132         LOGE("the transaction has not been started");
133         return -E_INVALID_DB;
134     }
135 
136     TableSchema tableSchema;
137     int errCode = GetCloudTableSchema(tableName, tableSchema);
138     if (errCode != E_OK) {
139         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
140         return errCode;
141     }
142     RelationalSchemaObject localSchema = GetSchemaInfo();
143     transactionHandle_->SetLocalSchema(localSchema);
144     transactionHandle_->SetLogicDelete(IsCurrentLogicDelete());
145     errCode = transactionHandle_->UpdateAssetStatusForAssetOnly(tableSchema, asset);
146     transactionHandle_->SetLogicDelete(false);
147     return errCode;
148 }
149 
PrintCursorChange(const std::string & tableName)150 void RelationalSyncAbleStorage::PrintCursorChange(const std::string &tableName)
151 {
152     std::lock_guard lock(cursorChangeMutex_);
153     auto iter = cursorChangeMap_.find(tableName);
154     if (iter == cursorChangeMap_.end()) {
155         return;
156     }
157     LOGI("[RelationalSyncAbleStorage] Upgrade cursor from %d to %d when asset download success.",
158         cursorChangeMap_[tableName].first, cursorChangeMap_[tableName].second);
159     cursorChangeMap_.erase(tableName);
160 }
161 
SaveCursorChange(const std::string & tableName,uint64_t currCursor)162 void RelationalSyncAbleStorage::SaveCursorChange(const std::string &tableName, uint64_t currCursor)
163 {
164     std::lock_guard lock(cursorChangeMutex_);
165     auto iter = cursorChangeMap_.find(tableName);
166     if (iter == cursorChangeMap_.end()) {
167         std::pair<uint64_t, uint64_t> initCursors = {currCursor, currCursor};
168         cursorChangeMap_.insert(std::pair<std::string, std::pair<uint64_t, uint64_t>>(tableName, initCursors));
169         return;
170     }
171     std::pair<uint64_t, uint64_t> minMaxCursors = iter->second;
172     uint64_t minCursor = std::min(minMaxCursors.first, currCursor);
173     uint64_t maxCursor = std::max(minMaxCursors.second, currCursor);
174     cursorChangeMap_[tableName] = {minCursor, maxCursor};
175 }
176 
FillCloudAssetForAsyncDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)177 int RelationalSyncAbleStorage::FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset,
178     bool isDownloadSuccess)
179 {
180     if (storageEngine_ == nullptr) {
181         LOGE("[RelationalSyncAbleStorage]storage is null when fill asset for async download");
182         return -E_INVALID_DB;
183     }
184     if (asyncDownloadTransactionHandle_ == nullptr) {
185         LOGE("the transaction has not been started when fill asset for async download.");
186         return -E_INVALID_DB;
187     }
188     TableSchema tableSchema;
189     int errCode = GetCloudTableSchema(tableName, tableSchema);
190     if (errCode != E_OK) {
191         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
192         return errCode;
193     }
194     uint64_t currCursor = DBConstant::INVALID_CURSOR;
195     errCode = asyncDownloadTransactionHandle_->FillCloudAssetForDownload(
196         tableSchema, asset, isDownloadSuccess, currCursor);
197     if (errCode != E_OK) {
198         LOGE("fill cloud asset for async download failed.%d", errCode);
199     }
200     return errCode;
201 }
202 
UpdateRecordFlagForAsyncDownload(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)203 int RelationalSyncAbleStorage::UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
204     const LogInfo &logInfo)
205 {
206     if (asyncDownloadTransactionHandle_ == nullptr) {
207         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
208         return -E_INVALID_DB;
209     }
210     TableSchema tableSchema;
211     GetCloudTableSchema(tableName, tableSchema);
212     std::vector<VBucket> assets;
213     int errCode = asyncDownloadTransactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
214     if (errCode != E_OK) {
215         LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d", logInfo.cloudGid.c_str(), errCode);
216         return errCode;
217     }
218     bool isInconsistency = !assets.empty();
219     UpdateRecordFlagStruct updateRecordFlag = {
220         .tableName = tableName,
221         .isRecordConflict = recordConflict,
222         .isInconsistency = isInconsistency
223     };
224     std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
225     return asyncDownloadTransactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
226 }
227 
SetLogTriggerStatusForAsyncDownload(bool status)228 int RelationalSyncAbleStorage::SetLogTriggerStatusForAsyncDownload(bool status)
229 {
230     int errCode = E_OK;
231     auto *handle = GetHandleExpectTransactionForAsyncDownload(false, errCode);
232     if (handle == nullptr) {
233         return errCode;
234     }
235     errCode = handle->SetLogTriggerStatus(status);
236     if (asyncDownloadTransactionHandle_ == nullptr) {
237         ReleaseHandle(handle);
238     }
239     return errCode;
240 }
241 
GetAssetsByGidOrHashKeyForAsyncDownload(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)242 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKeyForAsyncDownload(
243     const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
244 {
245     if (gid.empty() && hashKey.empty()) {
246         LOGE("both gid and hashKey are empty.");
247         return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
248     }
249     int errCode = E_OK;
250     auto *handle = GetHandle(false, errCode);
251     if (handle == nullptr) {
252         LOGE("executor is null when get assets by gid or hash for async download.");
253         return {errCode, static_cast<uint32_t>(LockStatus::UNLOCK)};
254     }
255     auto [ret, status] = handle->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
256     if (ret != E_OK && ret != -E_NOT_FOUND && ret != -E_CLOUD_GID_MISMATCH) {
257         LOGE("get assets by gid or hashKey failed for async download. %d", ret);
258     }
259     ReleaseHandle(handle);
260     return {ret, status};
261 }
262 
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)263 int RelationalSyncAbleStorage::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
264     LockStatus &status)
265 {
266     if (tableName.empty() || gid.empty()) {
267         LOGE("[RelationalSyncAbleStorage] invalid table name or gid.");
268         return -E_INVALID_ARGS;
269     }
270     int errCode = E_OK;
271     auto *handle = GetHandle(false, errCode);
272     if (handle == nullptr) {
273         LOGE("[RelationalSyncAbleStorage] handle is null when get lock status by gid.");
274         return errCode;
275     }
276     errCode = handle->GetLockStatusByGid(tableName, gid, status);
277     ReleaseHandle(handle);
278     return errCode;
279 }
280 
IsExistTableContainAssets()281 bool RelationalSyncAbleStorage::IsExistTableContainAssets()
282 {
283     std::shared_ptr<DataBaseSchema> cloudSchema = nullptr;
284     int errCode = GetCloudDbSchema(cloudSchema);
285     if (errCode != E_OK) {
286         LOGE("Cannot get cloud schema: %d when check contain assets table", errCode);
287         return false;
288     }
289     if (cloudSchema == nullptr) {
290         LOGE("Not set cloud schema when check contain assets table");
291         return false;
292     }
293     auto schema = GetSchemaInfo();
294     for (const auto &table : cloudSchema->tables) {
295         auto tableInfo = schema.GetTable(table.name);
296         if (tableInfo.GetTableName().empty()) {
297             continue; // ignore not distributed table
298         }
299         for (const auto &field : table.fields) {
300             if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
301                 return true;
302             }
303         }
304     }
305     return false;
306 }
307 
IsSetDistributedSchema(const std::string & tableName,RelationalSchemaObject & schemaObj)308 bool RelationalSyncAbleStorage::IsSetDistributedSchema(const std::string &tableName, RelationalSchemaObject &schemaObj)
309 {
310     if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION) {
311         const std::vector<DistributedTable> &tables = schemaObj.GetDistributedSchema().tables;
312         if (tables.empty()) {
313             LOGE("[RelationalSyncAbleStorage] Distributed schema not set in COLLABORATION mode");
314             return false;
315         }
316         auto iter = std::find_if(tables.begin(), tables.end(), [tableName](const DistributedTable &table) {
317             return DBCommon::CaseInsensitiveCompare(table.tableName, tableName);
318         });
319         if (iter == tables.end()) {
320             LOGE("[RelationalSyncAbleStorage] table name mismatch");
321             return false;
322         }
323     }
324     return true;
325 }
326 
CommitForAsyncDownload()327 int RelationalSyncAbleStorage::CommitForAsyncDownload()
328 {
329     std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
330     if (asyncDownloadTransactionHandle_ == nullptr) {
331         LOGE("relation database is null or the transaction has not been started");
332         return -E_INVALID_DB;
333     }
334     int errCode = asyncDownloadTransactionHandle_->Commit();
335     ReleaseHandle(asyncDownloadTransactionHandle_);
336     asyncDownloadTransactionHandle_ = nullptr;
337     LOGD("connection commit transaction!");
338     return errCode;
339 }
340 
RollbackForAsyncDownload()341 int RelationalSyncAbleStorage::RollbackForAsyncDownload()
342 {
343     std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
344     if (asyncDownloadTransactionHandle_ == nullptr) {
345         LOGE("Invalid handle for rollback or the transaction has not been started.");
346         return -E_INVALID_DB;
347     }
348 
349     int errCode = asyncDownloadTransactionHandle_->Rollback();
350     ReleaseHandle(asyncDownloadTransactionHandle_);
351     asyncDownloadTransactionHandle_ = nullptr;
352     LOGI("connection rollback transaction!");
353     return errCode;
354 }
355 
GetHandleExpectTransactionForAsyncDownload(bool isWrite,int & errCode,OperatePerm perm) const356 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransactionForAsyncDownload(
357     bool isWrite, int &errCode, OperatePerm perm) const
358 {
359     if (storageEngine_ == nullptr) {
360         errCode = -E_INVALID_DB;
361         return nullptr;
362     }
363     if (asyncDownloadTransactionHandle_ != nullptr) {
364         return asyncDownloadTransactionHandle_;
365     }
366     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
367         storageEngine_->FindExecutor(isWrite, perm, errCode));
368     if (errCode != E_OK) {
369         ReleaseHandle(handle);
370         handle = nullptr;
371     }
372     return handle;
373 }
374 
StartTransactionForAsyncDownload(TransactType type)375 int RelationalSyncAbleStorage::StartTransactionForAsyncDownload(TransactType type)
376 {
377     if (storageEngine_ == nullptr) {
378         return -E_INVALID_DB;
379     }
380     std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
381     if (asyncDownloadTransactionHandle_ != nullptr) {
382         LOGD("async download transaction started already.");
383         return -E_TRANSACT_STATE;
384     }
385     int errCode = E_OK;
386     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
387         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
388     if (handle == nullptr) {
389         ReleaseHandle(handle);
390         return errCode;
391     }
392     errCode = handle->StartTransaction(type);
393     if (errCode != E_OK) {
394         ReleaseHandle(handle);
395         return errCode;
396     }
397     asyncDownloadTransactionHandle_ = handle;
398     return errCode;
399 }
400 
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)401 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
402     Timestamp localMark)
403 {
404     uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
405 }
406 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const407 int RelationalSyncAbleStorage::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
408 {
409     needCompressOnSync = storageEngine_->GetRelationalProperties().GetBoolProp(DBProperties::COMPRESS_ON_SYNC, false);
410     compressionRate = storageEngine_->GetRelationalProperties().GetIntProp(DBProperties::COMPRESSION_RATE,
411         DBConstant::DEFAULT_COMPTRESS_RATE);
412     return E_OK;
413 }
414 }
415 #endif
416