1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34
35 namespace DistributedDB {
MarkFlagAsAssetAsyncDownload(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)36 int RelationalSyncAbleStorage::MarkFlagAsAssetAsyncDownload(const std::string &tableName,
37 const DownloadData &downloadData, const std::set<std::string> &gidFilters)
38 {
39 if (transactionHandle_ == nullptr) {
40 LOGE("[RelationalSyncAbleStorage] the transaction has not been started, tableName:%s, length:%zu",
41 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
42 return -E_INVALID_DB;
43 }
44 int errCode = transactionHandle_->MarkFlagAsAssetAsyncDownload(tableName, downloadData, gidFilters);
45 if (errCode != E_OK) {
46 LOGE("[RelationalSyncAbleStorage] mark flag as asset async download failed.%d, tableName:%s, length:%zu",
47 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
48 }
49 return errCode;
50 }
51
GetDownloadAssetTable()52 std::pair<int, std::vector<std::string>> RelationalSyncAbleStorage::GetDownloadAssetTable()
53 {
54 int errCode = E_OK;
55 auto *handle = GetHandle(false, errCode);
56 if (handle == nullptr || errCode != E_OK) {
57 LOGE("[RelationalSyncAbleStorage] Get handle failed when get downloading asset table: %d", errCode);
58 return {errCode, {}};
59 }
60 std::vector<std::string> tableNames;
61 auto allTableNames = storageEngine_->GetSchema().GetTableNames();
62 for (const auto &it : allTableNames) {
63 int32_t count = 0;
64 errCode = handle->GetDownloadingCount(it, count);
65 if (errCode != E_OK) {
66 LOGE("[RelationalSyncAbleStorage] Get downloading asset count failed: %d", errCode);
67 ReleaseHandle(handle);
68 return {errCode, tableNames};
69 }
70 if (count > 0) {
71 tableNames.push_back(it);
72 }
73 }
74 ReleaseHandle(handle);
75 return {errCode, tableNames};
76 }
77
GetDownloadAssetRecords(const std::string & tableName,int64_t beginTime)78 std::pair<int, std::vector<std::string>> RelationalSyncAbleStorage::GetDownloadAssetRecords(
79 const std::string &tableName, int64_t beginTime)
80 {
81 TableSchema schema;
82 int errCode = GetCloudTableSchema(tableName, schema);
83 if (errCode != E_OK) {
84 LOGE("[RelationalSyncAbleStorage] Get schema when get asset records failed:%d, tableName:%s, length:%zu",
85 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
86 return {errCode, {}};
87 }
88 auto *handle = GetHandle(false, errCode);
89 if (handle == nullptr || errCode != E_OK) {
90 LOGE("[RelationalSyncAbleStorage] Get handle when get asset records failed:%d, tableName:%s, length:%zu",
91 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
92 return {errCode, {}};
93 }
94 std::vector<std::string> gids;
95 errCode = handle->GetDownloadAssetRecordsInner(schema, beginTime, gids);
96 if (errCode != E_OK) {
97 LOGE("[RelationalSyncAbleStorage] Get downloading asset records failed:%d, tableName:%s, length:%zu",
98 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
99 }
100 ReleaseHandle(handle);
101 return {errCode, gids};
102 }
103
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,bool useTransaction,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)104 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
105 bool useTransaction, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
106 {
107 if (useTransaction && transactionHandle_ == nullptr) {
108 LOGE(" the transaction has not been started");
109 return -E_INVALID_DB;
110 }
111 SQLiteSingleVerRelationalStorageExecutor *handle;
112 int errCode = E_OK;
113 if (useTransaction) {
114 handle = transactionHandle_;
115 } else {
116 errCode = E_OK;
117 handle = GetHandle(false, errCode);
118 if (errCode != E_OK) {
119 return errCode;
120 }
121 }
122 errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, vBucket, dataInfoWithLog, assetInfo);
123 if (!useTransaction) {
124 ReleaseHandle(handle);
125 }
126 return errCode;
127 }
128
UpdateAssetStatusForAssetOnly(const std::string & tableName,VBucket & asset)129 int RelationalSyncAbleStorage::UpdateAssetStatusForAssetOnly(const std::string &tableName, VBucket &asset)
130 {
131 if (transactionHandle_ == nullptr) {
132 LOGE("the transaction has not been started");
133 return -E_INVALID_DB;
134 }
135
136 TableSchema tableSchema;
137 int errCode = GetCloudTableSchema(tableName, tableSchema);
138 if (errCode != E_OK) {
139 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
140 return errCode;
141 }
142 RelationalSchemaObject localSchema = GetSchemaInfo();
143 transactionHandle_->SetLocalSchema(localSchema);
144 transactionHandle_->SetLogicDelete(IsCurrentLogicDelete());
145 errCode = transactionHandle_->UpdateAssetStatusForAssetOnly(tableSchema, asset);
146 transactionHandle_->SetLogicDelete(false);
147 return errCode;
148 }
149
PrintCursorChange(const std::string & tableName)150 void RelationalSyncAbleStorage::PrintCursorChange(const std::string &tableName)
151 {
152 std::lock_guard lock(cursorChangeMutex_);
153 auto iter = cursorChangeMap_.find(tableName);
154 if (iter == cursorChangeMap_.end()) {
155 return;
156 }
157 LOGI("[RelationalSyncAbleStorage] Upgrade cursor from %d to %d when asset download success.",
158 cursorChangeMap_[tableName].first, cursorChangeMap_[tableName].second);
159 cursorChangeMap_.erase(tableName);
160 }
161
SaveCursorChange(const std::string & tableName,uint64_t currCursor)162 void RelationalSyncAbleStorage::SaveCursorChange(const std::string &tableName, uint64_t currCursor)
163 {
164 std::lock_guard lock(cursorChangeMutex_);
165 auto iter = cursorChangeMap_.find(tableName);
166 if (iter == cursorChangeMap_.end()) {
167 std::pair<uint64_t, uint64_t> initCursors = {currCursor, currCursor};
168 cursorChangeMap_.insert(std::pair<std::string, std::pair<uint64_t, uint64_t>>(tableName, initCursors));
169 return;
170 }
171 std::pair<uint64_t, uint64_t> minMaxCursors = iter->second;
172 uint64_t minCursor = std::min(minMaxCursors.first, currCursor);
173 uint64_t maxCursor = std::max(minMaxCursors.second, currCursor);
174 cursorChangeMap_[tableName] = {minCursor, maxCursor};
175 }
176
FillCloudAssetForAsyncDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)177 int RelationalSyncAbleStorage::FillCloudAssetForAsyncDownload(const std::string &tableName, VBucket &asset,
178 bool isDownloadSuccess)
179 {
180 if (storageEngine_ == nullptr) {
181 LOGE("[RelationalSyncAbleStorage]storage is null when fill asset for async download");
182 return -E_INVALID_DB;
183 }
184 if (asyncDownloadTransactionHandle_ == nullptr) {
185 LOGE("the transaction has not been started when fill asset for async download.");
186 return -E_INVALID_DB;
187 }
188 TableSchema tableSchema;
189 int errCode = GetCloudTableSchema(tableName, tableSchema);
190 if (errCode != E_OK) {
191 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
192 return errCode;
193 }
194 uint64_t currCursor = DBConstant::INVALID_CURSOR;
195 errCode = asyncDownloadTransactionHandle_->FillCloudAssetForDownload(
196 tableSchema, asset, isDownloadSuccess, currCursor);
197 if (errCode != E_OK) {
198 LOGE("fill cloud asset for async download failed.%d", errCode);
199 }
200 return errCode;
201 }
202
UpdateRecordFlagForAsyncDownload(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)203 int RelationalSyncAbleStorage::UpdateRecordFlagForAsyncDownload(const std::string &tableName, bool recordConflict,
204 const LogInfo &logInfo)
205 {
206 if (asyncDownloadTransactionHandle_ == nullptr) {
207 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
208 return -E_INVALID_DB;
209 }
210 TableSchema tableSchema;
211 GetCloudTableSchema(tableName, tableSchema);
212 std::vector<VBucket> assets;
213 int errCode = asyncDownloadTransactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
214 if (errCode != E_OK) {
215 LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d", logInfo.cloudGid.c_str(), errCode);
216 return errCode;
217 }
218 bool isInconsistency = !assets.empty();
219 UpdateRecordFlagStruct updateRecordFlag = {
220 .tableName = tableName,
221 .isRecordConflict = recordConflict,
222 .isInconsistency = isInconsistency
223 };
224 std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
225 return asyncDownloadTransactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
226 }
227
SetLogTriggerStatusForAsyncDownload(bool status)228 int RelationalSyncAbleStorage::SetLogTriggerStatusForAsyncDownload(bool status)
229 {
230 int errCode = E_OK;
231 auto *handle = GetHandleExpectTransactionForAsyncDownload(false, errCode);
232 if (handle == nullptr) {
233 return errCode;
234 }
235 errCode = handle->SetLogTriggerStatus(status);
236 if (asyncDownloadTransactionHandle_ == nullptr) {
237 ReleaseHandle(handle);
238 }
239 return errCode;
240 }
241
GetAssetsByGidOrHashKeyForAsyncDownload(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)242 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKeyForAsyncDownload(
243 const TableSchema &tableSchema, const std::string &gid, const Bytes &hashKey, VBucket &assets)
244 {
245 if (gid.empty() && hashKey.empty()) {
246 LOGE("both gid and hashKey are empty.");
247 return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
248 }
249 int errCode = E_OK;
250 auto *handle = GetHandle(false, errCode);
251 if (handle == nullptr) {
252 LOGE("executor is null when get assets by gid or hash for async download.");
253 return {errCode, static_cast<uint32_t>(LockStatus::UNLOCK)};
254 }
255 auto [ret, status] = handle->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
256 if (ret != E_OK && ret != -E_NOT_FOUND && ret != -E_CLOUD_GID_MISMATCH) {
257 LOGE("get assets by gid or hashKey failed for async download. %d", ret);
258 }
259 ReleaseHandle(handle);
260 return {ret, status};
261 }
262
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)263 int RelationalSyncAbleStorage::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
264 LockStatus &status)
265 {
266 if (tableName.empty() || gid.empty()) {
267 LOGE("[RelationalSyncAbleStorage] invalid table name or gid.");
268 return -E_INVALID_ARGS;
269 }
270 int errCode = E_OK;
271 auto *handle = GetHandle(false, errCode);
272 if (handle == nullptr) {
273 LOGE("[RelationalSyncAbleStorage] handle is null when get lock status by gid.");
274 return errCode;
275 }
276 errCode = handle->GetLockStatusByGid(tableName, gid, status);
277 ReleaseHandle(handle);
278 return errCode;
279 }
280
IsExistTableContainAssets()281 bool RelationalSyncAbleStorage::IsExistTableContainAssets()
282 {
283 std::shared_ptr<DataBaseSchema> cloudSchema = nullptr;
284 int errCode = GetCloudDbSchema(cloudSchema);
285 if (errCode != E_OK) {
286 LOGE("Cannot get cloud schema: %d when check contain assets table", errCode);
287 return false;
288 }
289 if (cloudSchema == nullptr) {
290 LOGE("Not set cloud schema when check contain assets table");
291 return false;
292 }
293 auto schema = GetSchemaInfo();
294 for (const auto &table : cloudSchema->tables) {
295 auto tableInfo = schema.GetTable(table.name);
296 if (tableInfo.GetTableName().empty()) {
297 continue; // ignore not distributed table
298 }
299 for (const auto &field : table.fields) {
300 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
301 return true;
302 }
303 }
304 }
305 return false;
306 }
307
IsSetDistributedSchema(const std::string & tableName,RelationalSchemaObject & schemaObj)308 bool RelationalSyncAbleStorage::IsSetDistributedSchema(const std::string &tableName, RelationalSchemaObject &schemaObj)
309 {
310 if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION) {
311 const std::vector<DistributedTable> &tables = schemaObj.GetDistributedSchema().tables;
312 if (tables.empty()) {
313 LOGE("[RelationalSyncAbleStorage] Distributed schema not set in COLLABORATION mode");
314 return false;
315 }
316 auto iter = std::find_if(tables.begin(), tables.end(), [tableName](const DistributedTable &table) {
317 return DBCommon::CaseInsensitiveCompare(table.tableName, tableName);
318 });
319 if (iter == tables.end()) {
320 LOGE("[RelationalSyncAbleStorage] table name mismatch");
321 return false;
322 }
323 }
324 return true;
325 }
326
CommitForAsyncDownload()327 int RelationalSyncAbleStorage::CommitForAsyncDownload()
328 {
329 std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
330 if (asyncDownloadTransactionHandle_ == nullptr) {
331 LOGE("relation database is null or the transaction has not been started");
332 return -E_INVALID_DB;
333 }
334 int errCode = asyncDownloadTransactionHandle_->Commit();
335 ReleaseHandle(asyncDownloadTransactionHandle_);
336 asyncDownloadTransactionHandle_ = nullptr;
337 LOGD("connection commit transaction!");
338 return errCode;
339 }
340
RollbackForAsyncDownload()341 int RelationalSyncAbleStorage::RollbackForAsyncDownload()
342 {
343 std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
344 if (asyncDownloadTransactionHandle_ == nullptr) {
345 LOGE("Invalid handle for rollback or the transaction has not been started.");
346 return -E_INVALID_DB;
347 }
348
349 int errCode = asyncDownloadTransactionHandle_->Rollback();
350 ReleaseHandle(asyncDownloadTransactionHandle_);
351 asyncDownloadTransactionHandle_ = nullptr;
352 LOGI("connection rollback transaction!");
353 return errCode;
354 }
355
GetHandleExpectTransactionForAsyncDownload(bool isWrite,int & errCode,OperatePerm perm) const356 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransactionForAsyncDownload(
357 bool isWrite, int &errCode, OperatePerm perm) const
358 {
359 if (storageEngine_ == nullptr) {
360 errCode = -E_INVALID_DB;
361 return nullptr;
362 }
363 if (asyncDownloadTransactionHandle_ != nullptr) {
364 return asyncDownloadTransactionHandle_;
365 }
366 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
367 storageEngine_->FindExecutor(isWrite, perm, errCode));
368 if (errCode != E_OK) {
369 ReleaseHandle(handle);
370 handle = nullptr;
371 }
372 return handle;
373 }
374
StartTransactionForAsyncDownload(TransactType type)375 int RelationalSyncAbleStorage::StartTransactionForAsyncDownload(TransactType type)
376 {
377 if (storageEngine_ == nullptr) {
378 return -E_INVALID_DB;
379 }
380 std::unique_lock<std::shared_mutex> lock(asyncDownloadtransactionMutex_);
381 if (asyncDownloadTransactionHandle_ != nullptr) {
382 LOGD("async download transaction started already.");
383 return -E_TRANSACT_STATE;
384 }
385 int errCode = E_OK;
386 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
387 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
388 if (handle == nullptr) {
389 ReleaseHandle(handle);
390 return errCode;
391 }
392 errCode = handle->StartTransaction(type);
393 if (errCode != E_OK) {
394 ReleaseHandle(handle);
395 return errCode;
396 }
397 asyncDownloadTransactionHandle_ = handle;
398 return errCode;
399 }
400
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)401 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
402 Timestamp localMark)
403 {
404 uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
405 }
406
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const407 int RelationalSyncAbleStorage::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
408 {
409 needCompressOnSync = storageEngine_->GetRelationalProperties().GetBoolProp(DBProperties::COMPRESS_ON_SYNC, false);
410 compressionRate = storageEngine_->GetRelationalProperties().GetIntProp(DBProperties::COMPRESSION_RATE,
411 DBConstant::DEFAULT_COMPTRESS_RATE);
412 return E_OK;
413 }
414 }
415 #endif
416