• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34 
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41         std::string(CLOSE_CONN_TASK),
42         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43     );
44 }
45 }
46 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48     : storageEngine_(std::move(engine)),
49       reusedHandle_(nullptr),
50       isCachedOption_(false)
51 {}
52 
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55     syncAbleEngine_ = nullptr;
56 }
57 
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61     return SYNC_RELATION;
62 }
63 
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67     LOGD("RelationalSyncAbleStorage ref +1");
68     IncObjRef(this);
69 }
70 
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74     LOGD("RelationalSyncAbleStorage ref -1");
75     DecObjRef(this);
76 }
77 
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81     std::string identifier = storageEngine_->GetIdentifier();
82     return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84 
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87     std::string identifier = storageEngine_->GetRelationalProperties().GetStringProp(
88         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90     return identifierVect;
91 }
92 
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
95 {
96     int errCode = E_OK;
97     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98     if (handle == nullptr) {
99         return;
100     }
101     timestamp = 0;
102     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103     if (errCode != E_OK) {
104         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
106     }
107     ReleaseHandle(handle);
108 }
109 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const110 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
111 {
112     int errCode = E_OK;
113     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
114     if (handle == nullptr) {
115         return errCode;
116     }
117     timestamp = 0;
118     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
119     if (errCode != E_OK) {
120         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
121         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
122     }
123     ReleaseHandle(handle);
124     return errCode;
125 }
126 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const127 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
128     OperatePerm perm) const
129 {
130     if (storageEngine_ == nullptr) {
131         errCode = -E_INVALID_DB;
132         return nullptr;
133     }
134     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
135         storageEngine_->FindExecutor(isWrite, perm, errCode));
136     if (handle == nullptr) {
137         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
138     }
139     return handle;
140 }
141 
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const142 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
143     int &errCode, OperatePerm perm) const
144 {
145     if (storageEngine_ == nullptr) {
146         errCode = -E_INVALID_DB;
147         return nullptr;
148     }
149     if (transactionHandle_ != nullptr) {
150         return transactionHandle_;
151     }
152     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
153         storageEngine_->FindExecutor(isWrite, perm, errCode));
154     if (errCode != E_OK) {
155         ReleaseHandle(handle);
156         handle = nullptr;
157     }
158     return handle;
159 }
160 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const161 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
162 {
163     if (storageEngine_ == nullptr) {
164         return;
165     }
166     StorageExecutor *databaseHandle = handle;
167     storageEngine_->Recycle(databaseHandle);
168     std::function<void()> listener = nullptr;
169     {
170         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
171         listener = heartBeatListener_;
172     }
173     if (listener) {
174         listener();
175     }
176 }
177 
178 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const179 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
180 {
181     if (key.size() > DBConstant::MAX_KEY_SIZE) {
182         return -E_INVALID_ARGS;
183     }
184     int errCode = E_OK;
185     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
186     if (handle == nullptr) {
187         return errCode;
188     }
189     errCode = handle->GetKvData(key, value);
190     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
191         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
192     }
193     ReleaseHandle(handle);
194     return errCode;
195 }
196 
GetMetaDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const197 int RelationalSyncAbleStorage::GetMetaDataByPrefixKey(const Key &keyPrefix, std::map<Key, Value> &data) const
198 {
199     if (keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
200         return -E_INVALID_ARGS;
201     }
202     int errCode = E_OK;
203     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
204     if (handle == nullptr) {
205         return errCode;
206     }
207     errCode = handle->GetKvDataByPrefixKey(keyPrefix, data);
208     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
209         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
210     }
211     ReleaseHandle(handle);
212     return errCode;
213 }
214 
215 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)216 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
217 {
218     int errCode = E_OK;
219     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
220     if (handle == nullptr) {
221         return errCode;
222     }
223 
224     errCode = handle->PutKvData(key, value); // meta doesn't need time.
225     if (errCode != E_OK) {
226         LOGE("Put kv data err:%d", errCode);
227         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
228     }
229     ReleaseHandle(handle);
230     return errCode;
231 }
232 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)233 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
234 {
235     if (storageEngine_ == nullptr) {
236         return -E_INVALID_DB;
237     }
238     int errCode = E_OK;
239     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
240     std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
241 
242     // try to recycle using the handle
243     if (isInTransaction) {
244         handLock.lock();
245         if (reusedHandle_ != nullptr) {
246             handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
247         } else {
248             isInTransaction = false;
249             handLock.unlock();
250         }
251     }
252 
253     if (handle == nullptr) {
254         handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
255         if (handle == nullptr) {
256             return errCode;
257         }
258     }
259 
260     errCode = handle->PutKvData(key, value);
261     if (errCode != E_OK) {
262         LOGE("Put kv data err:%d", errCode);
263         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
264     }
265     if (!isInTransaction) {
266         ReleaseHandle(handle);
267     }
268     return errCode;
269 }
270 
271 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)272 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
273 {
274     for (const auto &key : keys) {
275         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
276             return -E_INVALID_ARGS;
277         }
278     }
279     int errCode = E_OK;
280     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
281     if (handle == nullptr) {
282         return errCode;
283     }
284 
285     handle->StartTransaction(TransactType::IMMEDIATE);
286     errCode = handle->DeleteMetaData(keys);
287     if (errCode != E_OK) {
288         handle->Rollback();
289         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
290         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
291     } else {
292         handle->Commit();
293     }
294     ReleaseHandle(handle);
295     return errCode;
296 }
297 
298 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const299 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
300 {
301     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
302         return -E_INVALID_ARGS;
303     }
304 
305     int errCode = E_OK;
306     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
307     if (handle == nullptr) {
308         return errCode;
309     }
310 
311     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
312     if (errCode != E_OK) {
313         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
314         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
315     }
316     ReleaseHandle(handle);
317     return errCode;
318 }
319 
320 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const321 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
322 {
323     int errCode = E_OK;
324     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
325     if (handle == nullptr) {
326         return errCode;
327     }
328 
329     errCode = handle->GetAllMetaKeys(keys);
330     if (errCode != E_OK) {
331         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
332     }
333     ReleaseHandle(handle);
334     return errCode;
335 }
336 
GetDbProperties() const337 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
338 {
339     return storageEngine_->GetProperties();
340 }
341 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)342 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
343 {
344     int errCode = E_OK;
345     for (auto &item : dataItems) {
346         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
347         if (entry == nullptr) {
348             errCode = -E_OUT_OF_MEMORY;
349             LOGE("GetKvEntries failed, errCode:%d", errCode);
350             SingleVerKvEntry::Release(entries);
351             break;
352         }
353         entry->SetEntryData(std::move(item));
354         entries.push_back(entry);
355     }
356     return errCode;
357 }
358 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)359 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
360 {
361     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
362     // the size would not be very large.
363     static const size_t maxOrigDevLength = 40;
364     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
365     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
366                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
367     return dataSize;
368 }
369 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)370 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
371     size_t appendLen)
372 {
373     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
374     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
375         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
376         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
377     }
378     return !reachThreshold;
379 }
380 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)381 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
382     SQLiteSingleVerRelationalContinueToken *&token)
383 {
384     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
385         delete token;
386         token = nullptr;
387         return;
388     }
389 
390     if (dataItems.empty()) {
391         errCode = -E_INTERNAL_ERROR;
392         LOGE("Get data unfinished but data items is empty.");
393         delete token;
394         token = nullptr;
395         return;
396     }
397     token->SetNextBeginTime(dataItems.back());
398     token->UpdateNextSyncOffset(dataItems.size());
399 }
400 
401 /**
402  * Caller must ensure that parameter token is valid.
403  * If error happened, token will be deleted here.
404  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo,RelationalSchemaObject && filterSchema) const405 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
406     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo,
407     RelationalSchemaObject &&filterSchema) const
408 {
409     if (storageEngine_ == nullptr) {
410         return -E_INVALID_DB;
411     }
412 
413     int errCode = E_OK;
414     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
415         OperatePerm::NORMAL_PERM, errCode));
416     if (handle == nullptr) {
417         goto ERROR;
418     }
419     handle->SetLocalSchema(filterSchema);
420     do {
421         errCode = handle->GetSyncDataByQuery(dataItems,
422             Parcel::GetAppendedLen(),
423             dataSizeInfo,
424             [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
425                 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
426             }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
427         if (errCode == -E_FINISHED) {
428             token->FinishGetData();
429             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
430         }
431     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
432 
433 ERROR:
434     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
435         dataItems.clear();
436     }
437     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
438     ReleaseHandle(handle);
439     return errCode;
440 }
441 
442 // use kv struct data to sync
443 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const444 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
445     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
446     std::vector<SingleVerKvEntry *> &entries) const
447 {
448     if (!timeRange.IsValid()) {
449         return -E_INVALID_ARGS;
450     }
451     query.SetSchema(storageEngine_->GetSchema());
452     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
453     if (token == nullptr) {
454         LOGE("[SingleVerNStore] Allocate continue token failed.");
455         return -E_OUT_OF_MEMORY;
456     }
457 
458     continueStmtToken = static_cast<ContinueToken>(token);
459     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
460 }
461 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const462 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
463     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
464 {
465     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
466     if (token == nullptr) {
467         LOGE("[SingleVerNStore] Allocate continue stmt token failed.");
468         return -E_OUT_OF_MEMORY;
469     }
470     if (!token->CheckValid()) {
471         return -E_INVALID_ARGS;
472     }
473     RelationalSchemaObject schema = storageEngine_->GetSchema();
474     RelationalSchemaObject filterSchema;
475     if (token->IsUseLocalSchema()) {
476         filterSchema = schema;
477     } else {
478         int errCode = GetRemoteDeviceSchema(token->GetRemoteDev(), filterSchema);
479         if (errCode != E_OK) {
480             return errCode;
481         }
482     }
483     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
484     std::vector<std::string> fieldNames;
485     fieldNames.reserve(fieldInfos.size());
486     for (const auto &fieldInfo : fieldInfos) { // order by cid
487         fieldNames.push_back(fieldInfo.GetFieldName());
488     }
489     token->SetFieldNames(fieldNames);
490 
491     std::vector<DataItem> dataItems;
492     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo, std::move(filterSchema));
493     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
494         continueStmtToken = static_cast<ContinueToken>(token);
495         return errCode;
496     }
497 
498     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
499     if (innerCode != E_OK) {
500         errCode = innerCode;
501         delete token;
502         token = nullptr;
503     }
504     continueStmtToken = static_cast<ContinueToken>(token);
505     return errCode;
506 }
507 
508 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)509 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
510 {
511     std::vector<DataItem> dataItems;
512     for (const auto &itemEntry : entries) {
513         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
514         if (entry != nullptr) {
515             DataItem item;
516             item.origDev = entry->GetOrigDevice();
517             item.flag = entry->GetFlag();
518             item.timestamp = entry->GetTimestamp();
519             item.writeTimestamp = entry->GetWriteTimestamp();
520             entry->GetKey(item.key);
521             entry->GetValue(item.value);
522             entry->GetHashKey(item.hashKey);
523             dataItems.push_back(item);
524         }
525     }
526     return dataItems;
527 }
528 }
529 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)530 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
531     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
532 {
533     std::vector<DataItem> dataItems = ConvertEntries(entries);
534     return PutSyncData(object, dataItems, deviceName);
535 }
536 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)537 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
538     const std::string &deviceName)
539 {
540     int errCode = E_OK;
541     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
542     QueryObject query = object;
543     auto localSchema = storageEngine_->GetSchema();
544     query.SetSchema(localSchema);
545 
546     RelationalSchemaObject filterSchema;
547     errCode = GetRemoteDeviceSchema(deviceName, filterSchema);
548     if (errCode != E_OK) {
549         LOGE("Find remote schema failed. err=%d", errCode);
550         return errCode;
551     }
552     if (!IsSetDistributedSchema(query.GetTableName(), localSchema)) {
553         return -E_SCHEMA_MISMATCH;
554     }
555     if (query.IsUseLocalSchema()) {
556         // remote send always with its table col sort
557         filterSchema.SetDistributedSchema(localSchema.GetDistributedSchema());
558     }
559 
560     StoreInfo info = GetStoreInfo();
561     SchemaInfo schemaInfo = {storageEngine_->GetSchema(), storageEngine_->GetTrackerSchema()};
562     auto inserter = RelationalSyncDataInserter::CreateInserter(
563         deviceName, query, schemaInfo, filterSchema.GetSyncFieldInfo(query.GetTableName()), info);
564     inserter.SetEntries(dataItems);
565 
566     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
567     if (handle == nullptr) {
568         return errCode;
569     }
570 
571     // To prevent certain abnormal scenarios from deleting the table,
572     // check if the table exists before each synchronization.
573     // If the table does not exist, create it.
574     // Because it is a fallback scenario, if the table creation fails, no failure will be returned
575     if (localSchema.GetTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
576         errCode = handle->CreateDistributedDeviceTable(deviceName,
577             storageEngine_->GetSchema().GetTable(query.GetTableName()), info);
578         if (errCode != E_OK) {
579             LOGW("[RelationalSyncAbleStorage::SaveSyncDataItems] Create distributed device table fail %d", errCode);
580         }
581     }
582     DBDfxAdapter::StartTracing();
583 
584     handle->SetTableMode(localSchema.GetTableMode());
585     errCode = handle->SaveSyncItems(inserter);
586     ChangedData data = inserter.GetChangedData();
587     data.properties.isP2pSyncDataChange = !dataItems.empty();
588 
589     DBDfxAdapter::FinishTracing();
590     bool isEmptyChangedData = data.field.empty() && data.primaryData[OP_INSERT].empty() &&
591         data.primaryData[OP_UPDATE].empty() && data.primaryData[OP_DELETE].empty();
592     if (errCode == E_OK && !isEmptyChangedData) {
593         // dataItems size > 0 now because already check before
594         // all dataItems will write into db now, so need to observer notify here
595         // if some dataItems will not write into db in the future, observer notify here need change
596         data.tableName = query.GetTableName();
597         // SPLIT_BY_DEVICE trigger observer with device, userId, appId and storeId, so trigger with isChangeData false
598         // COLLABORATION   trigger observer with changeData, so trigger with isChangeData true
599         TriggerObserverAction(deviceName, std::move(data), storageEngine_->GetRelationalProperties()
600             .GetDistributedTableMode() == DistributedTableMode::COLLABORATION, Origin::ORIGIN_REMOTE);
601     }
602 
603     ReleaseHandle(handle);
604     return errCode;
605 }
606 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)607 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
608     const std::string &deviceName)
609 {
610     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
611         LOGW("Device length is invalid for sync put");
612         return -E_INVALID_ARGS;
613     }
614 
615     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
616     if (errCode != E_OK) {
617         LOGE("[Relational] PutSyncData errCode:%d", errCode);
618         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
619     }
620     return errCode;
621 }
622 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)623 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
624 {
625     (void) deviceName;
626     (void) isNeedNotify;
627     return -E_NOT_SUPPORT;
628 }
629 
GetSchemaInfo() const630 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
631 {
632     return storageEngine_->GetSchema();
633 }
634 
GetSecurityOption(SecurityOption & option) const635 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
636 {
637     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
638     if (isCachedOption_) {
639         option = securityOption_;
640         return E_OK;
641     }
642     std::string dbPath = storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::DATA_DIR, "");
643     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
644     if (errCode == E_OK) {
645         option = securityOption_;
646         isCachedOption_ = true;
647     }
648     return errCode;
649 }
650 
NotifyRemotePushFinished(const std::string & deviceId) const651 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
652 {
653     return;
654 }
655 
656 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const657 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
658 {
659     return OS::GetCurrentSysTimeInMicrosecond(outTime);
660 }
661 
GetTablesQuery()662 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
663 {
664     auto tableNames = storageEngine_->GetSchema().GetTableNames();
665     std::vector<QuerySyncObject> queries;
666     queries.reserve(tableNames.size());
667     for (const auto &it : tableNames) {
668         queries.emplace_back(Query::Select(it));
669     }
670     return queries;
671 }
672 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)673 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
674 {
675     (void) queryObj;
676     return -E_NOT_SUPPORT;
677 }
678 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const679 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
680     const std::string &targetID, bool isPush) const
681 {
682     return E_OK;
683 }
684 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)685 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
686     const RelationalSyncStrategy &syncStrategy)
687 {
688     auto mode = storageEngine_->GetRelationalProperties().GetDistributedTableMode();
689     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
690         LOGD("No need create device table in COLLABORATION mode.");
691         return E_OK;
692     }
693 
694     int errCode = E_OK;
695     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
696     if (handle == nullptr) {
697         return errCode;
698     }
699 
700     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
701     if (errCode != E_OK) {
702         LOGE("Start transaction failed:%d", errCode);
703         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
704         ReleaseHandle(handle);
705         return errCode;
706     }
707 
708     StoreInfo info = GetStoreInfo();
709     for (const auto &[table, strategy] : syncStrategy) {
710         if (!strategy.permitSync) {
711             continue;
712         }
713 
714         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
715         if (errCode != E_OK) {
716             LOGE("Create distributed device table failed. %d", errCode);
717             break;
718         }
719     }
720 
721     if (errCode == E_OK) {
722         errCode = handle->Commit();
723     } else {
724         (void)handle->Rollback();
725     }
726 
727     ReleaseHandle(handle);
728     return errCode;
729 }
730 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)731 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
732 {
733     std::lock_guard lock(onSchemaChangedMutex_);
734     onSchemaChanged_ = callback;
735     return E_OK;
736 }
737 
NotifySchemaChanged()738 void RelationalSyncAbleStorage::NotifySchemaChanged()
739 {
740     std::lock_guard lock(onSchemaChangedMutex_);
741     if (onSchemaChanged_) {
742         LOGD("Notify relational schema was changed");
743         onSchemaChanged_();
744     }
745 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const746 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
747 {
748     algorithmSet.clear();
749     DataCompression::GetCompressionAlgo(algorithmSet);
750     return E_OK;
751 }
752 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)753 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
754     const RelationalObserverAction &action)
755 {
756     ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
757     ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
758     auto it = dataChangeCallbackMap_.find(connectionId);
759     if (it != dataChangeCallbackMap_.end()) {
760         if (it->second.find(observer) != it->second.end()) {
761             LOGE("obsever already registered");
762             return -E_ALREADY_SET;
763         }
764         if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
765             LOGE("The number of relational observers has been over limit");
766             return -E_MAX_LIMITS;
767         }
768         it->second[observer] = action;
769     } else {
770         dataChangeCallbackMap_[connectionId][observer] = action;
771     }
772     LOGI("register relational observer ok");
773     return E_OK;
774 }
775 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)776 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
777 {
778     if (observer == nullptr) {
779         EraseDataChangeCallback(connectionId);
780         return E_OK;
781     }
782     ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
783     ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
784     auto it = dataChangeCallbackMap_.find(connectionId);
785     if (it != dataChangeCallbackMap_.end()) {
786         auto action = it->second.find(observer);
787         if (action != it->second.end()) {
788             it->second.erase(action);
789             LOGI("unregister relational observer.");
790             if (it->second.empty()) {
791                 dataChangeCallbackMap_.erase(it);
792                 LOGI("observer for this delegate is zero now");
793             }
794             return E_OK;
795         }
796     }
797     return -E_NOT_FOUND;
798 }
799 
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,Origin origin)800 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
801     const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
802     const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin)
803 {
804     for (auto &action : item.second) {
805         if (action.second == nullptr) {
806             continue;
807         }
808         ChangedData observerChangeData = changedData;
809         if (action.first != nullptr) {
810             FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
811         }
812         action.second(deviceName, std::move(observerChangeData), isChangedData, origin);
813     }
814 }
815 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)816 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
817     ChangedData &&changedData, bool isChangedData)
818 {
819     TriggerObserverAction(deviceName, std::move(changedData), isChangedData, Origin::ORIGIN_CLOUD);
820 }
821 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData,Origin origin)822 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
823     bool isChangedData, Origin origin)
824 {
825     IncObjRef(this);
826     int taskErrCode =
827         ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData, origin] () mutable {
828             LOGD("begin to trigger relational observer.");
829             ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
830             ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
831             for (const auto &item : dataChangeCallbackMap_) {
832                 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, origin);
833             }
834             DecObjRef(this);
835         }, &dataChangeCallbackMap_);
836     if (taskErrCode != E_OK) {
837         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
838         DecObjRef(this);
839     }
840 }
841 
RegisterHeartBeatListener(const std::function<void ()> & listener)842 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
843 {
844     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
845     heartBeatListener_ = listener;
846 }
847 
CheckAndInitQueryCondition(QueryObject & query) const848 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
849 {
850     RelationalSchemaObject schema = storageEngine_->GetSchema();
851     TableInfo table = schema.GetTable(query.GetTableName());
852     if (!table.IsValid()) {
853         LOGE("Query table is not a distributed table.");
854         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
855     }
856     if (table.GetTableSyncType() == CLOUD_COOPERATION) {
857         LOGE("cloud table mode is not support");
858         return -E_NOT_SUPPORT;
859     }
860     query.SetSchema(schema);
861 
862     int errCode = E_OK;
863     auto *handle = GetHandle(true, errCode);
864     if (handle == nullptr) {
865         return errCode;
866     }
867 
868     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
869     if (errCode != E_OK) {
870         LOGE("Check relational query condition failed. %d", errCode);
871         TriggerCloseAutoLaunchConn(storageEngine_->GetRelationalProperties());
872     }
873 
874     ReleaseHandle(handle);
875     return errCode;
876 }
877 
CheckCompatible(const std::string & schema,uint8_t type) const878 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
879 {
880     // return true if is relational schema.
881     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
882 }
883 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const884 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
885     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
886 {
887     if (!storageEngine_->GetSchema().IsSchemaValid()) {
888         return -E_NOT_SUPPORT;
889     }
890     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
891         LOGE("[ExecuteQuery] invalid args");
892         return -E_INVALID_ARGS;
893     }
894     int errCode = E_OK;
895     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
896     if (handle == nullptr) {
897         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
898         return errCode;
899     }
900     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
901     if (errCode != E_OK) {
902         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
903     }
904     ReleaseHandle(handle);
905     return errCode;
906 }
907 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const908 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
909     RelationalRowDataSet &dataSet, ContinueToken &token) const
910 {
911     dataSet.Clear();
912     if (token == nullptr) {
913         // start query
914         std::vector<std::string> colNames;
915         std::vector<RelationalRowData *> data;
916         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
917 
918         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
919         if (errCode != E_OK) {
920             return errCode;
921         }
922 
923         // create one token
924         token = static_cast<ContinueToken>(
925             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
926         if (token == nullptr) {
927             LOGE("ExecuteQuery OOM");
928             return -E_OUT_OF_MEMORY;
929         }
930     }
931 
932     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
933     if (!remoteToken->CheckValid()) {
934         LOGE("ExecuteQuery invalid token");
935         return -E_INVALID_ARGS;
936     }
937 
938     int errCode = remoteToken->GetData(packetSize, dataSet);
939     if (errCode == -E_UNFINISHED) {
940         errCode = E_OK;
941     } else {
942         if (errCode != E_OK) {
943             dataSet.Clear();
944         }
945         delete remoteToken;
946         remoteToken = nullptr;
947         token = nullptr;
948     }
949     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
950     return errCode;
951 }
952 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)953 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
954     uint8_t type)
955 {
956     if (ReadSchemaType(type) != SchemaType::RELATIVE) {
957         return -E_INVALID_ARGS;
958     }
959 
960     RelationalSchemaObject schemaObj;
961     int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
962     if (errCode != E_OK) {
963         LOGE("Parse remote schema failed. err=%d", errCode);
964         return errCode;
965     }
966 
967     std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
968     Key remoteSchemaKey(keyStr.begin(), keyStr.end());
969     Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
970     errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
971     if (errCode != E_OK) {
972         LOGE("Save remote schema failed. err=%d", errCode);
973         return errCode;
974     }
975 
976     return remoteDeviceSchema_.Put(deviceId, remoteSchema);
977 }
978 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const979 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId,
980     RelationalSchemaObject &schemaObj) const
981 {
982     if (schemaObj.IsSchemaValid()) {
983         LOGE("schema is already valid");
984         return -E_INVALID_ARGS;
985     }
986 
987     std::string remoteSchema;
988     int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
989     if (errCode == -E_NOT_FOUND) {
990         LOGW("Get remote device schema miss cached.");
991         std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
992         Key remoteSchemaKey(keyStr.begin(), keyStr.end());
993         Value remoteSchemaBuff;
994         errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
995         if (errCode != E_OK) {
996             LOGE("Get remote device schema from meta failed. err=%d", errCode);
997             return errCode;
998         }
999         remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
1000         errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
1001     }
1002 
1003     if (errCode != E_OK) {
1004         LOGE("Get remote device schema failed. err=%d", errCode);
1005         return errCode;
1006     }
1007 
1008     errCode = schemaObj.ParseFromSchemaString(remoteSchema);
1009     if (errCode != E_OK) {
1010         LOGE("Parse remote schema failed. err=%d", errCode);
1011     }
1012     return errCode;
1013 }
1014 
SetReusedHandle(StorageExecutor * handle)1015 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
1016 {
1017     std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
1018     reusedHandle_ = handle;
1019 }
1020 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const1021 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
1022 {
1023     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1024     delete remoteToken;
1025     remoteToken = nullptr;
1026     token = nullptr;
1027 }
1028 
GetStoreInfo() const1029 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1030 {
1031     StoreInfo info = {
1032         storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, ""),
1033         storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, ""),
1034         storageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "")
1035     };
1036     return info;
1037 }
1038 
StartTransaction(TransactType type,bool isAsyncDownload)1039 int RelationalSyncAbleStorage::StartTransaction(TransactType type, bool isAsyncDownload)
1040 {
1041     if (isAsyncDownload) {
1042         return StartTransactionForAsyncDownload(type);
1043     }
1044     if (storageEngine_ == nullptr) {
1045         return -E_INVALID_DB;
1046     }
1047     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1048     if (transactionHandle_ != nullptr) {
1049         LOGD("Transaction started already.");
1050         return -E_TRANSACT_STATE;
1051     }
1052     int errCode = E_OK;
1053     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1054         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1055     if (handle == nullptr) {
1056         ReleaseHandle(handle);
1057         return errCode;
1058     }
1059     errCode = handle->StartTransaction(type);
1060     if (errCode != E_OK) {
1061         ReleaseHandle(handle);
1062         return errCode;
1063     }
1064     transactionHandle_ = handle;
1065     return errCode;
1066 }
1067 
Commit(bool isAsyncDownload)1068 int RelationalSyncAbleStorage::Commit(bool isAsyncDownload)
1069 {
1070     if (isAsyncDownload) {
1071         return CommitForAsyncDownload();
1072     }
1073     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1074     if (transactionHandle_ == nullptr) {
1075         LOGE("relation database is null or the transaction has not been started");
1076         return -E_INVALID_DB;
1077     }
1078     int errCode = transactionHandle_->Commit();
1079     ReleaseHandle(transactionHandle_);
1080     transactionHandle_ = nullptr;
1081     LOGD("connection commit transaction!");
1082     return errCode;
1083 }
1084 
Rollback(bool isAsyncDownload)1085 int RelationalSyncAbleStorage::Rollback(bool isAsyncDownload)
1086 {
1087     if (isAsyncDownload) {
1088         return RollbackForAsyncDownload();
1089     }
1090     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1091     if (transactionHandle_ == nullptr) {
1092         LOGE("Invalid handle for rollback or the transaction has not been started.");
1093         return -E_INVALID_DB;
1094     }
1095 
1096     int errCode = transactionHandle_->Rollback();
1097     ReleaseHandle(transactionHandle_);
1098     transactionHandle_ = nullptr;
1099     LOGI("connection rollback transaction!");
1100     return errCode;
1101 }
1102 
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1103 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1104     const std::vector<Timestamp> &timestampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1105 {
1106     int errCode = E_OK;
1107     auto *handle = GetHandleExpectTransaction(false, errCode);
1108     if (handle == nullptr) {
1109         return errCode;
1110     }
1111     QuerySyncObject queryObj = query;
1112     queryObj.SetSchema(GetSchemaInfo());
1113     errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1114     if (transactionHandle_ == nullptr) {
1115         ReleaseHandle(handle);
1116     }
1117     return errCode;
1118 }
1119 
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1120 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp,
1121     bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1122 {
1123     int errCode = E_OK;
1124     auto *handle = GetHandleExpectTransaction(false, errCode);
1125     if (handle == nullptr) {
1126         return errCode;
1127     }
1128     QuerySyncObject queryObj = query;
1129     queryObj.SetSchema(GetSchemaInfo());
1130     errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1131     if (transactionHandle_ == nullptr) {
1132         ReleaseHandle(handle);
1133     }
1134     return errCode;
1135 }
1136 
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1137 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1138     const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1139 {
1140     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1141     QuerySyncObject query = querySyncObject;
1142     query.SetSchema(GetSchemaInfo());
1143     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1144     if (token == nullptr) {
1145         LOGE("[SingleVerNStore] Allocate continue token failed.");
1146         return -E_OUT_OF_MEMORY;
1147     }
1148     token->SetCloudTableSchema(tableSchema);
1149     continueStmtToken = static_cast<ContinueToken>(token);
1150     return GetCloudDataNext(continueStmtToken, cloudDataResult);
1151 }
1152 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1153 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1154     CloudSyncData &cloudDataResult)
1155 {
1156     if (continueStmtToken == nullptr) {
1157         return -E_INVALID_ARGS;
1158     }
1159     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1160     if (!token->CheckValid()) {
1161         return -E_INVALID_ARGS;
1162     }
1163     int errCode = E_OK;
1164     auto *handle = GetHandleExpectTransaction(false, errCode);
1165     if (handle == nullptr) {
1166         LOGE("Invalid handle, release the token, %d", errCode);
1167         ReleaseCloudDataToken(continueStmtToken);
1168         return -E_INVALID_DB;
1169     }
1170     cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1171     auto config = GetCloudSyncConfig();
1172     handle->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1173     errCode = handle->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1174     LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1175         cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1176         cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1177     if (transactionHandle_ == nullptr) {
1178         ReleaseHandle(handle);
1179     }
1180     if (errCode != -E_UNFINISHED) {
1181         delete token;
1182         token = nullptr;
1183     }
1184     continueStmtToken = static_cast<ContinueToken>(token);
1185     if (errCode != E_OK && errCode != -E_UNFINISHED) {
1186         return errCode;
1187     }
1188     int fillRefGidCode = FillReferenceData(cloudDataResult);
1189     return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1190 }
1191 
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1192 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1193     bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1194 {
1195     int errCode = E_OK;
1196     auto *handle = GetHandle(false, errCode);
1197     if (handle == nullptr) {
1198         return errCode;
1199     }
1200     Timestamp beginTime = 0u;
1201     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1202     QuerySyncObject query = querySyncObject;
1203     query.SetSchema(GetSchemaInfo());
1204     handle->SetTableSchema(tableSchema);
1205     errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1206     ReleaseHandle(handle);
1207     if (errCode != E_OK) {
1208         LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1209     }
1210     return errCode;
1211 }
1212 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1213 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1214 {
1215     if (continueStmtToken == nullptr) {
1216         return E_OK;
1217     }
1218     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1219     if (!token->CheckValid()) {
1220         return E_OK;
1221     }
1222     int errCode = token->ReleaseCloudStatement();
1223     delete token;
1224     token = nullptr;
1225     return errCode;
1226 }
1227 
GetSchemaFromDB(RelationalSchemaObject & schema)1228 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1229 {
1230     Key schemaKey;
1231     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1232     Value schemaVal;
1233     int errCode = GetMetaData(schemaKey, schemaVal);
1234     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1235         LOGE("Get relational schema from DB failed. %d", errCode);
1236         return errCode;
1237     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1238         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1239         return -E_NOT_FOUND;
1240     }
1241     std::string schemaStr;
1242     DBCommon::VectorToString(schemaVal, schemaStr);
1243     errCode = schema.ParseFromSchemaString(schemaStr);
1244     if (errCode != E_OK) {
1245         LOGE("Parse schema string from DB failed.");
1246         return errCode;
1247     }
1248     storageEngine_->SetSchema(schema);
1249     return errCode;
1250 }
1251 
ChkSchema(const TableName & tableName)1252 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1253 {
1254     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1255     RelationalSchemaObject localSchema = GetSchemaInfo();
1256     int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1257     if (errCode == -E_SCHEMA_MISMATCH) {
1258         LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1259         RelationalSchemaObject newSchema;
1260         errCode = GetSchemaFromDB(newSchema);
1261         if (errCode != E_OK) {
1262             LOGE("Get schema from db when check schema. err: %d", errCode);
1263             return -E_SCHEMA_MISMATCH;
1264         }
1265         errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1266     }
1267     return errCode;
1268 }
1269 
SetCloudDbSchema(const DataBaseSchema & schema)1270 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1271 {
1272     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1273     RelationalSchemaObject localSchema = GetSchemaInfo();
1274     schemaMgr_.SetCloudDbSchema(schema, localSchema);
1275     return E_OK;
1276 }
1277 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1278 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1279     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1280 {
1281     return GetInfoByPrimaryKeyOrGid(tableName, vBucket, true, dataInfoWithLog, assetInfo);
1282 }
1283 
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1284 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1285     const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1286 {
1287     if (handle == nullptr) {
1288         return -E_INVALID_DB;
1289     }
1290     TableSchema tableSchema;
1291     int errCode = GetCloudTableSchema(tableName, tableSchema);
1292     if (errCode != E_OK) {
1293         LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1294         return errCode;
1295     }
1296     RelationalSchemaObject localSchema = GetSchemaInfo();
1297     handle->SetLocalSchema(localSchema);
1298     return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1299 }
1300 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1301 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1302 {
1303     if (transactionHandle_ == nullptr) {
1304         LOGE(" the transaction has not been started");
1305         return -E_INVALID_DB;
1306     }
1307     return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1308 }
1309 
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1310 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1311     const std::string &tableName, DownloadData &downloadData)
1312 {
1313     TableSchema tableSchema;
1314     int errCode = GetCloudTableSchema(tableName, tableSchema);
1315     if (errCode != E_OK) {
1316         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1317         return errCode;
1318     }
1319     RelationalSchemaObject localSchema = GetSchemaInfo();
1320     handle->SetLocalSchema(localSchema);
1321     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1322     handle->SetLogicDelete(IsCurrentLogicDelete());
1323     errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1324     handle->SetLogicDelete(false);
1325     return errCode;
1326 }
1327 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1328 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1329 {
1330     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1331     cloudSchema = schemaMgr_.GetCloudDbSchema();
1332     return E_OK;
1333 }
1334 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1335 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1336     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1337 {
1338     if (transactionHandle_ == nullptr) {
1339         LOGE("the transaction has not been started");
1340         return -E_INVALID_DB;
1341     }
1342     transactionHandle_->SetLogicDelete(logicDelete_);
1343     std::vector<std::string> notifyTableList;
1344     int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1345     if (!notifyTableList.empty()) {
1346         for (auto notifyTableName : notifyTableList) {
1347             ChangedData changedData;
1348             changedData.type = ChangedDataType::DATA;
1349             changedData.tableName = notifyTableName;
1350             std::vector<DistributedDB::Type> dataVec;
1351             DistributedDB::Type type;
1352             if (mode == FLAG_ONLY) {
1353                 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1354             } else {
1355                 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1356             }
1357             dataVec.push_back(type);
1358             changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1359             TriggerObserverAction("CLOUD", std::move(changedData), true);
1360         }
1361     }
1362     transactionHandle_->SetLogicDelete(false);
1363     return errCode;
1364 }
1365 
ClearCloudLogVersion(const std::vector<std::string> & tableNameList)1366 int RelationalSyncAbleStorage::ClearCloudLogVersion(const std::vector<std::string> &tableNameList)
1367 {
1368     if (transactionHandle_ == nullptr) {
1369         LOGE("[RelationalSyncAbleStorage][ClearCloudLogVersion] the transaction has not been started");
1370         return -E_INVALID_DB;
1371     }
1372     return transactionHandle_->DoClearCloudLogVersion(tableNameList);
1373 }
1374 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1375 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1376 {
1377     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1378     return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1379 }
1380 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1381 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1382     bool isDownloadSuccess)
1383 {
1384     if (storageEngine_ == nullptr) {
1385         return -E_INVALID_DB;
1386     }
1387     if (transactionHandle_ == nullptr) {
1388         LOGE("the transaction has not been started when fill asset for download.");
1389         return -E_INVALID_DB;
1390     }
1391     TableSchema tableSchema;
1392     int errCode = GetCloudTableSchema(tableName, tableSchema);
1393     if (errCode != E_OK) {
1394         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1395         return errCode;
1396     }
1397     uint64_t currCursor = DBConstant::INVALID_CURSOR;
1398     errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess, currCursor);
1399     if (errCode != E_OK) {
1400         LOGE("fill cloud asset for download failed.%d", errCode);
1401     }
1402     return errCode;
1403 }
1404 
SetLogTriggerStatus(bool status)1405 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1406 {
1407     int errCode = E_OK;
1408     auto *handle = GetHandleExpectTransaction(false, errCode);
1409     if (handle == nullptr) {
1410         return errCode;
1411     }
1412     errCode = handle->SetLogTriggerStatus(status);
1413     if (transactionHandle_ == nullptr) {
1414         ReleaseHandle(handle);
1415     }
1416     return errCode;
1417 }
1418 
SetCursorIncFlag(bool flag)1419 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1420 {
1421     int errCode = E_OK;
1422     auto *handle = GetHandleExpectTransaction(false, errCode);
1423     if (handle == nullptr) {
1424         return errCode;
1425     }
1426     errCode = handle->SetCursorIncFlag(flag);
1427     if (transactionHandle_ == nullptr) {
1428         ReleaseHandle(handle);
1429     }
1430     return errCode;
1431 }
1432 
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1433 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1434     bool ignoreEmptyGid)
1435 {
1436     if (storageEngine_ == nullptr) {
1437         return -E_INVALID_DB;
1438     }
1439     int errCode = E_OK;
1440     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1441         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1442     if (writeHandle == nullptr) {
1443         return errCode;
1444     }
1445     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1446     if (errCode != E_OK) {
1447         ReleaseHandle(writeHandle);
1448         return errCode;
1449     }
1450     errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1451     if (errCode != E_OK) {
1452         LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1453         writeHandle->Rollback();
1454         ReleaseHandle(writeHandle);
1455         return errCode;
1456     }
1457     errCode = writeHandle->Commit();
1458     ReleaseHandle(writeHandle);
1459     return errCode;
1460 }
1461 
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1462 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1463 {
1464     syncAbleEngine_ = syncAbleEngine;
1465 }
1466 
GetIdentify() const1467 std::string RelationalSyncAbleStorage::GetIdentify() const
1468 {
1469     if (storageEngine_ == nullptr) {
1470         LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1471         return "";
1472     }
1473     return storageEngine_->GetIdentifier();
1474 }
1475 
EraseDataChangeCallback(uint64_t connectionId)1476 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1477 {
1478     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1479         ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1480         ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1481         auto it = dataChangeCallbackMap_.find(connectionId);
1482         if (it != dataChangeCallbackMap_.end()) {
1483             dataChangeCallbackMap_.erase(it);
1484             LOGI("erase all observer, %" PRIu64, connectionId);
1485         }
1486     }, nullptr, &dataChangeCallbackMap_);
1487     ADAPTER_WAIT(handle);
1488 }
1489 
ReleaseContinueToken(ContinueToken & continueStmtToken) const1490 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1491 {
1492     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1493     if (token == nullptr || !(token->CheckValid())) {
1494         LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1495         return;
1496     }
1497     delete token;
1498     continueStmtToken = nullptr;
1499 }
1500 
CheckQueryValid(const QuerySyncObject & query)1501 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1502 {
1503     int errCode = E_OK;
1504     auto *handle = GetHandle(false, errCode);
1505     if (handle == nullptr) {
1506         return errCode;
1507     }
1508     errCode = handle->CheckQueryObjectLegal(query);
1509     if (errCode != E_OK) {
1510         ReleaseHandle(handle);
1511         return errCode;
1512     }
1513     QuerySyncObject queryObj = query;
1514     queryObj.SetSchema(GetSchemaInfo());
1515     int64_t count = 0;
1516     errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1517     ReleaseHandle(handle);
1518     if (errCode != E_OK) {
1519         LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1520         return -E_INVALID_ARGS;
1521     }
1522     return errCode;
1523 }
1524 
CreateTempSyncTrigger(const std::string & tableName)1525 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1526 {
1527     int errCode = E_OK;
1528     auto *handle = GetHandle(true, errCode);
1529     if (handle == nullptr) {
1530         return errCode;
1531     }
1532     errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1533     ReleaseHandle(handle);
1534     if (errCode != E_OK) {
1535         LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1536     }
1537     return errCode;
1538 }
1539 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1540 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1541     ChangeProperties &changeProperties)
1542 {
1543     int errCode = E_OK;
1544     auto *handle = GetHandle(false, errCode);
1545     if (handle == nullptr) {
1546         return errCode;
1547     }
1548     errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1549     ReleaseHandle(handle);
1550     if (errCode != E_OK) {
1551         LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1552     }
1553     return errCode;
1554 }
1555 
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1556 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1557 {
1558     if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1559         changedData.field = {};
1560         for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1561             changedData.primaryData[i].clear();
1562         }
1563     }
1564     if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1565         changedData.properties = {};
1566     }
1567 }
1568 
ClearAllTempSyncTrigger()1569 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1570 {
1571     int errCode = E_OK;
1572     auto *handle = GetHandle(true, errCode);
1573     if (handle == nullptr) {
1574         return errCode;
1575     }
1576     errCode = handle->ClearAllTempSyncTrigger();
1577     ReleaseHandle(handle);
1578     if (errCode != E_OK) {
1579         LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1580     }
1581     return errCode;
1582 }
1583 
FillReferenceData(CloudSyncData & syncData)1584 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1585 {
1586     std::map<int64_t, Entries> referenceGid;
1587     int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1588     if (errCode != E_OK) {
1589         LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1590         return errCode;
1591     }
1592     errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1593     if (errCode != E_OK) {
1594         return errCode;
1595     }
1596     referenceGid.clear();
1597     errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1598     if (errCode != E_OK) {
1599         LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1600         return errCode;
1601     }
1602     return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1603 }
1604 
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1605 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1606     const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1607 {
1608     if (referenceGid.empty()) {
1609         return E_OK;
1610     }
1611     int ignoredCount = 0;
1612     for (size_t index = 0u; index < rowid.size(); index++) {
1613         if (index >= extend.size()) {
1614             LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1615             return -E_UNEXPECTED_DATA;
1616         }
1617         int64_t rowId = rowid[index];
1618         if (referenceGid.find(rowId) == referenceGid.end()) {
1619             // current data miss match reference data, we ignored it
1620             ignoredCount++;
1621             continue;
1622         }
1623         extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1624     }
1625     if (ignoredCount != 0) {
1626         LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1627     }
1628     return E_OK;
1629 }
1630 
IsSharedTable(const std::string & tableName)1631 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1632 {
1633     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1634     return schemaMgr_.IsSharedTable(tableName);
1635 }
1636 
GetSharedTableOriginNames()1637 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1638 {
1639     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1640     return schemaMgr_.GetSharedTableOriginNames();
1641 }
1642 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1643 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1644     std::map<int64_t, Entries> &referenceGid)
1645 {
1646     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1647     int errCode = GetTableReference(tableName, tableReference);
1648     if (errCode != E_OK) {
1649         return errCode;
1650     }
1651     if (tableReference.empty()) {
1652         LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1653         return E_OK;
1654     }
1655     auto *handle = GetHandle(false, errCode);
1656     if (handle == nullptr) {
1657         return errCode;
1658     }
1659     errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1660     ReleaseHandle(handle);
1661     return errCode;
1662 }
1663 
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1664 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1665     std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1666 {
1667     if (storageEngine_ == nullptr) {
1668         LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1669         return -E_INVALID_DB;
1670     }
1671     RelationalSchemaObject schema = storageEngine_->GetSchema();
1672     auto referenceProperty = schema.GetReferenceProperty();
1673     if (referenceProperty.empty()) {
1674         return E_OK;
1675     }
1676     auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1677     if (errCode != E_OK) {
1678         return errCode;
1679     }
1680     for (const auto &property : referenceProperty) {
1681         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1682             if (!IsSharedTable(tableName)) {
1683                 reference[property.targetTableName].push_back(property);
1684                 continue;
1685             }
1686             TableReferenceProperty tableReference;
1687             tableReference.sourceTableName = tableName;
1688             tableReference.columns = property.columns;
1689             tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1690             auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1691             if (ret != E_OK) {
1692                 return ret;
1693             }
1694             tableReference.targetTableName = sharedTargetTable;
1695             reference[tableReference.targetTableName].push_back(tableReference);
1696         }
1697     }
1698     return E_OK;
1699 }
1700 
GetSourceTableName(const std::string & tableName)1701 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1702 {
1703     std::pair<std::string, int> res = { "", E_OK };
1704     std::shared_ptr<DataBaseSchema> cloudSchema;
1705     (void) GetCloudDbSchema(cloudSchema);
1706     if (cloudSchema == nullptr) {
1707         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1708         return { "", -E_INTERNAL_ERROR };
1709     }
1710     for (const auto &table : cloudSchema->tables) {
1711         if (CloudStorageUtils::IsSharedTable(table)) {
1712             continue;
1713         }
1714         if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1715             DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1716             res.first = table.name;
1717             break;
1718         }
1719     }
1720     if (res.first.empty()) {
1721         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1722         res.second = -E_SCHEMA_MISMATCH;
1723     }
1724     return res;
1725 }
1726 
GetSharedTargetTableName(const std::string & tableName)1727 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1728 {
1729     std::pair<std::string, int> res = { "", E_OK };
1730     std::shared_ptr<DataBaseSchema> cloudSchema;
1731     (void) GetCloudDbSchema(cloudSchema);
1732     if (cloudSchema == nullptr) {
1733         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1734         return { "", -E_INTERNAL_ERROR };
1735     }
1736     for (const auto &table : cloudSchema->tables) {
1737         if (CloudStorageUtils::IsSharedTable(table)) {
1738             continue;
1739         }
1740         if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1741             res.first = table.sharedTableName;
1742             break;
1743         }
1744     }
1745     if (res.first.empty()) {
1746         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1747         res.second = -E_SCHEMA_MISMATCH;
1748     }
1749     return res;
1750 }
1751 
SetLogicDelete(bool logicDelete)1752 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1753 {
1754     logicDelete_ = logicDelete;
1755     LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1756 }
1757 
IsCurrentLogicDelete() const1758 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1759 {
1760     return logicDelete_;
1761 }
1762 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1763 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1764     const std::string &gid, const Bytes &hashKey, VBucket &assets)
1765 {
1766     if (gid.empty() && hashKey.empty()) {
1767         LOGE("both gid and hashKey are empty.");
1768         return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1769     }
1770     if (transactionHandle_ == nullptr) {
1771         LOGE("the transaction has not been started");
1772         return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1773     }
1774     auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1775     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1776         LOGE("get assets by gid or hashKey failed. %d", errCode);
1777     }
1778     return { errCode, status };
1779 }
1780 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1781 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1782 {
1783     int errCode = E_OK;
1784     auto *wHandle = GetHandle(true, errCode);
1785     if (wHandle == nullptr) {
1786         return errCode;
1787     }
1788     wHandle->SetIAssetLoader(loader);
1789     ReleaseHandle(wHandle);
1790     return errCode;
1791 }
1792 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1793 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1794     const std::vector<VBucket> &records)
1795 {
1796     int errCode = E_OK;
1797     auto *handle = GetHandle(true, errCode);
1798     if (handle == nullptr || errCode != E_OK) {
1799         return errCode;
1800     }
1801     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1802     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1803     errCode = UpsertDataInner(handle, tableName, records);
1804     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1805     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1806     ReleaseHandle(handle);
1807     return errCode;
1808 }
1809 
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1810 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1811     const std::string &tableName, const std::vector<VBucket> &records)
1812 {
1813     int errCode = E_OK;
1814     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1815     if (errCode != E_OK) {
1816         LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1817         return errCode;
1818     }
1819     errCode = CreateTempSyncTriggerInner(handle, tableName);
1820     if (errCode == E_OK) {
1821         errCode = UpsertDataInTransaction(handle, tableName, records);
1822         (void) handle->ClearAllTempSyncTrigger();
1823     }
1824     if (errCode == E_OK) {
1825         errCode = handle->Commit();
1826         if (errCode != E_OK) {
1827             LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1828         }
1829     } else {
1830         int ret = handle->Rollback();
1831         if (ret != E_OK) {
1832             LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1833         }
1834     }
1835     return errCode;
1836 }
1837 
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1838 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1839     const std::string &tableName, const std::vector<VBucket> &records)
1840 {
1841     TableSchema tableSchema;
1842     int errCode = GetCloudTableSchema(tableName, tableSchema);
1843     if (errCode != E_OK) {
1844         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1845         return errCode;
1846     }
1847     TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1848     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1849     std::set<std::vector<uint8_t>> primaryKeys;
1850     DownloadData downloadData;
1851     for (const auto &record : records) {
1852         DataInfoWithLog dataInfoWithLog;
1853         VBucket assetInfo;
1854         auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1855             tableSchema, localTable, pkMap, false);
1856         if (errorCode != E_OK) {
1857             return errorCode;
1858         }
1859         errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1860         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1861             return errCode;
1862         }
1863         VBucket recordCopy = record;
1864         if ((errCode == -E_NOT_FOUND ||
1865             (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1866             primaryKeys.find(hashValue) == primaryKeys.end()) {
1867             downloadData.opType.push_back(OpType::INSERT);
1868             auto currentTime = TimeHelper::GetSysCurrentTime();
1869             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1870             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1871             primaryKeys.insert(hashValue);
1872         } else {
1873             downloadData.opType.push_back(OpType::UPDATE);
1874             recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1875             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1876             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1877             recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1878             recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1879         }
1880         downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1881         downloadData.data.push_back(std::move(recordCopy));
1882     }
1883     return PutCloudSyncDataInner(handle, tableName, downloadData);
1884 }
1885 
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1886 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1887     const LogInfo &logInfo)
1888 {
1889     if (transactionHandle_ == nullptr) {
1890         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1891         return -E_INVALID_DB;
1892     }
1893     TableSchema tableSchema;
1894     GetCloudTableSchema(tableName, tableSchema);
1895     std::vector<VBucket> assets;
1896     int errCode = transactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
1897     if (errCode != E_OK) {
1898         LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d",
1899             DBCommon::StringMiddleMasking(logInfo.cloudGid).c_str(), errCode);
1900         return errCode;
1901     }
1902     bool isInconsistency = !assets.empty();
1903     UpdateRecordFlagStruct updateRecordFlag = {
1904         .tableName = tableName,
1905         .isRecordConflict = recordConflict,
1906         .isInconsistency = isInconsistency
1907     };
1908     std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
1909     return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1910 }
1911 
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1912 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1913     OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1914 {
1915     TableSchema tableSchema;
1916     int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1917     if (errCode != E_OK) {
1918         LOGE("get table schema failed when fill log and asset. %d", errCode);
1919         return errCode;
1920     }
1921     errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1922     if (errCode != E_OK) {
1923         return errCode;
1924     }
1925     if (opType == OpType::INSERT) {
1926         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1927             handle, {data.tableName, CloudWaterType::INSERT, tableSchema}, data.insData, uploadRecorder_);
1928     } else if (opType == OpType::UPDATE) {
1929         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1930             handle, {data.tableName, CloudWaterType::UPDATE, tableSchema}, data.updData, uploadRecorder_);
1931     } else if (opType == OpType::DELETE) {
1932         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1933             handle, {data.tableName, CloudWaterType::DELETE, tableSchema}, data.delData, uploadRecorder_);
1934     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1935         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1936             handle, {data.tableName, CloudWaterType::BUTT, tableSchema}, data.lockData, uploadRecorder_, true);
1937     }
1938     return errCode;
1939 }
1940 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users,bool isQueryDownloadRecords)1941 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1942     std::vector<std::string> &users, bool isQueryDownloadRecords)
1943 {
1944     std::vector<TableSchema> tables;
1945     int errCode = GetCloudTableWithoutShared(tables);
1946     if (errCode != E_OK) {
1947         return errCode;
1948     }
1949     if (tables.empty()) {
1950         LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1951         return E_OK;
1952     }
1953     auto *handle = GetHandle(true, errCode);
1954     if (handle == nullptr || errCode != E_OK) {
1955         return errCode;
1956     }
1957     errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery, isQueryDownloadRecords);
1958     ReleaseHandle(handle);
1959     return errCode;
1960 }
1961 
ClearUnLockingNoNeedCompensated()1962 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1963 {
1964     std::vector<TableSchema> tables;
1965     int errCode = GetCloudTableWithoutShared(tables);
1966     if (errCode != E_OK) {
1967         return errCode;
1968     }
1969     if (tables.empty()) {
1970         LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1971         return E_OK;
1972     }
1973     auto *handle = GetHandle(true, errCode);
1974     if (handle == nullptr || errCode != E_OK) {
1975         return errCode;
1976     }
1977     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1978     if (errCode != E_OK) {
1979         ReleaseHandle(handle);
1980         return errCode;
1981     }
1982     for (const auto &table : tables) {
1983         errCode = handle->ClearUnLockingStatus(table.name);
1984         if (errCode != E_OK) {
1985             LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1986         }
1987     }
1988     errCode = handle->Commit();
1989     if (errCode != E_OK) {
1990         LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1991     }
1992     ReleaseHandle(handle);
1993     return errCode;
1994 }
1995 
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1996 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1997 {
1998     const auto tableInfos = GetSchemaInfo().GetTables();
1999     for (const auto &[tableName, info] : tableInfos) {
2000         if (info.GetSharedTableMark()) {
2001             continue;
2002         }
2003         TableSchema schema;
2004         int errCode = GetCloudTableSchema(tableName, schema);
2005         if (errCode == -E_NOT_FOUND) {
2006             continue;
2007         }
2008         if (errCode != E_OK) {
2009             LOGW("[RDBStorage] Get cloud table failed %d", errCode);
2010             return errCode;
2011         }
2012         tables.push_back(schema);
2013     }
2014     return E_OK;
2015 }
2016 
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery,bool isQueryDownloadRecords)2017 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2018     const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords)
2019 {
2020     int errCode = E_OK;
2021     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2022     if (errCode != E_OK) {
2023         return errCode;
2024     }
2025     for (const auto &table : tables) {
2026         if (!CheckTableSupportCompensatedSync(table)) {
2027             continue;
2028         }
2029 
2030         std::vector<VBucket> syncDataPk;
2031         errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk, isQueryDownloadRecords);
2032         if (errCode != E_OK) {
2033             LOGW("[RDBStorageEngine] Get wait compensated sync data failed, continue! errCode=%d", errCode);
2034             errCode = E_OK;
2035             continue;
2036         }
2037         if (syncDataPk.empty()) {
2038             // no data need to compensated sync
2039             continue;
2040         }
2041         errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncQuery);
2042         if (errCode != E_OK) {
2043             LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2044             errCode = E_OK;
2045             continue;
2046         }
2047     }
2048     if (errCode == E_OK) {
2049         errCode = handle->Commit();
2050         if (errCode != E_OK) {
2051             LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2052         }
2053     } else {
2054         int ret = handle->Rollback();
2055         if (ret != E_OK) {
2056             LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2057         }
2058     }
2059     return errCode;
2060 }
2061 
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2062 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2063     const std::string &tableName, bool flag)
2064 {
2065     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2066     if (trackerTable.IsEmpty()) {
2067         trackerTable.SetTableName(tableName);
2068     }
2069     return handle->CreateTempSyncTrigger(trackerTable, flag);
2070 }
2071 
CheckTableSupportCompensatedSync(const TableSchema & table)2072 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2073 {
2074     auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2075         return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2076             field.type == TYPE_INDEX<Bytes>);
2077     });
2078     if (it != table.fields.end()) {
2079         LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2080         return false;
2081     }
2082     // check whether reference exist
2083     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2084     int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2085     if (errCode != E_OK) {
2086         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2087         return false;
2088     }
2089     if (!tableReference.empty()) {
2090         LOGI("[RDBStorageEngine] current table exist reference property");
2091         return false;
2092     }
2093     return true;
2094 }
2095 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2096 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2097     const std::set<std::string> &gidFilters)
2098 {
2099     if (transactionHandle_ == nullptr) {
2100         LOGE("the transaction has not been started");
2101         return -E_INVALID_DB;
2102     }
2103     int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2104     if (errCode != E_OK) {
2105         LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2106     }
2107     return errCode;
2108 }
2109 
GetCloudSyncConfig() const2110 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2111 {
2112     std::lock_guard<std::mutex> autoLock(configMutex_);
2113     return cloudSyncConfig_;
2114 }
2115 
SetCloudSyncConfig(const CloudSyncConfig & config)2116 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2117 {
2118     std::lock_guard<std::mutex> autoLock(configMutex_);
2119     cloudSyncConfig_ = config;
2120     LOGI("[RelationalSyncAbleStorage] SetCloudSyncConfig value:[%" PRId32 ", %" PRId32 ", %" PRId32 ", %d]",
2121         cloudSyncConfig_.maxUploadCount, cloudSyncConfig_.maxUploadSize,
2122         cloudSyncConfig_.maxRetryConflictTimes, cloudSyncConfig_.isSupportEncrypt);
2123 }
2124 
IsTableExistReference(const std::string & table)2125 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2126 {
2127     // check whether reference exist
2128     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2129     int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2130     if (errCode != E_OK) {
2131         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2132         return false;
2133     }
2134     return !tableReference.empty();
2135 }
2136 
IsTableExistReferenceOrReferenceBy(const std::string & table)2137 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2138 {
2139     // check whether reference or reference by exist
2140     if (storageEngine_ == nullptr) {
2141         LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2142         return false;
2143     }
2144     RelationalSchemaObject schema = storageEngine_->GetSchema();
2145     auto referenceProperty = schema.GetReferenceProperty();
2146     if (referenceProperty.empty()) {
2147         return false;
2148     }
2149     auto [sourceTableName, errCode] = GetSourceTableName(table);
2150     if (errCode != E_OK) {
2151         return false;
2152     }
2153     for (const auto &property : referenceProperty) {
2154         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2155             DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2156             return true;
2157         }
2158     }
2159     return false;
2160 }
2161 
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)2162 int RelationalSyncAbleStorage::ReviseLocalModTime(const std::string &tableName,
2163     const std::vector<ReviseModTimeInfo> &revisedData)
2164 {
2165     if (storageEngine_ == nullptr) {
2166         LOGE("[ReviseLocalModTime] Storage is null");
2167         return -E_INVALID_DB;
2168     }
2169     int errCode = E_OK;
2170     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
2171             storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
2172     if (writeHandle == nullptr) {
2173         LOGE("[ReviseLocalModTime] Get write handle fail: %d", errCode);
2174         return errCode;
2175     }
2176     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
2177     if (errCode != E_OK) {
2178         LOGE("[ReviseLocalModTime] Start Transaction fail: %d", errCode);
2179         ReleaseHandle(writeHandle);
2180         return errCode;
2181     }
2182     errCode = writeHandle->ReviseLocalModTime(tableName, revisedData);
2183     if (errCode != E_OK) {
2184         LOGE("[ReviseLocalModTime] Revise local modify time fail: %d", errCode);
2185         writeHandle->Rollback();
2186         ReleaseHandle(writeHandle);
2187         return errCode;
2188     }
2189     errCode = writeHandle->Commit();
2190     ReleaseHandle(writeHandle);
2191     return errCode;
2192 }
2193 
GetCursor(const std::string & tableName,uint64_t & cursor)2194 int RelationalSyncAbleStorage::GetCursor(const std::string &tableName, uint64_t &cursor)
2195 {
2196     if (transactionHandle_ == nullptr) {
2197         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
2198         return -E_INVALID_DB;
2199     }
2200     return transactionHandle_->GetCursor(tableName, cursor);
2201 }
2202 
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)2203 int RelationalSyncAbleStorage::GetLocalDataCount(const std::string &tableName, int &dataCount,
2204     int &logicDeleteDataCount)
2205 {
2206     int errCode = E_OK;
2207     auto *handle = GetHandle(false, errCode);
2208     if (handle == nullptr || errCode != E_OK) {
2209         LOGE("[RelationalSyncAbleStorage] Get handle failed when get local data count: %d", errCode);
2210         return errCode;
2211     }
2212     errCode = handle->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
2213     ReleaseHandle(handle);
2214     return errCode;
2215 }
2216 }
2217 #endif
2218