1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17
18 #include <utility>
19
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39 static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40 (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41 std::string(CLOSE_CONN_TASK),
42 [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43 );
44 }
45 }
46
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48 : storageEngine_(std::move(engine)),
49 reusedHandle_(nullptr),
50 isCachedOption_(false)
51 {}
52
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55 syncAbleEngine_ = nullptr;
56 }
57
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61 return SYNC_RELATION;
62 }
63
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67 LOGD("RelationalSyncAbleStorage ref +1");
68 IncObjRef(this);
69 }
70
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74 LOGD("RelationalSyncAbleStorage ref -1");
75 DecObjRef(this);
76 }
77
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81 std::string identifier = storageEngine_->GetIdentifier();
82 return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87 std::string identifier = storageEngine_->GetRelationalProperties().GetStringProp(
88 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90 return identifierVect;
91 }
92
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp ×tamp) const
95 {
96 int errCode = E_OK;
97 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98 if (handle == nullptr) {
99 return;
100 }
101 timestamp = 0;
102 errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103 if (errCode != E_OK) {
104 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
106 }
107 ReleaseHandle(handle);
108 }
109
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const110 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const
111 {
112 int errCode = E_OK;
113 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
114 if (handle == nullptr) {
115 return errCode;
116 }
117 timestamp = 0;
118 errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
119 if (errCode != E_OK) {
120 LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
121 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
122 }
123 ReleaseHandle(handle);
124 return errCode;
125 }
126
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const127 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
128 OperatePerm perm) const
129 {
130 if (storageEngine_ == nullptr) {
131 errCode = -E_INVALID_DB;
132 return nullptr;
133 }
134 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
135 storageEngine_->FindExecutor(isWrite, perm, errCode));
136 if (handle == nullptr) {
137 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
138 }
139 return handle;
140 }
141
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const142 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
143 int &errCode, OperatePerm perm) const
144 {
145 if (storageEngine_ == nullptr) {
146 errCode = -E_INVALID_DB;
147 return nullptr;
148 }
149 if (transactionHandle_ != nullptr) {
150 return transactionHandle_;
151 }
152 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
153 storageEngine_->FindExecutor(isWrite, perm, errCode));
154 if (errCode != E_OK) {
155 ReleaseHandle(handle);
156 handle = nullptr;
157 }
158 return handle;
159 }
160
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const161 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
162 {
163 if (storageEngine_ == nullptr) {
164 return;
165 }
166 StorageExecutor *databaseHandle = handle;
167 storageEngine_->Recycle(databaseHandle);
168 std::function<void()> listener = nullptr;
169 {
170 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
171 listener = heartBeatListener_;
172 }
173 if (listener) {
174 listener();
175 }
176 }
177
178 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const179 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
180 {
181 if (key.size() > DBConstant::MAX_KEY_SIZE) {
182 return -E_INVALID_ARGS;
183 }
184 int errCode = E_OK;
185 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
186 if (handle == nullptr) {
187 return errCode;
188 }
189 errCode = handle->GetKvData(key, value);
190 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
191 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
192 }
193 ReleaseHandle(handle);
194 return errCode;
195 }
196
GetMetaDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const197 int RelationalSyncAbleStorage::GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const
198 {
199 if (keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
200 return -E_INVALID_ARGS;
201 }
202 int errCode = E_OK;
203 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
204 if (handle == nullptr) {
205 return errCode;
206 }
207 errCode = handle->GetKvDataByPrefixKey(keyPrefix, data);
208 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
209 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
210 }
211 ReleaseHandle(handle);
212 return errCode;
213 }
214
215 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)216 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
217 {
218 int errCode = E_OK;
219 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
220 if (handle == nullptr) {
221 return errCode;
222 }
223
224 errCode = handle->PutKvData(key, value); // meta doesn't need time.
225 if (errCode != E_OK) {
226 LOGE("Put kv data err:%d", errCode);
227 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
228 }
229 ReleaseHandle(handle);
230 return errCode;
231 }
232
PutMetaData(const Key & key,const Value & value,bool isInTransaction)233 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
234 {
235 if (storageEngine_ == nullptr) {
236 return -E_INVALID_DB;
237 }
238 int errCode = E_OK;
239 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
240 std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
241
242 // try to recycle using the handle
243 if (isInTransaction) {
244 handLock.lock();
245 if (reusedHandle_ != nullptr) {
246 handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
247 } else {
248 isInTransaction = false;
249 handLock.unlock();
250 }
251 }
252
253 if (handle == nullptr) {
254 handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
255 if (handle == nullptr) {
256 return errCode;
257 }
258 }
259
260 errCode = handle->PutKvData(key, value);
261 if (errCode != E_OK) {
262 LOGE("Put kv data err:%d", errCode);
263 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
264 }
265 if (!isInTransaction) {
266 ReleaseHandle(handle);
267 }
268 return errCode;
269 }
270
271 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)272 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
273 {
274 for (const auto &key : keys) {
275 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
276 return -E_INVALID_ARGS;
277 }
278 }
279 int errCode = E_OK;
280 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
281 if (handle == nullptr) {
282 return errCode;
283 }
284
285 handle->StartTransaction(TransactType::IMMEDIATE);
286 errCode = handle->DeleteMetaData(keys);
287 if (errCode != E_OK) {
288 handle->Rollback();
289 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
290 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
291 } else {
292 handle->Commit();
293 }
294 ReleaseHandle(handle);
295 return errCode;
296 }
297
298 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const299 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
300 {
301 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
302 return -E_INVALID_ARGS;
303 }
304
305 int errCode = E_OK;
306 auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
307 if (handle == nullptr) {
308 return errCode;
309 }
310
311 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
312 if (errCode != E_OK) {
313 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
314 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
315 }
316 ReleaseHandle(handle);
317 return errCode;
318 }
319
320 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const321 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
322 {
323 int errCode = E_OK;
324 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
325 if (handle == nullptr) {
326 return errCode;
327 }
328
329 errCode = handle->GetAllMetaKeys(keys);
330 if (errCode != E_OK) {
331 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
332 }
333 ReleaseHandle(handle);
334 return errCode;
335 }
336
GetDbProperties() const337 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
338 {
339 return storageEngine_->GetProperties();
340 }
341
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)342 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
343 {
344 int errCode = E_OK;
345 for (auto &item : dataItems) {
346 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
347 if (entry == nullptr) {
348 errCode = -E_OUT_OF_MEMORY;
349 LOGE("GetKvEntries failed, errCode:%d", errCode);
350 SingleVerKvEntry::Release(entries);
351 break;
352 }
353 entry->SetEntryData(std::move(item));
354 entries.push_back(entry);
355 }
356 return errCode;
357 }
358
GetDataItemSerialSize(const DataItem & item,size_t appendLen)359 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
360 {
361 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
362 // the size would not be very large.
363 static const size_t maxOrigDevLength = 40;
364 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
365 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
366 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
367 return dataSize;
368 }
369
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)370 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
371 size_t appendLen)
372 {
373 bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
374 for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
375 blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
376 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
377 }
378 return !reachThreshold;
379 }
380
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)381 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
382 SQLiteSingleVerRelationalContinueToken *&token)
383 {
384 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
385 delete token;
386 token = nullptr;
387 return;
388 }
389
390 if (dataItems.empty()) {
391 errCode = -E_INTERNAL_ERROR;
392 LOGE("Get data unfinished but data items is empty.");
393 delete token;
394 token = nullptr;
395 return;
396 }
397 token->SetNextBeginTime(dataItems.back());
398 token->UpdateNextSyncOffset(dataItems.size());
399 }
400
401 /**
402 * Caller must ensure that parameter token is valid.
403 * If error happened, token will be deleted here.
404 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo,RelationalSchemaObject && filterSchema) const405 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
406 SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo,
407 RelationalSchemaObject &&filterSchema) const
408 {
409 if (storageEngine_ == nullptr) {
410 return -E_INVALID_DB;
411 }
412
413 int errCode = E_OK;
414 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
415 OperatePerm::NORMAL_PERM, errCode));
416 if (handle == nullptr) {
417 goto ERROR;
418 }
419 handle->SetLocalSchema(filterSchema);
420 do {
421 errCode = handle->GetSyncDataByQuery(dataItems,
422 Parcel::GetAppendedLen(),
423 dataSizeInfo,
424 [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
425 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
426 }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
427 if (errCode == -E_FINISHED) {
428 token->FinishGetData();
429 errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
430 }
431 } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
432
433 ERROR:
434 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
435 dataItems.clear();
436 }
437 ProcessContinueTokenForQuerySync(dataItems, errCode, token);
438 ReleaseHandle(handle);
439 return errCode;
440 }
441
442 // use kv struct data to sync
443 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const444 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
445 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
446 std::vector<SingleVerKvEntry *> &entries) const
447 {
448 if (!timeRange.IsValid()) {
449 return -E_INVALID_ARGS;
450 }
451 query.SetSchema(storageEngine_->GetSchema());
452 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
453 if (token == nullptr) {
454 LOGE("[SingleVerNStore] Allocate continue token failed.");
455 return -E_OUT_OF_MEMORY;
456 }
457
458 continueStmtToken = static_cast<ContinueToken>(token);
459 return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
460 }
461
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const462 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
463 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
464 {
465 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
466 if (token == nullptr) {
467 LOGE("[SingleVerNStore] Allocate continue stmt token failed.");
468 return -E_OUT_OF_MEMORY;
469 }
470 if (!token->CheckValid()) {
471 return -E_INVALID_ARGS;
472 }
473 RelationalSchemaObject schema = storageEngine_->GetSchema();
474 RelationalSchemaObject filterSchema;
475 if (token->IsUseLocalSchema()) {
476 filterSchema = schema;
477 } else {
478 int errCode = GetRemoteDeviceSchema(token->GetRemoteDev(), filterSchema);
479 if (errCode != E_OK) {
480 return errCode;
481 }
482 }
483 const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
484 std::vector<std::string> fieldNames;
485 fieldNames.reserve(fieldInfos.size());
486 for (const auto &fieldInfo : fieldInfos) { // order by cid
487 fieldNames.push_back(fieldInfo.GetFieldName());
488 }
489 token->SetFieldNames(fieldNames);
490
491 std::vector<DataItem> dataItems;
492 int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo, std::move(filterSchema));
493 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
494 continueStmtToken = static_cast<ContinueToken>(token);
495 return errCode;
496 }
497
498 int innerCode = GetKvEntriesByDataItems(entries, dataItems);
499 if (innerCode != E_OK) {
500 errCode = innerCode;
501 delete token;
502 token = nullptr;
503 }
504 continueStmtToken = static_cast<ContinueToken>(token);
505 return errCode;
506 }
507
508 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)509 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
510 {
511 std::vector<DataItem> dataItems;
512 for (const auto &itemEntry : entries) {
513 GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
514 if (entry != nullptr) {
515 DataItem item;
516 item.origDev = entry->GetOrigDevice();
517 item.flag = entry->GetFlag();
518 item.timestamp = entry->GetTimestamp();
519 item.writeTimestamp = entry->GetWriteTimestamp();
520 entry->GetKey(item.key);
521 entry->GetValue(item.value);
522 entry->GetHashKey(item.hashKey);
523 dataItems.push_back(item);
524 }
525 }
526 return dataItems;
527 }
528 }
529
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)530 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
531 const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
532 {
533 std::vector<DataItem> dataItems = ConvertEntries(entries);
534 return PutSyncData(object, dataItems, deviceName);
535 }
536
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)537 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
538 const std::string &deviceName)
539 {
540 int errCode = E_OK;
541 LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
542 QueryObject query = object;
543 auto localSchema = storageEngine_->GetSchema();
544 query.SetSchema(localSchema);
545
546 RelationalSchemaObject filterSchema;
547 errCode = GetRemoteDeviceSchema(deviceName, filterSchema);
548 if (errCode != E_OK) {
549 LOGE("Find remote schema failed. err=%d", errCode);
550 return errCode;
551 }
552 if (!IsSetDistributedSchema(query.GetTableName(), localSchema)) {
553 return -E_SCHEMA_MISMATCH;
554 }
555 if (query.IsUseLocalSchema()) {
556 // remote send always with its table col sort
557 filterSchema.SetDistributedSchema(localSchema.GetDistributedSchema());
558 }
559
560 StoreInfo info = GetStoreInfo();
561 SchemaInfo schemaInfo = {storageEngine_->GetSchema(), storageEngine_->GetTrackerSchema()};
562 auto inserter = RelationalSyncDataInserter::CreateInserter(
563 deviceName, query, schemaInfo, filterSchema.GetSyncFieldInfo(query.GetTableName()), info);
564 inserter.SetEntries(dataItems);
565
566 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
567 if (handle == nullptr) {
568 return errCode;
569 }
570
571 // To prevent certain abnormal scenarios from deleting the table,
572 // check if the table exists before each synchronization.
573 // If the table does not exist, create it.
574 // Because it is a fallback scenario, if the table creation fails, no failure will be returned
575 if (localSchema.GetTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
576 errCode = handle->CreateDistributedDeviceTable(deviceName,
577 storageEngine_->GetSchema().GetTable(query.GetTableName()), info);
578 if (errCode != E_OK) {
579 LOGW("[RelationalSyncAbleStorage::SaveSyncDataItems] Create distributed device table fail %d", errCode);
580 }
581 }
582 DBDfxAdapter::StartTracing();
583
584 handle->SetTableMode(localSchema.GetTableMode());
585 errCode = handle->SaveSyncItems(inserter);
586 ChangedData data = inserter.GetChangedData();
587 data.properties.isP2pSyncDataChange = !dataItems.empty();
588
589 DBDfxAdapter::FinishTracing();
590 bool isEmptyChangedData = data.field.empty() && data.primaryData[OP_INSERT].empty() &&
591 data.primaryData[OP_UPDATE].empty() && data.primaryData[OP_DELETE].empty();
592 if (errCode == E_OK && !isEmptyChangedData) {
593 // dataItems size > 0 now because already check before
594 // all dataItems will write into db now, so need to observer notify here
595 // if some dataItems will not write into db in the future, observer notify here need change
596 data.tableName = query.GetTableName();
597 // SPLIT_BY_DEVICE trigger observer with device, userId, appId and storeId, so trigger with isChangeData false
598 // COLLABORATION trigger observer with changeData, so trigger with isChangeData true
599 TriggerObserverAction(deviceName, std::move(data), storageEngine_->GetRelationalProperties()
600 .GetDistributedTableMode() == DistributedTableMode::COLLABORATION, Origin::ORIGIN_REMOTE);
601 }
602
603 ReleaseHandle(handle);
604 return errCode;
605 }
606
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)607 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
608 const std::string &deviceName)
609 {
610 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
611 LOGW("Device length is invalid for sync put");
612 return -E_INVALID_ARGS;
613 }
614
615 int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
616 if (errCode != E_OK) {
617 LOGE("[Relational] PutSyncData errCode:%d", errCode);
618 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
619 }
620 return errCode;
621 }
622
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)623 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
624 {
625 (void) deviceName;
626 (void) isNeedNotify;
627 return -E_NOT_SUPPORT;
628 }
629
GetSchemaInfo() const630 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
631 {
632 return storageEngine_->GetSchema();
633 }
634
GetSecurityOption(SecurityOption & option) const635 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
636 {
637 std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
638 if (isCachedOption_) {
639 option = securityOption_;
640 return E_OK;
641 }
642 std::string dbPath = storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::DATA_DIR, "");
643 int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
644 if (errCode == E_OK) {
645 option = securityOption_;
646 isCachedOption_ = true;
647 }
648 return errCode;
649 }
650
NotifyRemotePushFinished(const std::string & deviceId) const651 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
652 {
653 return;
654 }
655
656 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const657 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
658 {
659 return OS::GetCurrentSysTimeInMicrosecond(outTime);
660 }
661
GetTablesQuery()662 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
663 {
664 auto tableNames = storageEngine_->GetSchema().GetTableNames();
665 std::vector<QuerySyncObject> queries;
666 queries.reserve(tableNames.size());
667 for (const auto &it : tableNames) {
668 queries.emplace_back(Query::Select(it));
669 }
670 return queries;
671 }
672
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)673 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
674 {
675 (void) queryObj;
676 return -E_NOT_SUPPORT;
677 }
678
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const679 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
680 const std::string &targetID, bool isPush) const
681 {
682 return E_OK;
683 }
684
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)685 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
686 const RelationalSyncStrategy &syncStrategy)
687 {
688 auto mode = storageEngine_->GetRelationalProperties().GetDistributedTableMode();
689 if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
690 LOGD("No need create device table in COLLABORATION mode.");
691 return E_OK;
692 }
693
694 int errCode = E_OK;
695 auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
696 if (handle == nullptr) {
697 return errCode;
698 }
699
700 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
701 if (errCode != E_OK) {
702 LOGE("Start transaction failed:%d", errCode);
703 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
704 ReleaseHandle(handle);
705 return errCode;
706 }
707
708 StoreInfo info = GetStoreInfo();
709 for (const auto &[table, strategy] : syncStrategy) {
710 if (!strategy.permitSync) {
711 continue;
712 }
713
714 errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
715 if (errCode != E_OK) {
716 LOGE("Create distributed device table failed. %d", errCode);
717 break;
718 }
719 }
720
721 if (errCode == E_OK) {
722 errCode = handle->Commit();
723 } else {
724 (void)handle->Rollback();
725 }
726
727 ReleaseHandle(handle);
728 return errCode;
729 }
730
RegisterSchemaChangedCallback(const std::function<void ()> & callback)731 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
732 {
733 std::lock_guard lock(onSchemaChangedMutex_);
734 onSchemaChanged_ = callback;
735 return E_OK;
736 }
737
NotifySchemaChanged()738 void RelationalSyncAbleStorage::NotifySchemaChanged()
739 {
740 std::lock_guard lock(onSchemaChangedMutex_);
741 if (onSchemaChanged_) {
742 LOGD("Notify relational schema was changed");
743 onSchemaChanged_();
744 }
745 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const746 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
747 {
748 algorithmSet.clear();
749 DataCompression::GetCompressionAlgo(algorithmSet);
750 return E_OK;
751 }
752
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)753 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
754 const RelationalObserverAction &action)
755 {
756 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
757 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
758 auto it = dataChangeCallbackMap_.find(connectionId);
759 if (it != dataChangeCallbackMap_.end()) {
760 if (it->second.find(observer) != it->second.end()) {
761 LOGE("obsever already registered");
762 return -E_ALREADY_SET;
763 }
764 if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
765 LOGE("The number of relational observers has been over limit");
766 return -E_MAX_LIMITS;
767 }
768 it->second[observer] = action;
769 } else {
770 dataChangeCallbackMap_[connectionId][observer] = action;
771 }
772 LOGI("register relational observer ok");
773 return E_OK;
774 }
775
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)776 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
777 {
778 if (observer == nullptr) {
779 EraseDataChangeCallback(connectionId);
780 return E_OK;
781 }
782 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
783 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
784 auto it = dataChangeCallbackMap_.find(connectionId);
785 if (it != dataChangeCallbackMap_.end()) {
786 auto action = it->second.find(observer);
787 if (action != it->second.end()) {
788 it->second.erase(action);
789 LOGI("unregister relational observer.");
790 if (it->second.empty()) {
791 dataChangeCallbackMap_.erase(it);
792 LOGI("observer for this delegate is zero now");
793 }
794 return E_OK;
795 }
796 }
797 return -E_NOT_FOUND;
798 }
799
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,Origin origin)800 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
801 const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
802 const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin)
803 {
804 for (auto &action : item.second) {
805 if (action.second == nullptr) {
806 continue;
807 }
808 ChangedData observerChangeData = changedData;
809 if (action.first != nullptr) {
810 FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
811 }
812 action.second(deviceName, std::move(observerChangeData), isChangedData, origin);
813 }
814 }
815
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)816 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
817 ChangedData &&changedData, bool isChangedData)
818 {
819 TriggerObserverAction(deviceName, std::move(changedData), isChangedData, Origin::ORIGIN_CLOUD);
820 }
821
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData,Origin origin)822 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
823 bool isChangedData, Origin origin)
824 {
825 IncObjRef(this);
826 int taskErrCode =
827 ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData, origin] () mutable {
828 LOGD("begin to trigger relational observer.");
829 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
830 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
831 for (const auto &item : dataChangeCallbackMap_) {
832 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, origin);
833 }
834 DecObjRef(this);
835 }, &dataChangeCallbackMap_);
836 if (taskErrCode != E_OK) {
837 LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
838 DecObjRef(this);
839 }
840 }
841
RegisterHeartBeatListener(const std::function<void ()> & listener)842 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
843 {
844 std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
845 heartBeatListener_ = listener;
846 }
847
CheckAndInitQueryCondition(QueryObject & query) const848 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
849 {
850 RelationalSchemaObject schema = storageEngine_->GetSchema();
851 TableInfo table = schema.GetTable(query.GetTableName());
852 if (!table.IsValid()) {
853 LOGE("Query table is not a distributed table.");
854 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
855 }
856 if (table.GetTableSyncType() == CLOUD_COOPERATION) {
857 LOGE("cloud table mode is not support");
858 return -E_NOT_SUPPORT;
859 }
860 query.SetSchema(schema);
861
862 int errCode = E_OK;
863 auto *handle = GetHandle(true, errCode);
864 if (handle == nullptr) {
865 return errCode;
866 }
867
868 errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
869 if (errCode != E_OK) {
870 LOGE("Check relational query condition failed. %d", errCode);
871 TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
872 }
873
874 ReleaseHandle(handle);
875 return errCode;
876 }
877
CheckCompatible(const std::string & schema,uint8_t type) const878 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
879 {
880 // return true if is relational schema.
881 return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
882 }
883
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const884 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
885 std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
886 {
887 if (!storageEngine_->GetSchema().IsSchemaValid()) {
888 return -E_NOT_SUPPORT;
889 }
890 if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
891 LOGE("[ExecuteQuery] invalid args");
892 return -E_INVALID_ARGS;
893 }
894 int errCode = E_OK;
895 auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
896 if (handle == nullptr) {
897 LOGE("[ExecuteQuery] get handle fail:%d", errCode);
898 return errCode;
899 }
900 errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
901 if (errCode != E_OK) {
902 LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
903 }
904 ReleaseHandle(handle);
905 return errCode;
906 }
907
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const908 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
909 RelationalRowDataSet &dataSet, ContinueToken &token) const
910 {
911 dataSet.Clear();
912 if (token == nullptr) {
913 // start query
914 std::vector<std::string> colNames;
915 std::vector<RelationalRowData *> data;
916 ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
917
918 int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
919 if (errCode != E_OK) {
920 return errCode;
921 }
922
923 // create one token
924 token = static_cast<ContinueToken>(
925 new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
926 if (token == nullptr) {
927 LOGE("ExecuteQuery OOM");
928 return -E_OUT_OF_MEMORY;
929 }
930 }
931
932 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
933 if (!remoteToken->CheckValid()) {
934 LOGE("ExecuteQuery invalid token");
935 return -E_INVALID_ARGS;
936 }
937
938 int errCode = remoteToken->GetData(packetSize, dataSet);
939 if (errCode == -E_UNFINISHED) {
940 errCode = E_OK;
941 } else {
942 if (errCode != E_OK) {
943 dataSet.Clear();
944 }
945 delete remoteToken;
946 remoteToken = nullptr;
947 token = nullptr;
948 }
949 LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
950 return errCode;
951 }
952
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)953 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
954 uint8_t type)
955 {
956 if (ReadSchemaType(type) != SchemaType::RELATIVE) {
957 return -E_INVALID_ARGS;
958 }
959
960 RelationalSchemaObject schemaObj;
961 int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
962 if (errCode != E_OK) {
963 LOGE("Parse remote schema failed. err=%d", errCode);
964 return errCode;
965 }
966
967 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
968 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
969 Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
970 errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
971 if (errCode != E_OK) {
972 LOGE("Save remote schema failed. err=%d", errCode);
973 return errCode;
974 }
975
976 return remoteDeviceSchema_.Put(deviceId, remoteSchema);
977 }
978
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const979 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId,
980 RelationalSchemaObject &schemaObj) const
981 {
982 if (schemaObj.IsSchemaValid()) {
983 LOGE("schema is already valid");
984 return -E_INVALID_ARGS;
985 }
986
987 std::string remoteSchema;
988 int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
989 if (errCode == -E_NOT_FOUND) {
990 LOGW("Get remote device schema miss cached.");
991 std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
992 Key remoteSchemaKey(keyStr.begin(), keyStr.end());
993 Value remoteSchemaBuff;
994 errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
995 if (errCode != E_OK) {
996 LOGE("Get remote device schema from meta failed. err=%d", errCode);
997 return errCode;
998 }
999 remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
1000 errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
1001 }
1002
1003 if (errCode != E_OK) {
1004 LOGE("Get remote device schema failed. err=%d", errCode);
1005 return errCode;
1006 }
1007
1008 errCode = schemaObj.ParseFromSchemaString(remoteSchema);
1009 if (errCode != E_OK) {
1010 LOGE("Parse remote schema failed. err=%d", errCode);
1011 }
1012 return errCode;
1013 }
1014
SetReusedHandle(StorageExecutor * handle)1015 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
1016 {
1017 std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
1018 reusedHandle_ = handle;
1019 }
1020
ReleaseRemoteQueryContinueToken(ContinueToken & token) const1021 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
1022 {
1023 auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1024 delete remoteToken;
1025 remoteToken = nullptr;
1026 token = nullptr;
1027 }
1028
GetStoreInfo() const1029 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1030 {
1031 StoreInfo info = {
1032 storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, ""),
1033 storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, ""),
1034 storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "")
1035 };
1036 return info;
1037 }
1038
StartTransaction(TransactType type,bool isAsyncDownload)1039 int RelationalSyncAbleStorage::StartTransaction(TransactType type, bool isAsyncDownload)
1040 {
1041 if (isAsyncDownload) {
1042 return StartTransactionForAsyncDownload(type);
1043 }
1044 if (storageEngine_ == nullptr) {
1045 return -E_INVALID_DB;
1046 }
1047 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1048 if (transactionHandle_ != nullptr) {
1049 LOGD("Transaction started already.");
1050 return -E_TRANSACT_STATE;
1051 }
1052 int errCode = E_OK;
1053 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1054 storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1055 if (handle == nullptr) {
1056 ReleaseHandle(handle);
1057 return errCode;
1058 }
1059 errCode = handle->StartTransaction(type);
1060 if (errCode != E_OK) {
1061 ReleaseHandle(handle);
1062 return errCode;
1063 }
1064 transactionHandle_ = handle;
1065 return errCode;
1066 }
1067
Commit(bool isAsyncDownload)1068 int RelationalSyncAbleStorage::Commit(bool isAsyncDownload)
1069 {
1070 if (isAsyncDownload) {
1071 return CommitForAsyncDownload();
1072 }
1073 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1074 if (transactionHandle_ == nullptr) {
1075 LOGE("relation database is null or the transaction has not been started");
1076 return -E_INVALID_DB;
1077 }
1078 int errCode = transactionHandle_->Commit();
1079 ReleaseHandle(transactionHandle_);
1080 transactionHandle_ = nullptr;
1081 LOGD("connection commit transaction!");
1082 return errCode;
1083 }
1084
Rollback(bool isAsyncDownload)1085 int RelationalSyncAbleStorage::Rollback(bool isAsyncDownload)
1086 {
1087 if (isAsyncDownload) {
1088 return RollbackForAsyncDownload();
1089 }
1090 std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1091 if (transactionHandle_ == nullptr) {
1092 LOGE("Invalid handle for rollback or the transaction has not been started.");
1093 return -E_INVALID_DB;
1094 }
1095
1096 int errCode = transactionHandle_->Rollback();
1097 ReleaseHandle(transactionHandle_);
1098 transactionHandle_ = nullptr;
1099 LOGI("connection rollback transaction!");
1100 return errCode;
1101 }
1102
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1103 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1104 const std::vector<Timestamp> ×tampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1105 {
1106 int errCode = E_OK;
1107 auto *handle = GetHandleExpectTransaction(false, errCode);
1108 if (handle == nullptr) {
1109 return errCode;
1110 }
1111 QuerySyncObject queryObj = query;
1112 queryObj.SetSchema(GetSchemaInfo());
1113 errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1114 if (transactionHandle_ == nullptr) {
1115 ReleaseHandle(handle);
1116 }
1117 return errCode;
1118 }
1119
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1120 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp ×tamp,
1121 bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1122 {
1123 int errCode = E_OK;
1124 auto *handle = GetHandleExpectTransaction(false, errCode);
1125 if (handle == nullptr) {
1126 return errCode;
1127 }
1128 QuerySyncObject queryObj = query;
1129 queryObj.SetSchema(GetSchemaInfo());
1130 errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1131 if (transactionHandle_ == nullptr) {
1132 ReleaseHandle(handle);
1133 }
1134 return errCode;
1135 }
1136
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1137 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1138 const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1139 {
1140 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1141 QuerySyncObject query = querySyncObject;
1142 query.SetSchema(GetSchemaInfo());
1143 auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1144 if (token == nullptr) {
1145 LOGE("[SingleVerNStore] Allocate continue token failed.");
1146 return -E_OUT_OF_MEMORY;
1147 }
1148 token->SetCloudTableSchema(tableSchema);
1149 continueStmtToken = static_cast<ContinueToken>(token);
1150 return GetCloudDataNext(continueStmtToken, cloudDataResult);
1151 }
1152
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1153 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1154 CloudSyncData &cloudDataResult)
1155 {
1156 if (continueStmtToken == nullptr) {
1157 return -E_INVALID_ARGS;
1158 }
1159 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1160 if (!token->CheckValid()) {
1161 return -E_INVALID_ARGS;
1162 }
1163 int errCode = E_OK;
1164 auto *handle = GetHandleExpectTransaction(false, errCode);
1165 if (handle == nullptr) {
1166 LOGE("Invalid handle, release the token, %d", errCode);
1167 ReleaseCloudDataToken(continueStmtToken);
1168 return -E_INVALID_DB;
1169 }
1170 cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1171 auto config = GetCloudSyncConfig();
1172 handle->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1173 errCode = handle->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1174 LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1175 cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1176 cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1177 if (transactionHandle_ == nullptr) {
1178 ReleaseHandle(handle);
1179 }
1180 if (errCode != -E_UNFINISHED) {
1181 delete token;
1182 token = nullptr;
1183 }
1184 continueStmtToken = static_cast<ContinueToken>(token);
1185 if (errCode != E_OK && errCode != -E_UNFINISHED) {
1186 return errCode;
1187 }
1188 int fillRefGidCode = FillReferenceData(cloudDataResult);
1189 return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1190 }
1191
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1192 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1193 bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1194 {
1195 int errCode = E_OK;
1196 auto *handle = GetHandle(false, errCode);
1197 if (handle == nullptr) {
1198 return errCode;
1199 }
1200 Timestamp beginTime = 0u;
1201 SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1202 QuerySyncObject query = querySyncObject;
1203 query.SetSchema(GetSchemaInfo());
1204 handle->SetTableSchema(tableSchema);
1205 errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1206 ReleaseHandle(handle);
1207 if (errCode != E_OK) {
1208 LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1209 }
1210 return errCode;
1211 }
1212
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1213 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1214 {
1215 if (continueStmtToken == nullptr) {
1216 return E_OK;
1217 }
1218 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1219 if (!token->CheckValid()) {
1220 return E_OK;
1221 }
1222 int errCode = token->ReleaseCloudStatement();
1223 delete token;
1224 token = nullptr;
1225 return errCode;
1226 }
1227
GetSchemaFromDB(RelationalSchemaObject & schema)1228 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1229 {
1230 Key schemaKey;
1231 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1232 Value schemaVal;
1233 int errCode = GetMetaData(schemaKey, schemaVal);
1234 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1235 LOGE("Get relational schema from DB failed. %d", errCode);
1236 return errCode;
1237 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1238 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1239 return -E_NOT_FOUND;
1240 }
1241 std::string schemaStr;
1242 DBCommon::VectorToString(schemaVal, schemaStr);
1243 errCode = schema.ParseFromSchemaString(schemaStr);
1244 if (errCode != E_OK) {
1245 LOGE("Parse schema string from DB failed.");
1246 return errCode;
1247 }
1248 storageEngine_->SetSchema(schema);
1249 return errCode;
1250 }
1251
ChkSchema(const TableName & tableName)1252 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1253 {
1254 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1255 RelationalSchemaObject localSchema = GetSchemaInfo();
1256 int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1257 if (errCode == -E_SCHEMA_MISMATCH) {
1258 LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1259 RelationalSchemaObject newSchema;
1260 errCode = GetSchemaFromDB(newSchema);
1261 if (errCode != E_OK) {
1262 LOGE("Get schema from db when check schema. err: %d", errCode);
1263 return -E_SCHEMA_MISMATCH;
1264 }
1265 errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1266 }
1267 return errCode;
1268 }
1269
SetCloudDbSchema(const DataBaseSchema & schema)1270 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1271 {
1272 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1273 RelationalSchemaObject localSchema = GetSchemaInfo();
1274 schemaMgr_.SetCloudDbSchema(schema, localSchema);
1275 return E_OK;
1276 }
1277
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1278 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1279 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1280 {
1281 return GetInfoByPrimaryKeyOrGid(tableName, vBucket, true, dataInfoWithLog, assetInfo);
1282 }
1283
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1284 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1285 const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1286 {
1287 if (handle == nullptr) {
1288 return -E_INVALID_DB;
1289 }
1290 TableSchema tableSchema;
1291 int errCode = GetCloudTableSchema(tableName, tableSchema);
1292 if (errCode != E_OK) {
1293 LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1294 return errCode;
1295 }
1296 RelationalSchemaObject localSchema = GetSchemaInfo();
1297 handle->SetLocalSchema(localSchema);
1298 return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1299 }
1300
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1301 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1302 {
1303 if (transactionHandle_ == nullptr) {
1304 LOGE(" the transaction has not been started");
1305 return -E_INVALID_DB;
1306 }
1307 return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1308 }
1309
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1310 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1311 const std::string &tableName, DownloadData &downloadData)
1312 {
1313 TableSchema tableSchema;
1314 int errCode = GetCloudTableSchema(tableName, tableSchema);
1315 if (errCode != E_OK) {
1316 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1317 return errCode;
1318 }
1319 RelationalSchemaObject localSchema = GetSchemaInfo();
1320 handle->SetLocalSchema(localSchema);
1321 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1322 handle->SetLogicDelete(IsCurrentLogicDelete());
1323 errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1324 handle->SetLogicDelete(false);
1325 return errCode;
1326 }
1327
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1328 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1329 {
1330 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1331 cloudSchema = schemaMgr_.GetCloudDbSchema();
1332 return E_OK;
1333 }
1334
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1335 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1336 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1337 {
1338 if (transactionHandle_ == nullptr) {
1339 LOGE("the transaction has not been started");
1340 return -E_INVALID_DB;
1341 }
1342 transactionHandle_->SetLogicDelete(logicDelete_);
1343 std::vector<std::string> notifyTableList;
1344 int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1345 if (!notifyTableList.empty()) {
1346 for (auto notifyTableName : notifyTableList) {
1347 ChangedData changedData;
1348 changedData.type = ChangedDataType::DATA;
1349 changedData.tableName = notifyTableName;
1350 std::vector<DistributedDB::Type> dataVec;
1351 DistributedDB::Type type;
1352 if (mode == FLAG_ONLY) {
1353 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1354 } else {
1355 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1356 }
1357 dataVec.push_back(type);
1358 changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1359 TriggerObserverAction("CLOUD", std::move(changedData), true);
1360 }
1361 }
1362 transactionHandle_->SetLogicDelete(false);
1363 return errCode;
1364 }
1365
ClearCloudLogVersion(const std::vector<std::string> & tableNameList)1366 int RelationalSyncAbleStorage::ClearCloudLogVersion(const std::vector<std::string> &tableNameList)
1367 {
1368 if (transactionHandle_ == nullptr) {
1369 LOGE("[RelationalSyncAbleStorage][ClearCloudLogVersion] the transaction has not been started");
1370 return -E_INVALID_DB;
1371 }
1372 return transactionHandle_->DoClearCloudLogVersion(tableNameList);
1373 }
1374
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1375 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1376 {
1377 std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1378 return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1379 }
1380
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1381 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1382 bool isDownloadSuccess)
1383 {
1384 if (storageEngine_ == nullptr) {
1385 return -E_INVALID_DB;
1386 }
1387 if (transactionHandle_ == nullptr) {
1388 LOGE("the transaction has not been started when fill asset for download.");
1389 return -E_INVALID_DB;
1390 }
1391 TableSchema tableSchema;
1392 int errCode = GetCloudTableSchema(tableName, tableSchema);
1393 if (errCode != E_OK) {
1394 LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1395 return errCode;
1396 }
1397 uint64_t currCursor = DBConstant::INVALID_CURSOR;
1398 errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess, currCursor);
1399 if (errCode != E_OK) {
1400 LOGE("fill cloud asset for download failed.%d", errCode);
1401 }
1402 return errCode;
1403 }
1404
SetLogTriggerStatus(bool status)1405 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1406 {
1407 int errCode = E_OK;
1408 auto *handle = GetHandleExpectTransaction(false, errCode);
1409 if (handle == nullptr) {
1410 return errCode;
1411 }
1412 errCode = handle->SetLogTriggerStatus(status);
1413 if (transactionHandle_ == nullptr) {
1414 ReleaseHandle(handle);
1415 }
1416 return errCode;
1417 }
1418
SetCursorIncFlag(bool flag)1419 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1420 {
1421 int errCode = E_OK;
1422 auto *handle = GetHandleExpectTransaction(false, errCode);
1423 if (handle == nullptr) {
1424 return errCode;
1425 }
1426 errCode = handle->SetCursorIncFlag(flag);
1427 if (transactionHandle_ == nullptr) {
1428 ReleaseHandle(handle);
1429 }
1430 return errCode;
1431 }
1432
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1433 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1434 bool ignoreEmptyGid)
1435 {
1436 if (storageEngine_ == nullptr) {
1437 return -E_INVALID_DB;
1438 }
1439 int errCode = E_OK;
1440 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1441 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1442 if (writeHandle == nullptr) {
1443 return errCode;
1444 }
1445 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1446 if (errCode != E_OK) {
1447 ReleaseHandle(writeHandle);
1448 return errCode;
1449 }
1450 errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1451 if (errCode != E_OK) {
1452 LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1453 writeHandle->Rollback();
1454 ReleaseHandle(writeHandle);
1455 return errCode;
1456 }
1457 errCode = writeHandle->Commit();
1458 ReleaseHandle(writeHandle);
1459 return errCode;
1460 }
1461
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1462 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1463 {
1464 syncAbleEngine_ = syncAbleEngine;
1465 }
1466
GetIdentify() const1467 std::string RelationalSyncAbleStorage::GetIdentify() const
1468 {
1469 if (storageEngine_ == nullptr) {
1470 LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1471 return "";
1472 }
1473 return storageEngine_->GetIdentifier();
1474 }
1475
EraseDataChangeCallback(uint64_t connectionId)1476 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1477 {
1478 TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1479 ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1480 ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1481 auto it = dataChangeCallbackMap_.find(connectionId);
1482 if (it != dataChangeCallbackMap_.end()) {
1483 dataChangeCallbackMap_.erase(it);
1484 LOGI("erase all observer, %" PRIu64, connectionId);
1485 }
1486 }, nullptr, &dataChangeCallbackMap_);
1487 ADAPTER_WAIT(handle);
1488 }
1489
ReleaseContinueToken(ContinueToken & continueStmtToken) const1490 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1491 {
1492 auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1493 if (token == nullptr || !(token->CheckValid())) {
1494 LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1495 return;
1496 }
1497 delete token;
1498 continueStmtToken = nullptr;
1499 }
1500
CheckQueryValid(const QuerySyncObject & query)1501 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1502 {
1503 int errCode = E_OK;
1504 auto *handle = GetHandle(false, errCode);
1505 if (handle == nullptr) {
1506 return errCode;
1507 }
1508 errCode = handle->CheckQueryObjectLegal(query);
1509 if (errCode != E_OK) {
1510 ReleaseHandle(handle);
1511 return errCode;
1512 }
1513 QuerySyncObject queryObj = query;
1514 queryObj.SetSchema(GetSchemaInfo());
1515 int64_t count = 0;
1516 errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1517 ReleaseHandle(handle);
1518 if (errCode != E_OK) {
1519 LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1520 return -E_INVALID_ARGS;
1521 }
1522 return errCode;
1523 }
1524
CreateTempSyncTrigger(const std::string & tableName)1525 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1526 {
1527 int errCode = E_OK;
1528 auto *handle = GetHandle(true, errCode);
1529 if (handle == nullptr) {
1530 return errCode;
1531 }
1532 errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1533 ReleaseHandle(handle);
1534 if (errCode != E_OK) {
1535 LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1536 }
1537 return errCode;
1538 }
1539
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1540 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1541 ChangeProperties &changeProperties)
1542 {
1543 int errCode = E_OK;
1544 auto *handle = GetHandle(false, errCode);
1545 if (handle == nullptr) {
1546 return errCode;
1547 }
1548 errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1549 ReleaseHandle(handle);
1550 if (errCode != E_OK) {
1551 LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1552 }
1553 return errCode;
1554 }
1555
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1556 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1557 {
1558 if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1559 changedData.field = {};
1560 for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1561 changedData.primaryData[i].clear();
1562 }
1563 }
1564 if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1565 changedData.properties = {};
1566 }
1567 }
1568
ClearAllTempSyncTrigger()1569 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1570 {
1571 int errCode = E_OK;
1572 auto *handle = GetHandle(true, errCode);
1573 if (handle == nullptr) {
1574 return errCode;
1575 }
1576 errCode = handle->ClearAllTempSyncTrigger();
1577 ReleaseHandle(handle);
1578 if (errCode != E_OK) {
1579 LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1580 }
1581 return errCode;
1582 }
1583
FillReferenceData(CloudSyncData & syncData)1584 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1585 {
1586 std::map<int64_t, Entries> referenceGid;
1587 int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1588 if (errCode != E_OK) {
1589 LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1590 return errCode;
1591 }
1592 errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1593 if (errCode != E_OK) {
1594 return errCode;
1595 }
1596 referenceGid.clear();
1597 errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1598 if (errCode != E_OK) {
1599 LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1600 return errCode;
1601 }
1602 return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1603 }
1604
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1605 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1606 const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1607 {
1608 if (referenceGid.empty()) {
1609 return E_OK;
1610 }
1611 int ignoredCount = 0;
1612 for (size_t index = 0u; index < rowid.size(); index++) {
1613 if (index >= extend.size()) {
1614 LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1615 return -E_UNEXPECTED_DATA;
1616 }
1617 int64_t rowId = rowid[index];
1618 if (referenceGid.find(rowId) == referenceGid.end()) {
1619 // current data miss match reference data, we ignored it
1620 ignoredCount++;
1621 continue;
1622 }
1623 extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1624 }
1625 if (ignoredCount != 0) {
1626 LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1627 }
1628 return E_OK;
1629 }
1630
IsSharedTable(const std::string & tableName)1631 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1632 {
1633 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1634 return schemaMgr_.IsSharedTable(tableName);
1635 }
1636
GetSharedTableOriginNames()1637 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1638 {
1639 std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1640 return schemaMgr_.GetSharedTableOriginNames();
1641 }
1642
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1643 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1644 std::map<int64_t, Entries> &referenceGid)
1645 {
1646 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1647 int errCode = GetTableReference(tableName, tableReference);
1648 if (errCode != E_OK) {
1649 return errCode;
1650 }
1651 if (tableReference.empty()) {
1652 LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1653 return E_OK;
1654 }
1655 auto *handle = GetHandle(false, errCode);
1656 if (handle == nullptr) {
1657 return errCode;
1658 }
1659 errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1660 ReleaseHandle(handle);
1661 return errCode;
1662 }
1663
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1664 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1665 std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1666 {
1667 if (storageEngine_ == nullptr) {
1668 LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1669 return -E_INVALID_DB;
1670 }
1671 RelationalSchemaObject schema = storageEngine_->GetSchema();
1672 auto referenceProperty = schema.GetReferenceProperty();
1673 if (referenceProperty.empty()) {
1674 return E_OK;
1675 }
1676 auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1677 if (errCode != E_OK) {
1678 return errCode;
1679 }
1680 for (const auto &property : referenceProperty) {
1681 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1682 if (!IsSharedTable(tableName)) {
1683 reference[property.targetTableName].push_back(property);
1684 continue;
1685 }
1686 TableReferenceProperty tableReference;
1687 tableReference.sourceTableName = tableName;
1688 tableReference.columns = property.columns;
1689 tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1690 auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1691 if (ret != E_OK) {
1692 return ret;
1693 }
1694 tableReference.targetTableName = sharedTargetTable;
1695 reference[tableReference.targetTableName].push_back(tableReference);
1696 }
1697 }
1698 return E_OK;
1699 }
1700
GetSourceTableName(const std::string & tableName)1701 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1702 {
1703 std::pair<std::string, int> res = { "", E_OK };
1704 std::shared_ptr<DataBaseSchema> cloudSchema;
1705 (void) GetCloudDbSchema(cloudSchema);
1706 if (cloudSchema == nullptr) {
1707 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1708 return { "", -E_INTERNAL_ERROR };
1709 }
1710 for (const auto &table : cloudSchema->tables) {
1711 if (CloudStorageUtils::IsSharedTable(table)) {
1712 continue;
1713 }
1714 if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1715 DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1716 res.first = table.name;
1717 break;
1718 }
1719 }
1720 if (res.first.empty()) {
1721 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1722 res.second = -E_SCHEMA_MISMATCH;
1723 }
1724 return res;
1725 }
1726
GetSharedTargetTableName(const std::string & tableName)1727 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1728 {
1729 std::pair<std::string, int> res = { "", E_OK };
1730 std::shared_ptr<DataBaseSchema> cloudSchema;
1731 (void) GetCloudDbSchema(cloudSchema);
1732 if (cloudSchema == nullptr) {
1733 LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1734 return { "", -E_INTERNAL_ERROR };
1735 }
1736 for (const auto &table : cloudSchema->tables) {
1737 if (CloudStorageUtils::IsSharedTable(table)) {
1738 continue;
1739 }
1740 if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1741 res.first = table.sharedTableName;
1742 break;
1743 }
1744 }
1745 if (res.first.empty()) {
1746 LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1747 res.second = -E_SCHEMA_MISMATCH;
1748 }
1749 return res;
1750 }
1751
SetLogicDelete(bool logicDelete)1752 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1753 {
1754 logicDelete_ = logicDelete;
1755 LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1756 }
1757
IsCurrentLogicDelete() const1758 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1759 {
1760 return logicDelete_;
1761 }
1762
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1763 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1764 const std::string &gid, const Bytes &hashKey, VBucket &assets)
1765 {
1766 if (gid.empty() && hashKey.empty()) {
1767 LOGE("both gid and hashKey are empty.");
1768 return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1769 }
1770 if (transactionHandle_ == nullptr) {
1771 LOGE("the transaction has not been started");
1772 return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1773 }
1774 auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1775 if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1776 LOGE("get assets by gid or hashKey failed. %d", errCode);
1777 }
1778 return { errCode, status };
1779 }
1780
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1781 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1782 {
1783 int errCode = E_OK;
1784 auto *wHandle = GetHandle(true, errCode);
1785 if (wHandle == nullptr) {
1786 return errCode;
1787 }
1788 wHandle->SetIAssetLoader(loader);
1789 ReleaseHandle(wHandle);
1790 return errCode;
1791 }
1792
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1793 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1794 const std::vector<VBucket> &records)
1795 {
1796 int errCode = E_OK;
1797 auto *handle = GetHandle(true, errCode);
1798 if (handle == nullptr || errCode != E_OK) {
1799 return errCode;
1800 }
1801 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1802 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1803 errCode = UpsertDataInner(handle, tableName, records);
1804 handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1805 handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1806 ReleaseHandle(handle);
1807 return errCode;
1808 }
1809
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1810 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1811 const std::string &tableName, const std::vector<VBucket> &records)
1812 {
1813 int errCode = E_OK;
1814 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1815 if (errCode != E_OK) {
1816 LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1817 return errCode;
1818 }
1819 errCode = CreateTempSyncTriggerInner(handle, tableName);
1820 if (errCode == E_OK) {
1821 errCode = UpsertDataInTransaction(handle, tableName, records);
1822 (void) handle->ClearAllTempSyncTrigger();
1823 }
1824 if (errCode == E_OK) {
1825 errCode = handle->Commit();
1826 if (errCode != E_OK) {
1827 LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1828 }
1829 } else {
1830 int ret = handle->Rollback();
1831 if (ret != E_OK) {
1832 LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1833 }
1834 }
1835 return errCode;
1836 }
1837
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1838 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1839 const std::string &tableName, const std::vector<VBucket> &records)
1840 {
1841 TableSchema tableSchema;
1842 int errCode = GetCloudTableSchema(tableName, tableSchema);
1843 if (errCode != E_OK) {
1844 LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1845 return errCode;
1846 }
1847 TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1848 std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1849 std::set<std::vector<uint8_t>> primaryKeys;
1850 DownloadData downloadData;
1851 for (const auto &record : records) {
1852 DataInfoWithLog dataInfoWithLog;
1853 VBucket assetInfo;
1854 auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1855 tableSchema, localTable, pkMap, false);
1856 if (errorCode != E_OK) {
1857 return errorCode;
1858 }
1859 errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1860 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1861 return errCode;
1862 }
1863 VBucket recordCopy = record;
1864 if ((errCode == -E_NOT_FOUND ||
1865 (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1866 primaryKeys.find(hashValue) == primaryKeys.end()) {
1867 downloadData.opType.push_back(OpType::INSERT);
1868 auto currentTime = TimeHelper::GetSysCurrentTime();
1869 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1870 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1871 primaryKeys.insert(hashValue);
1872 } else {
1873 downloadData.opType.push_back(OpType::UPDATE);
1874 recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1875 recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1876 recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1877 recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1878 recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1879 }
1880 downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1881 downloadData.data.push_back(std::move(recordCopy));
1882 }
1883 return PutCloudSyncDataInner(handle, tableName, downloadData);
1884 }
1885
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1886 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1887 const LogInfo &logInfo)
1888 {
1889 if (transactionHandle_ == nullptr) {
1890 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1891 return -E_INVALID_DB;
1892 }
1893 TableSchema tableSchema;
1894 GetCloudTableSchema(tableName, tableSchema);
1895 std::vector<VBucket> assets;
1896 int errCode = transactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
1897 if (errCode != E_OK) {
1898 LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d",
1899 DBCommon::StringMiddleMasking(logInfo.cloudGid).c_str(), errCode);
1900 return errCode;
1901 }
1902 bool isInconsistency = !assets.empty();
1903 UpdateRecordFlagStruct updateRecordFlag = {
1904 .tableName = tableName,
1905 .isRecordConflict = recordConflict,
1906 .isInconsistency = isInconsistency
1907 };
1908 std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
1909 return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1910 }
1911
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1912 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1913 OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1914 {
1915 TableSchema tableSchema;
1916 int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1917 if (errCode != E_OK) {
1918 LOGE("get table schema failed when fill log and asset. %d", errCode);
1919 return errCode;
1920 }
1921 errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1922 if (errCode != E_OK) {
1923 return errCode;
1924 }
1925 if (opType == OpType::INSERT) {
1926 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1927 handle, {data.tableName, CloudWaterType::INSERT, tableSchema}, data.insData, uploadRecorder_);
1928 } else if (opType == OpType::UPDATE) {
1929 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1930 handle, {data.tableName, CloudWaterType::UPDATE, tableSchema}, data.updData, uploadRecorder_);
1931 } else if (opType == OpType::DELETE) {
1932 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1933 handle, {data.tableName, CloudWaterType::DELETE, tableSchema}, data.delData, uploadRecorder_);
1934 } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1935 errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1936 handle, {data.tableName, CloudWaterType::BUTT, tableSchema}, data.lockData, uploadRecorder_, true);
1937 }
1938 return errCode;
1939 }
1940
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users,bool isQueryDownloadRecords)1941 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1942 std::vector<std::string> &users, bool isQueryDownloadRecords)
1943 {
1944 std::vector<TableSchema> tables;
1945 int errCode = GetCloudTableWithoutShared(tables);
1946 if (errCode != E_OK) {
1947 return errCode;
1948 }
1949 if (tables.empty()) {
1950 LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1951 return E_OK;
1952 }
1953 auto *handle = GetHandle(true, errCode);
1954 if (handle == nullptr || errCode != E_OK) {
1955 return errCode;
1956 }
1957 errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery, isQueryDownloadRecords);
1958 ReleaseHandle(handle);
1959 return errCode;
1960 }
1961
ClearUnLockingNoNeedCompensated()1962 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1963 {
1964 std::vector<TableSchema> tables;
1965 int errCode = GetCloudTableWithoutShared(tables);
1966 if (errCode != E_OK) {
1967 return errCode;
1968 }
1969 if (tables.empty()) {
1970 LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1971 return E_OK;
1972 }
1973 auto *handle = GetHandle(true, errCode);
1974 if (handle == nullptr || errCode != E_OK) {
1975 return errCode;
1976 }
1977 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1978 if (errCode != E_OK) {
1979 ReleaseHandle(handle);
1980 return errCode;
1981 }
1982 for (const auto &table : tables) {
1983 errCode = handle->ClearUnLockingStatus(table.name);
1984 if (errCode != E_OK) {
1985 LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1986 }
1987 }
1988 errCode = handle->Commit();
1989 if (errCode != E_OK) {
1990 LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1991 }
1992 ReleaseHandle(handle);
1993 return errCode;
1994 }
1995
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1996 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1997 {
1998 const auto tableInfos = GetSchemaInfo().GetTables();
1999 for (const auto &[tableName, info] : tableInfos) {
2000 if (info.GetSharedTableMark()) {
2001 continue;
2002 }
2003 TableSchema schema;
2004 int errCode = GetCloudTableSchema(tableName, schema);
2005 if (errCode == -E_NOT_FOUND) {
2006 continue;
2007 }
2008 if (errCode != E_OK) {
2009 LOGW("[RDBStorage] Get cloud table failed %d", errCode);
2010 return errCode;
2011 }
2012 tables.push_back(schema);
2013 }
2014 return E_OK;
2015 }
2016
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery,bool isQueryDownloadRecords)2017 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2018 const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords)
2019 {
2020 int errCode = E_OK;
2021 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2022 if (errCode != E_OK) {
2023 return errCode;
2024 }
2025 for (const auto &table : tables) {
2026 if (!CheckTableSupportCompensatedSync(table)) {
2027 continue;
2028 }
2029
2030 std::vector<VBucket> syncDataPk;
2031 errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk, isQueryDownloadRecords);
2032 if (errCode != E_OK) {
2033 LOGW("[RDBStorageEngine] Get wait compensated sync data failed, continue! errCode=%d", errCode);
2034 errCode = E_OK;
2035 continue;
2036 }
2037 if (syncDataPk.empty()) {
2038 // no data need to compensated sync
2039 continue;
2040 }
2041 errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncQuery);
2042 if (errCode != E_OK) {
2043 LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2044 errCode = E_OK;
2045 continue;
2046 }
2047 }
2048 if (errCode == E_OK) {
2049 errCode = handle->Commit();
2050 if (errCode != E_OK) {
2051 LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2052 }
2053 } else {
2054 int ret = handle->Rollback();
2055 if (ret != E_OK) {
2056 LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2057 }
2058 }
2059 return errCode;
2060 }
2061
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2062 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2063 const std::string &tableName, bool flag)
2064 {
2065 TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2066 if (trackerTable.IsEmpty()) {
2067 trackerTable.SetTableName(tableName);
2068 }
2069 return handle->CreateTempSyncTrigger(trackerTable, flag);
2070 }
2071
CheckTableSupportCompensatedSync(const TableSchema & table)2072 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2073 {
2074 auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2075 return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2076 field.type == TYPE_INDEX<Bytes>);
2077 });
2078 if (it != table.fields.end()) {
2079 LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2080 return false;
2081 }
2082 // check whether reference exist
2083 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2084 int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2085 if (errCode != E_OK) {
2086 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2087 return false;
2088 }
2089 if (!tableReference.empty()) {
2090 LOGI("[RDBStorageEngine] current table exist reference property");
2091 return false;
2092 }
2093 return true;
2094 }
2095
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2096 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2097 const std::set<std::string> &gidFilters)
2098 {
2099 if (transactionHandle_ == nullptr) {
2100 LOGE("the transaction has not been started");
2101 return -E_INVALID_DB;
2102 }
2103 int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2104 if (errCode != E_OK) {
2105 LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2106 }
2107 return errCode;
2108 }
2109
GetCloudSyncConfig() const2110 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2111 {
2112 std::lock_guard<std::mutex> autoLock(configMutex_);
2113 return cloudSyncConfig_;
2114 }
2115
SetCloudSyncConfig(const CloudSyncConfig & config)2116 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2117 {
2118 std::lock_guard<std::mutex> autoLock(configMutex_);
2119 cloudSyncConfig_ = config;
2120 LOGI("[RelationalSyncAbleStorage] SetCloudSyncConfig value:[%" PRId32 ", %" PRId32 ", %" PRId32 ", %d]",
2121 cloudSyncConfig_.maxUploadCount, cloudSyncConfig_.maxUploadSize,
2122 cloudSyncConfig_.maxRetryConflictTimes, cloudSyncConfig_.isSupportEncrypt);
2123 }
2124
IsTableExistReference(const std::string & table)2125 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2126 {
2127 // check whether reference exist
2128 std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2129 int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2130 if (errCode != E_OK) {
2131 LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2132 return false;
2133 }
2134 return !tableReference.empty();
2135 }
2136
IsTableExistReferenceOrReferenceBy(const std::string & table)2137 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2138 {
2139 // check whether reference or reference by exist
2140 if (storageEngine_ == nullptr) {
2141 LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2142 return false;
2143 }
2144 RelationalSchemaObject schema = storageEngine_->GetSchema();
2145 auto referenceProperty = schema.GetReferenceProperty();
2146 if (referenceProperty.empty()) {
2147 return false;
2148 }
2149 auto [sourceTableName, errCode] = GetSourceTableName(table);
2150 if (errCode != E_OK) {
2151 return false;
2152 }
2153 for (const auto &property : referenceProperty) {
2154 if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2155 DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2156 return true;
2157 }
2158 }
2159 return false;
2160 }
2161
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)2162 int RelationalSyncAbleStorage::ReviseLocalModTime(const std::string &tableName,
2163 const std::vector<ReviseModTimeInfo> &revisedData)
2164 {
2165 if (storageEngine_ == nullptr) {
2166 LOGE("[ReviseLocalModTime] Storage is null");
2167 return -E_INVALID_DB;
2168 }
2169 int errCode = E_OK;
2170 auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
2171 storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
2172 if (writeHandle == nullptr) {
2173 LOGE("[ReviseLocalModTime] Get write handle fail: %d", errCode);
2174 return errCode;
2175 }
2176 errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
2177 if (errCode != E_OK) {
2178 LOGE("[ReviseLocalModTime] Start Transaction fail: %d", errCode);
2179 ReleaseHandle(writeHandle);
2180 return errCode;
2181 }
2182 errCode = writeHandle->ReviseLocalModTime(tableName, revisedData);
2183 if (errCode != E_OK) {
2184 LOGE("[ReviseLocalModTime] Revise local modify time fail: %d", errCode);
2185 writeHandle->Rollback();
2186 ReleaseHandle(writeHandle);
2187 return errCode;
2188 }
2189 errCode = writeHandle->Commit();
2190 ReleaseHandle(writeHandle);
2191 return errCode;
2192 }
2193
GetCursor(const std::string & tableName,uint64_t & cursor)2194 int RelationalSyncAbleStorage::GetCursor(const std::string &tableName, uint64_t &cursor)
2195 {
2196 if (transactionHandle_ == nullptr) {
2197 LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
2198 return -E_INVALID_DB;
2199 }
2200 return transactionHandle_->GetCursor(tableName, cursor);
2201 }
2202
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)2203 int RelationalSyncAbleStorage::GetLocalDataCount(const std::string &tableName, int &dataCount,
2204 int &logicDeleteDataCount)
2205 {
2206 int errCode = E_OK;
2207 auto *handle = GetHandle(false, errCode);
2208 if (handle == nullptr || errCode != E_OK) {
2209 LOGE("[RelationalSyncAbleStorage] Get handle failed when get local data count: %d", errCode);
2210 return errCode;
2211 }
2212 errCode = handle->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
2213 ReleaseHandle(handle);
2214 return errCode;
2215 }
2216 }
2217 #endif
2218