• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34 
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41         std::string(CLOSE_CONN_TASK),
42         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43     );
44 }
45 }
46 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48     : storageEngine_(std::move(engine)),
49       reusedHandle_(nullptr),
50       isCachedOption_(false)
51 {}
52 
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55     syncAbleEngine_ = nullptr;
56 }
57 
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61     return SYNC_RELATION;
62 }
63 
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67     LOGD("RelationalSyncAbleStorage ref +1");
68     IncObjRef(this);
69 }
70 
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74     LOGD("RelationalSyncAbleStorage ref -1");
75     DecObjRef(this);
76 }
77 
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81     std::string identifier = storageEngine_->GetIdentifier();
82     return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84 
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87     std::string identifier = storageEngine_->GetProperties().GetStringProp(
88         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90     return identifierVect;
91 }
92 
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
95 {
96     int errCode = E_OK;
97     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98     if (handle == nullptr) {
99         return;
100     }
101     timestamp = 0;
102     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103     if (errCode != E_OK) {
104         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
106     }
107     ReleaseHandle(handle);
108     return;
109 }
110 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const111 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
112 {
113     int errCode = E_OK;
114     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
115     if (handle == nullptr) {
116         return errCode;
117     }
118     timestamp = 0;
119     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
120     if (errCode != E_OK) {
121         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
122         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
123     }
124     ReleaseHandle(handle);
125     return errCode;
126 }
127 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const128 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
129     OperatePerm perm) const
130 {
131     if (storageEngine_ == nullptr) {
132         errCode = -E_INVALID_DB;
133         return nullptr;
134     }
135     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
136         storageEngine_->FindExecutor(isWrite, perm, errCode));
137     if (handle == nullptr) {
138         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
139     }
140     return handle;
141 }
142 
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const143 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
144     int &errCode, OperatePerm perm) const
145 {
146     if (storageEngine_ == nullptr) {
147         errCode = -E_INVALID_DB;
148         return nullptr;
149     }
150     if (transactionHandle_ != nullptr) {
151         return transactionHandle_;
152     }
153     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
154         storageEngine_->FindExecutor(isWrite, perm, errCode));
155     if (errCode != E_OK) {
156         ReleaseHandle(handle);
157         handle = nullptr;
158     }
159     return handle;
160 }
161 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const162 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
163 {
164     if (storageEngine_ == nullptr) {
165         return;
166     }
167     StorageExecutor *databaseHandle = handle;
168     storageEngine_->Recycle(databaseHandle);
169     std::function<void()> listener = nullptr;
170     {
171         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
172         listener = heartBeatListener_;
173     }
174     if (listener) {
175         listener();
176     }
177 }
178 
179 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const180 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
181 {
182     if (storageEngine_ == nullptr) {
183         return -E_INVALID_DB;
184     }
185     if (key.size() > DBConstant::MAX_KEY_SIZE) {
186         return -E_INVALID_ARGS;
187     }
188     int errCode = E_OK;
189     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
190     if (handle == nullptr) {
191         return errCode;
192     }
193     errCode = handle->GetKvData(key, value);
194     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
195         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
196     }
197     ReleaseHandle(handle);
198     return errCode;
199 }
200 
201 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)202 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
203 {
204     if (storageEngine_ == nullptr) {
205         return -E_INVALID_DB;
206     }
207     int errCode = E_OK;
208     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
209     if (handle == nullptr) {
210         return errCode;
211     }
212 
213     errCode = handle->PutKvData(key, value); // meta doesn't need time.
214     if (errCode != E_OK) {
215         LOGE("Put kv data err:%d", errCode);
216         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217     }
218     ReleaseHandle(handle);
219     return errCode;
220 }
221 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)222 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
223 {
224     if (storageEngine_ == nullptr) {
225         return -E_INVALID_DB;
226     }
227     int errCode = E_OK;
228     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
229     std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
230 
231     // try to recycle using the handle
232     if (isInTransaction) {
233         handLock.lock();
234         if (reusedHandle_ != nullptr) {
235             handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
236         } else {
237             isInTransaction = false;
238             handLock.unlock();
239         }
240     }
241 
242     if (handle == nullptr) {
243         handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
244         if (handle == nullptr) {
245             return errCode;
246         }
247     }
248 
249     errCode = handle->PutKvData(key, value);
250     if (errCode != E_OK) {
251         LOGE("Put kv data err:%d", errCode);
252         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
253     }
254     if (!isInTransaction) {
255         ReleaseHandle(handle);
256     }
257     return errCode;
258 }
259 
260 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)261 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
262 {
263     if (storageEngine_ == nullptr) {
264         return -E_INVALID_DB;
265     }
266     for (const auto &key : keys) {
267         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
268             return -E_INVALID_ARGS;
269         }
270     }
271     int errCode = E_OK;
272     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
273     if (handle == nullptr) {
274         return errCode;
275     }
276 
277     handle->StartTransaction(TransactType::IMMEDIATE);
278     errCode = handle->DeleteMetaData(keys);
279     if (errCode != E_OK) {
280         handle->Rollback();
281         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
282         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283     } else {
284         handle->Commit();
285     }
286     ReleaseHandle(handle);
287     return errCode;
288 }
289 
290 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const291 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
292 {
293     if (storageEngine_ == nullptr) {
294         return -E_INVALID_DB;
295     }
296     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
297         return -E_INVALID_ARGS;
298     }
299 
300     int errCode = E_OK;
301     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
302     if (handle == nullptr) {
303         return errCode;
304     }
305 
306     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
307     if (errCode != E_OK) {
308         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
309         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
310     }
311     ReleaseHandle(handle);
312     return errCode;
313 }
314 
315 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const316 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
317 {
318     if (storageEngine_ == nullptr) {
319         return -E_INVALID_DB;
320     }
321     int errCode = E_OK;
322     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
323     if (handle == nullptr) {
324         return errCode;
325     }
326 
327     errCode = handle->GetAllMetaKeys(keys);
328     if (errCode != E_OK) {
329         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
330     }
331     ReleaseHandle(handle);
332     return errCode;
333 }
334 
GetDbProperties() const335 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
336 {
337     return storageEngine_->GetProperties();
338 }
339 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)340 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
341 {
342     int errCode = E_OK;
343     for (auto &item : dataItems) {
344         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
345         if (entry == nullptr) {
346             errCode = -E_OUT_OF_MEMORY;
347             LOGE("GetKvEntries failed, errCode:%d", errCode);
348             SingleVerKvEntry::Release(entries);
349             break;
350         }
351         entry->SetEntryData(std::move(item));
352         entries.push_back(entry);
353     }
354     return errCode;
355 }
356 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)357 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
358 {
359     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
360     // the size would not be very large.
361     static const size_t maxOrigDevLength = 40;
362     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
363     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
364                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
365     return dataSize;
366 }
367 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)368 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
369     size_t appendLen)
370 {
371     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
372     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
373         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
374         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
375     }
376     return !reachThreshold;
377 }
378 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)379 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
380     SQLiteSingleVerRelationalContinueToken *&token)
381 {
382     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
383         delete token;
384         token = nullptr;
385         return;
386     }
387 
388     if (dataItems.empty()) {
389         errCode = -E_INTERNAL_ERROR;
390         LOGE("Get data unfinished but data items is empty.");
391         delete token;
392         token = nullptr;
393         return;
394     }
395     token->SetNextBeginTime(dataItems.back());
396     token->UpdateNextSyncOffset(dataItems.size());
397 }
398 
399 /**
400  * Caller must ensure that parameter token is valid.
401  * If error happened, token will be deleted here.
402  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo,RelationalSchemaObject && filterSchema) const403 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
404     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo,
405     RelationalSchemaObject &&filterSchema) const
406 {
407     if (storageEngine_ == nullptr) {
408         return -E_INVALID_DB;
409     }
410 
411     int errCode = E_OK;
412     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
413         OperatePerm::NORMAL_PERM, errCode));
414     if (handle == nullptr) {
415         goto ERROR;
416     }
417     handle->SetLocalSchema(filterSchema);
418     do {
419         errCode = handle->GetSyncDataByQuery(dataItems,
420             Parcel::GetAppendedLen(),
421             dataSizeInfo,
422             [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
423                 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
424             }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
425         if (errCode == -E_FINISHED) {
426             token->FinishGetData();
427             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
428         }
429     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
430 
431 ERROR:
432     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
433         dataItems.clear();
434     }
435     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
436     ReleaseHandle(handle);
437     return errCode;
438 }
439 
440 // use kv struct data to sync
441 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const442 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
443     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
444     std::vector<SingleVerKvEntry *> &entries) const
445 {
446     if (!timeRange.IsValid()) {
447         return -E_INVALID_ARGS;
448     }
449     query.SetSchema(storageEngine_->GetSchema());
450     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
451     if (token == nullptr) {
452         LOGE("[SingleVerNStore] Allocate continue token failed.");
453         return -E_OUT_OF_MEMORY;
454     }
455 
456     continueStmtToken = static_cast<ContinueToken>(token);
457     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
458 }
459 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const460 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
461     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
462 {
463     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
464     if (token == nullptr) {
465         LOGE("[SingleVerNStore] Allocate continue stmt token failed.");
466         return -E_OUT_OF_MEMORY;
467     }
468     if (!token->CheckValid()) {
469         return -E_INVALID_ARGS;
470     }
471     RelationalSchemaObject schema = storageEngine_->GetSchema();
472     RelationalSchemaObject filterSchema;
473     if (token->IsUseLocalSchema()) {
474         filterSchema = schema;
475     } else {
476         int errCode = GetRemoteDeviceSchema(token->GetRemoteDev(), filterSchema);
477         if (errCode != E_OK) {
478             return errCode;
479         }
480     }
481     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
482     std::vector<std::string> fieldNames;
483     fieldNames.reserve(fieldInfos.size());
484     for (const auto &fieldInfo : fieldInfos) { // order by cid
485         fieldNames.push_back(fieldInfo.GetFieldName());
486     }
487     token->SetFieldNames(fieldNames);
488 
489     std::vector<DataItem> dataItems;
490     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo, std::move(filterSchema));
491     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
492         continueStmtToken = static_cast<ContinueToken>(token);
493         return errCode;
494     }
495 
496     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
497     if (innerCode != E_OK) {
498         errCode = innerCode;
499         delete token;
500         token = nullptr;
501     }
502     continueStmtToken = static_cast<ContinueToken>(token);
503     return errCode;
504 }
505 
506 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)507 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
508 {
509     std::vector<DataItem> dataItems;
510     for (const auto &itemEntry : entries) {
511         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
512         if (entry != nullptr) {
513             DataItem item;
514             item.origDev = entry->GetOrigDevice();
515             item.flag = entry->GetFlag();
516             item.timestamp = entry->GetTimestamp();
517             item.writeTimestamp = entry->GetWriteTimestamp();
518             entry->GetKey(item.key);
519             entry->GetValue(item.value);
520             entry->GetHashKey(item.hashKey);
521             dataItems.push_back(item);
522         }
523     }
524     return dataItems;
525 }
526 }
527 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)528 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
529     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
530 {
531     std::vector<DataItem> dataItems = ConvertEntries(entries);
532     return PutSyncData(object, dataItems, deviceName);
533 }
534 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)535 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
536     const std::string &deviceName)
537 {
538     int errCode = E_OK;
539     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
540     QueryObject query = object;
541     auto localSchema = storageEngine_->GetSchema();
542     query.SetSchema(localSchema);
543 
544     RelationalSchemaObject filterSchema;
545     errCode = GetRemoteDeviceSchema(deviceName, filterSchema);
546     if (errCode != E_OK) {
547         LOGE("Find remote schema failed. err=%d", errCode);
548         return errCode;
549     }
550     if (query.IsUseLocalSchema()) {
551         // remote send always with its table col sort
552         filterSchema.SetDistributedSchema(localSchema.GetDistributedSchema());
553     }
554 
555     StoreInfo info = GetStoreInfo();
556     auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
557         filterSchema.GetSyncFieldInfo(query.GetTableName()), info);
558     inserter.SetEntries(dataItems);
559 
560     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
561     if (handle == nullptr) {
562         return errCode;
563     }
564 
565     // To prevent certain abnormal scenarios from deleting the table,
566     // check if the table exists before each synchronization.
567     // If the table does not exist, create it.
568     // Because it is a fallback scenario, if the table creation fails, no failure will be returned
569     if (localSchema.GetTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
570         errCode = handle->CreateDistributedDeviceTable(deviceName,
571             storageEngine_->GetSchema().GetTable(query.GetTableName()), info);
572         if (errCode != E_OK) {
573             LOGW("[RelationalSyncAbleStorage::SaveSyncDataItems] Create distributed device table fail %d", errCode);
574         }
575     }
576     DBDfxAdapter::StartTracing();
577 
578     handle->SetTableMode(localSchema.GetTableMode());
579     errCode = handle->SaveSyncItems(inserter);
580     ChangedData data = inserter.GetChangedData();
581     data.properties.isP2pSyncDataChange = !dataItems.empty();
582     bool emptyChangedData = data.field.empty() && data.primaryData[OP_INSERT].empty() &&
583         data.primaryData[OP_UPDATE].empty() && data.primaryData[OP_DELETE].empty();
584 
585     DBDfxAdapter::FinishTracing();
586     if (errCode == E_OK && !emptyChangedData) {
587         // dataItems size > 0 now because already check before
588         // all dataItems will write into db now, so need to observer notify here
589         // if some dataItems will not write into db in the future, observer notify here need change
590         data.tableName = query.GetTableName();
591         // SPLIT_BY_DEVICE trigger observer with device, userId, appId and storeId, so trigger with isChangeData false
592         // COLLABORATION   trigger observer with changeData, so trigger with isChangeData true
593         TriggerObserverAction(deviceName, std::move(data),
594             GetDbProperties().GetDistributedTableMode() == DistributedTableMode::COLLABORATION, Origin::ORIGIN_REMOTE);
595     }
596 
597     ReleaseHandle(handle);
598     return errCode;
599 }
600 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)601 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
602     const std::string &deviceName)
603 {
604     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
605         LOGW("Device length is invalid for sync put");
606         return -E_INVALID_ARGS;
607     }
608 
609     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
610     if (errCode != E_OK) {
611         LOGE("[Relational] PutSyncData errCode:%d", errCode);
612         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
613     }
614     return errCode;
615 }
616 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)617 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
618 {
619     (void) deviceName;
620     (void) isNeedNotify;
621     return -E_NOT_SUPPORT;
622 }
623 
GetSchemaInfo() const624 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
625 {
626     return storageEngine_->GetSchema();
627 }
628 
GetSecurityOption(SecurityOption & option) const629 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
630 {
631     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
632     if (isCachedOption_) {
633         option = securityOption_;
634         return E_OK;
635     }
636     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
637     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
638     if (errCode == E_OK) {
639         option = securityOption_;
640         isCachedOption_ = true;
641     }
642     return errCode;
643 }
644 
NotifyRemotePushFinished(const std::string & deviceId) const645 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
646 {
647     return;
648 }
649 
650 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const651 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
652 {
653     return OS::GetCurrentSysTimeInMicrosecond(outTime);
654 }
655 
GetTablesQuery()656 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
657 {
658     auto tableNames = storageEngine_->GetSchema().GetTableNames();
659     std::vector<QuerySyncObject> queries;
660     queries.reserve(tableNames.size());
661     for (const auto &it : tableNames) {
662         queries.emplace_back(Query::Select(it));
663     }
664     return queries;
665 }
666 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)667 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
668 {
669     (void) queryObj;
670     return -E_NOT_SUPPORT;
671 }
672 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const673 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
674     const std::string &targetID, bool isPush) const
675 {
676     return E_OK;
677 }
678 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)679 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
680     const RelationalSyncStrategy &syncStrategy)
681 {
682     auto mode = storageEngine_->GetProperties().GetDistributedTableMode();
683     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
684         LOGD("No need create device table in COLLABORATION mode.");
685         return E_OK;
686     }
687 
688     int errCode = E_OK;
689     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
690     if (handle == nullptr) {
691         return errCode;
692     }
693 
694     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
695     if (errCode != E_OK) {
696         LOGE("Start transaction failed:%d", errCode);
697         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
698         ReleaseHandle(handle);
699         return errCode;
700     }
701 
702     StoreInfo info = GetStoreInfo();
703     for (const auto &[table, strategy] : syncStrategy) {
704         if (!strategy.permitSync) {
705             continue;
706         }
707 
708         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
709         if (errCode != E_OK) {
710             LOGE("Create distributed device table failed. %d", errCode);
711             break;
712         }
713     }
714 
715     if (errCode == E_OK) {
716         errCode = handle->Commit();
717     } else {
718         (void)handle->Rollback();
719     }
720 
721     ReleaseHandle(handle);
722     return errCode;
723 }
724 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)725 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
726 {
727     std::lock_guard lock(onSchemaChangedMutex_);
728     onSchemaChanged_ = callback;
729     return E_OK;
730 }
731 
NotifySchemaChanged()732 void RelationalSyncAbleStorage::NotifySchemaChanged()
733 {
734     std::lock_guard lock(onSchemaChangedMutex_);
735     if (onSchemaChanged_) {
736         LOGD("Notify relational schema was changed");
737         onSchemaChanged_();
738     }
739 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const740 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
741 {
742     algorithmSet.clear();
743     DataCompression::GetCompressionAlgo(algorithmSet);
744     return E_OK;
745 }
746 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)747 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
748     const RelationalObserverAction &action)
749 {
750     ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
751     ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
752     auto it = dataChangeCallbackMap_.find(connectionId);
753     if (it != dataChangeCallbackMap_.end()) {
754         if (it->second.find(observer) != it->second.end()) {
755             LOGE("obsever already registered");
756             return -E_ALREADY_SET;
757         }
758         if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
759             LOGE("The number of relational observers has been over limit");
760             return -E_MAX_LIMITS;
761         }
762         it->second[observer] = action;
763     } else {
764         dataChangeCallbackMap_[connectionId][observer] = action;
765     }
766     LOGI("register relational observer ok");
767     return E_OK;
768 }
769 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)770 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
771 {
772     if (observer == nullptr) {
773         EraseDataChangeCallback(connectionId);
774         return E_OK;
775     }
776     ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
777     ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
778     auto it = dataChangeCallbackMap_.find(connectionId);
779     if (it != dataChangeCallbackMap_.end()) {
780         auto action = it->second.find(observer);
781         if (action != it->second.end()) {
782             it->second.erase(action);
783             LOGI("unregister relational observer.");
784             if (it->second.empty()) {
785                 dataChangeCallbackMap_.erase(it);
786                 LOGI("observer for this delegate is zero now");
787             }
788             return E_OK;
789         }
790     }
791     return -E_NOT_FOUND;
792 }
793 
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,Origin origin)794 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
795     const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
796     const std::string &deviceName, const ChangedData &changedData, bool isChangedData, Origin origin)
797 {
798     for (auto &action : item.second) {
799         if (action.second == nullptr) {
800             continue;
801         }
802         ChangedData observerChangeData = changedData;
803         if (action.first != nullptr) {
804             FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
805         }
806         action.second(deviceName, std::move(observerChangeData), isChangedData, origin);
807     }
808 }
809 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)810 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
811     ChangedData &&changedData, bool isChangedData)
812 {
813     TriggerObserverAction(deviceName, std::move(changedData), isChangedData, Origin::ORIGIN_CLOUD);
814 }
815 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData,Origin origin)816 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName, ChangedData &&changedData,
817     bool isChangedData, Origin origin)
818 {
819     IncObjRef(this);
820     int taskErrCode =
821         ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData, origin] () mutable {
822             LOGD("begin to trigger relational observer.");
823             ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
824             ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
825             for (const auto &item : dataChangeCallbackMap_) {
826                 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, origin);
827             }
828             DecObjRef(this);
829         }, &dataChangeCallbackMap_);
830     if (taskErrCode != E_OK) {
831         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
832         DecObjRef(this);
833     }
834 }
835 
RegisterHeartBeatListener(const std::function<void ()> & listener)836 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
837 {
838     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
839     heartBeatListener_ = listener;
840 }
841 
CheckAndInitQueryCondition(QueryObject & query) const842 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
843 {
844     RelationalSchemaObject schema = storageEngine_->GetSchema();
845     TableInfo table = schema.GetTable(query.GetTableName());
846     if (!table.IsValid()) {
847         LOGE("Query table is not a distributed table.");
848         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
849     }
850     if (table.GetTableSyncType() == CLOUD_COOPERATION) {
851         LOGE("cloud table mode is not support");
852         return -E_NOT_SUPPORT;
853     }
854     query.SetSchema(schema);
855 
856     int errCode = E_OK;
857     auto *handle = GetHandle(true, errCode);
858     if (handle == nullptr) {
859         return errCode;
860     }
861 
862     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
863     if (errCode != E_OK) {
864         LOGE("Check relational query condition failed. %d", errCode);
865         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
866     }
867 
868     ReleaseHandle(handle);
869     return errCode;
870 }
871 
CheckCompatible(const std::string & schema,uint8_t type) const872 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
873 {
874     // return true if is relational schema.
875     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
876 }
877 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const878 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
879     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
880 {
881     if (!storageEngine_->GetSchema().IsSchemaValid()) {
882         return -E_NOT_SUPPORT;
883     }
884     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
885         LOGE("[ExecuteQuery] invalid args");
886         return -E_INVALID_ARGS;
887     }
888     int errCode = E_OK;
889     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
890     if (handle == nullptr) {
891         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
892         return errCode;
893     }
894     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
895     if (errCode != E_OK) {
896         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
897     }
898     ReleaseHandle(handle);
899     return errCode;
900 }
901 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const902 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
903     RelationalRowDataSet &dataSet, ContinueToken &token) const
904 {
905     dataSet.Clear();
906     if (token == nullptr) {
907         // start query
908         std::vector<std::string> colNames;
909         std::vector<RelationalRowData *> data;
910         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
911 
912         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
913         if (errCode != E_OK) {
914             return errCode;
915         }
916 
917         // create one token
918         token = static_cast<ContinueToken>(
919             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
920         if (token == nullptr) {
921             LOGE("ExecuteQuery OOM");
922             return -E_OUT_OF_MEMORY;
923         }
924     }
925 
926     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
927     if (!remoteToken->CheckValid()) {
928         LOGE("ExecuteQuery invalid token");
929         return -E_INVALID_ARGS;
930     }
931 
932     int errCode = remoteToken->GetData(packetSize, dataSet);
933     if (errCode == -E_UNFINISHED) {
934         errCode = E_OK;
935     } else {
936         if (errCode != E_OK) {
937             dataSet.Clear();
938         }
939         delete remoteToken;
940         remoteToken = nullptr;
941         token = nullptr;
942     }
943     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
944     return errCode;
945 }
946 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)947 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
948     uint8_t type)
949 {
950     if (ReadSchemaType(type) != SchemaType::RELATIVE) {
951         return -E_INVALID_ARGS;
952     }
953 
954     RelationalSchemaObject schemaObj;
955     int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
956     if (errCode != E_OK) {
957         LOGE("Parse remote schema failed. err=%d", errCode);
958         return errCode;
959     }
960 
961     std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
962     Key remoteSchemaKey(keyStr.begin(), keyStr.end());
963     Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
964     errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
965     if (errCode != E_OK) {
966         LOGE("Save remote schema failed. err=%d", errCode);
967         return errCode;
968     }
969 
970     return remoteDeviceSchema_.Put(deviceId, remoteSchema);
971 }
972 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj) const973 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId,
974     RelationalSchemaObject &schemaObj) const
975 {
976     if (schemaObj.IsSchemaValid()) {
977         LOGE("schema is already valid");
978         return -E_INVALID_ARGS;
979     }
980 
981     std::string remoteSchema;
982     int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
983     if (errCode == -E_NOT_FOUND) {
984         LOGW("Get remote device schema miss cached.");
985         std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
986         Key remoteSchemaKey(keyStr.begin(), keyStr.end());
987         Value remoteSchemaBuff;
988         errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
989         if (errCode != E_OK) {
990             LOGE("Get remote device schema from meta failed. err=%d", errCode);
991             return errCode;
992         }
993         remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
994         errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
995     }
996 
997     if (errCode != E_OK) {
998         LOGE("Get remote device schema failed. err=%d", errCode);
999         return errCode;
1000     }
1001 
1002     errCode = schemaObj.ParseFromSchemaString(remoteSchema);
1003     if (errCode != E_OK) {
1004         LOGE("Parse remote schema failed. err=%d", errCode);
1005     }
1006     return errCode;
1007 }
1008 
SetReusedHandle(StorageExecutor * handle)1009 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
1010 {
1011     std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
1012     reusedHandle_ = handle;
1013 }
1014 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const1015 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
1016 {
1017     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1018     delete remoteToken;
1019     remoteToken = nullptr;
1020     token = nullptr;
1021 }
1022 
GetStoreInfo() const1023 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1024 {
1025     StoreInfo info = {
1026         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
1027         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1028         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1029     };
1030     return info;
1031 }
1032 
StartTransaction(TransactType type)1033 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1034 {
1035     if (storageEngine_ == nullptr) {
1036         return -E_INVALID_DB;
1037     }
1038     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1039     if (transactionHandle_ != nullptr) {
1040         LOGD("Transaction started already.");
1041         return -E_TRANSACT_STATE;
1042     }
1043     int errCode = E_OK;
1044     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1045         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1046     if (handle == nullptr) {
1047         ReleaseHandle(handle);
1048         return errCode;
1049     }
1050     errCode = handle->StartTransaction(type);
1051     if (errCode != E_OK) {
1052         ReleaseHandle(handle);
1053         return errCode;
1054     }
1055     transactionHandle_ = handle;
1056     return errCode;
1057 }
1058 
Commit()1059 int RelationalSyncAbleStorage::Commit()
1060 {
1061     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1062     if (transactionHandle_ == nullptr) {
1063         LOGE("relation database is null or the transaction has not been started");
1064         return -E_INVALID_DB;
1065     }
1066     int errCode = transactionHandle_->Commit();
1067     ReleaseHandle(transactionHandle_);
1068     transactionHandle_ = nullptr;
1069     LOGD("connection commit transaction!");
1070     return errCode;
1071 }
1072 
Rollback()1073 int RelationalSyncAbleStorage::Rollback()
1074 {
1075     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1076     if (transactionHandle_ == nullptr) {
1077         LOGE("Invalid handle for rollback or the transaction has not been started.");
1078         return -E_INVALID_DB;
1079     }
1080 
1081     int errCode = transactionHandle_->Rollback();
1082     ReleaseHandle(transactionHandle_);
1083     transactionHandle_ = nullptr;
1084     LOGI("connection rollback transaction!");
1085     return errCode;
1086 }
1087 
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1088 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1089     const std::vector<Timestamp> &timestampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1090 {
1091     int errCode = E_OK;
1092     auto *handle = GetHandleExpectTransaction(false, errCode);
1093     if (handle == nullptr) {
1094         return errCode;
1095     }
1096     QuerySyncObject queryObj = query;
1097     queryObj.SetSchema(GetSchemaInfo());
1098     errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1099     if (transactionHandle_ == nullptr) {
1100         ReleaseHandle(handle);
1101     }
1102     return errCode;
1103 }
1104 
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1105 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp,
1106     bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1107 {
1108     int errCode = E_OK;
1109     auto *handle = GetHandleExpectTransaction(false, errCode);
1110     if (handle == nullptr) {
1111         return errCode;
1112     }
1113     QuerySyncObject queryObj = query;
1114     queryObj.SetSchema(GetSchemaInfo());
1115     errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1116     if (transactionHandle_ == nullptr) {
1117         ReleaseHandle(handle);
1118     }
1119     return errCode;
1120 }
1121 
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1122 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1123     const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1124 {
1125     if (transactionHandle_ == nullptr) {
1126         LOGE("the transaction has not been started");
1127         return -E_INVALID_DB;
1128     }
1129     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1130     QuerySyncObject query = querySyncObject;
1131     query.SetSchema(GetSchemaInfo());
1132     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1133     if (token == nullptr) {
1134         LOGE("[SingleVerNStore] Allocate continue token failed.");
1135         return -E_OUT_OF_MEMORY;
1136     }
1137     token->SetCloudTableSchema(tableSchema);
1138     continueStmtToken = static_cast<ContinueToken>(token);
1139     return GetCloudDataNext(continueStmtToken, cloudDataResult);
1140 }
1141 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1142 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1143     CloudSyncData &cloudDataResult)
1144 {
1145     if (continueStmtToken == nullptr) {
1146         return -E_INVALID_ARGS;
1147     }
1148     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1149     if (!token->CheckValid()) {
1150         return -E_INVALID_ARGS;
1151     }
1152     if (transactionHandle_ == nullptr) {
1153         LOGE("the transaction has not been started, release the token");
1154         ReleaseCloudDataToken(continueStmtToken);
1155         return -E_INVALID_DB;
1156     }
1157     cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1158     auto config = GetCloudSyncConfig();
1159     transactionHandle_->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1160     int errCode = transactionHandle_->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1161     LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1162         cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1163         cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1164     if (errCode != -E_UNFINISHED) {
1165         delete token;
1166         token = nullptr;
1167     }
1168     continueStmtToken = static_cast<ContinueToken>(token);
1169     if (errCode != E_OK && errCode != -E_UNFINISHED) {
1170         return errCode;
1171     }
1172     int fillRefGidCode = FillReferenceData(cloudDataResult);
1173     return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1174 }
1175 
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1176 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1177     bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1178 {
1179     int errCode = E_OK;
1180     auto *handle = GetHandle(false, errCode);
1181     if (handle == nullptr) {
1182         return errCode;
1183     }
1184     Timestamp beginTime = 0u;
1185     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1186     QuerySyncObject query = querySyncObject;
1187     query.SetSchema(GetSchemaInfo());
1188     handle->SetTableSchema(tableSchema);
1189     errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1190     ReleaseHandle(handle);
1191     if (errCode != E_OK) {
1192         LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1193     }
1194     return errCode;
1195 }
1196 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1197 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1198 {
1199     if (continueStmtToken == nullptr) {
1200         return E_OK;
1201     }
1202     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1203     if (!token->CheckValid()) {
1204         return E_OK;
1205     }
1206     int errCode = token->ReleaseCloudStatement();
1207     delete token;
1208     token = nullptr;
1209     return errCode;
1210 }
1211 
GetSchemaFromDB(RelationalSchemaObject & schema)1212 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1213 {
1214     Key schemaKey;
1215     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1216     Value schemaVal;
1217     int errCode = GetMetaData(schemaKey, schemaVal);
1218     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1219         LOGE("Get relational schema from DB failed. %d", errCode);
1220         return errCode;
1221     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1222         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1223         return -E_NOT_FOUND;
1224     }
1225     std::string schemaStr;
1226     DBCommon::VectorToString(schemaVal, schemaStr);
1227     errCode = schema.ParseFromSchemaString(schemaStr);
1228     if (errCode != E_OK) {
1229         LOGE("Parse schema string from DB failed.");
1230         return errCode;
1231     }
1232     storageEngine_->SetSchema(schema);
1233     return errCode;
1234 }
1235 
ChkSchema(const TableName & tableName)1236 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1237 {
1238     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1239     RelationalSchemaObject localSchema = GetSchemaInfo();
1240     int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1241     if (errCode == -E_SCHEMA_MISMATCH) {
1242         LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1243         RelationalSchemaObject newSchema;
1244         errCode = GetSchemaFromDB(newSchema);
1245         if (errCode != E_OK) {
1246             LOGE("Get schema from db when check schema. err: %d", errCode);
1247             return -E_SCHEMA_MISMATCH;
1248         }
1249         errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1250     }
1251     return errCode;
1252 }
1253 
SetCloudDbSchema(const DataBaseSchema & schema)1254 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1255 {
1256     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1257     RelationalSchemaObject localSchema = GetSchemaInfo();
1258     schemaMgr_.SetCloudDbSchema(schema, localSchema);
1259     return E_OK;
1260 }
1261 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1262 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1263     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1264 {
1265     return GetInfoByPrimaryKeyOrGid(tableName, vBucket, true, dataInfoWithLog, assetInfo);
1266 }
1267 
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1268 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1269     const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1270 {
1271     if (handle == nullptr) {
1272         return -E_INVALID_DB;
1273     }
1274     TableSchema tableSchema;
1275     int errCode = GetCloudTableSchema(tableName, tableSchema);
1276     if (errCode != E_OK) {
1277         LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1278         return errCode;
1279     }
1280     RelationalSchemaObject localSchema = GetSchemaInfo();
1281     handle->SetLocalSchema(localSchema);
1282     return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1283 }
1284 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1285 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1286 {
1287     if (transactionHandle_ == nullptr) {
1288         LOGE(" the transaction has not been started");
1289         return -E_INVALID_DB;
1290     }
1291     return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1292 }
1293 
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1294 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1295     const std::string &tableName, DownloadData &downloadData)
1296 {
1297     TableSchema tableSchema;
1298     int errCode = GetCloudTableSchema(tableName, tableSchema);
1299     if (errCode != E_OK) {
1300         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1301         return errCode;
1302     }
1303     RelationalSchemaObject localSchema = GetSchemaInfo();
1304     handle->SetLocalSchema(localSchema);
1305     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1306     handle->SetLogicDelete(IsCurrentLogicDelete());
1307     errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1308     handle->SetLogicDelete(false);
1309     return errCode;
1310 }
1311 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1312 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1313 {
1314     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1315     cloudSchema = schemaMgr_.GetCloudDbSchema();
1316     return E_OK;
1317 }
1318 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1319 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1320     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1321 {
1322     if (transactionHandle_ == nullptr) {
1323         LOGE("the transaction has not been started");
1324         return -E_INVALID_DB;
1325     }
1326     transactionHandle_->SetLogicDelete(logicDelete_);
1327     std::vector<std::string> notifyTableList;
1328     int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1329     if (!notifyTableList.empty()) {
1330         for (auto notifyTableName : notifyTableList) {
1331             ChangedData changedData;
1332             changedData.type = ChangedDataType::DATA;
1333             changedData.tableName = notifyTableName;
1334             std::vector<DistributedDB::Type> dataVec;
1335             DistributedDB::Type type;
1336             if (mode == FLAG_ONLY) {
1337                 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1338             } else {
1339                 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1340             }
1341             dataVec.push_back(type);
1342             changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1343             TriggerObserverAction("CLOUD", std::move(changedData), true);
1344         }
1345     }
1346     transactionHandle_->SetLogicDelete(false);
1347     return errCode;
1348 }
1349 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1350 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1351 {
1352     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1353     return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1354 }
1355 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1356 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1357     bool isDownloadSuccess)
1358 {
1359     if (storageEngine_ == nullptr) {
1360         return -E_INVALID_DB;
1361     }
1362     if (transactionHandle_ == nullptr) {
1363         LOGE("the transaction has not been started when fill asset for download.");
1364         return -E_INVALID_DB;
1365     }
1366     TableSchema tableSchema;
1367     int errCode = GetCloudTableSchema(tableName, tableSchema);
1368     if (errCode != E_OK) {
1369         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1370         return errCode;
1371     }
1372     uint64_t currCursor = DBConstant::INVALID_CURSOR;
1373     errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess, currCursor);
1374     if (errCode != E_OK) {
1375         LOGE("fill cloud asset for download failed.%d", errCode);
1376     }
1377     return errCode;
1378 }
1379 
SetLogTriggerStatus(bool status)1380 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1381 {
1382     int errCode = E_OK;
1383     auto *handle = GetHandleExpectTransaction(false, errCode);
1384     if (handle == nullptr) {
1385         return errCode;
1386     }
1387     errCode = handle->SetLogTriggerStatus(status);
1388     if (transactionHandle_ == nullptr) {
1389         ReleaseHandle(handle);
1390     }
1391     return errCode;
1392 }
1393 
SetCursorIncFlag(bool flag)1394 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1395 {
1396     int errCode = E_OK;
1397     auto *handle = GetHandleExpectTransaction(false, errCode);
1398     if (handle == nullptr) {
1399         return errCode;
1400     }
1401     errCode = handle->SetCursorIncFlag(flag);
1402     if (transactionHandle_ == nullptr) {
1403         ReleaseHandle(handle);
1404     }
1405     return errCode;
1406 }
1407 
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1408 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1409     bool ignoreEmptyGid)
1410 {
1411     if (storageEngine_ == nullptr) {
1412         return -E_INVALID_DB;
1413     }
1414     int errCode = E_OK;
1415     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1416         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1417     if (writeHandle == nullptr) {
1418         return errCode;
1419     }
1420     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1421     if (errCode != E_OK) {
1422         ReleaseHandle(writeHandle);
1423         return errCode;
1424     }
1425     errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1426     if (errCode != E_OK) {
1427         LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1428         writeHandle->Rollback();
1429         ReleaseHandle(writeHandle);
1430         return errCode;
1431     }
1432     errCode = writeHandle->Commit();
1433     ReleaseHandle(writeHandle);
1434     return errCode;
1435 }
1436 
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1437 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1438 {
1439     syncAbleEngine_ = syncAbleEngine;
1440 }
1441 
GetIdentify() const1442 std::string RelationalSyncAbleStorage::GetIdentify() const
1443 {
1444     if (storageEngine_ == nullptr) {
1445         LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1446         return "";
1447     }
1448     return storageEngine_->GetIdentifier();
1449 }
1450 
EraseDataChangeCallback(uint64_t connectionId)1451 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1452 {
1453     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1454         ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1455         ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1456         auto it = dataChangeCallbackMap_.find(connectionId);
1457         if (it != dataChangeCallbackMap_.end()) {
1458             dataChangeCallbackMap_.erase(it);
1459             LOGI("erase all observer for this delegate.");
1460         }
1461     }, nullptr, &dataChangeCallbackMap_);
1462     ADAPTER_WAIT(handle);
1463 }
1464 
ReleaseContinueToken(ContinueToken & continueStmtToken) const1465 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1466 {
1467     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1468     if (token == nullptr || !(token->CheckValid())) {
1469         LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1470         return;
1471     }
1472     delete token;
1473     continueStmtToken = nullptr;
1474 }
1475 
CheckQueryValid(const QuerySyncObject & query)1476 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1477 {
1478     int errCode = E_OK;
1479     auto *handle = GetHandle(false, errCode);
1480     if (handle == nullptr) {
1481         return errCode;
1482     }
1483     errCode = handle->CheckQueryObjectLegal(query);
1484     if (errCode != E_OK) {
1485         ReleaseHandle(handle);
1486         return errCode;
1487     }
1488     QuerySyncObject queryObj = query;
1489     queryObj.SetSchema(GetSchemaInfo());
1490     int64_t count = 0;
1491     errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1492     ReleaseHandle(handle);
1493     if (errCode != E_OK) {
1494         LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1495         return -E_INVALID_ARGS;
1496     }
1497     return errCode;
1498 }
1499 
CreateTempSyncTrigger(const std::string & tableName)1500 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1501 {
1502     int errCode = E_OK;
1503     auto *handle = GetHandle(true, errCode);
1504     if (handle == nullptr) {
1505         return errCode;
1506     }
1507     errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1508     ReleaseHandle(handle);
1509     if (errCode != E_OK) {
1510         LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1511     }
1512     return errCode;
1513 }
1514 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1515 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1516     ChangeProperties &changeProperties)
1517 {
1518     int errCode = E_OK;
1519     auto *handle = GetHandle(false, errCode);
1520     if (handle == nullptr) {
1521         return errCode;
1522     }
1523     errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1524     ReleaseHandle(handle);
1525     if (errCode != E_OK) {
1526         LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1527     }
1528     return errCode;
1529 }
1530 
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1531 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1532 {
1533     if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1534         changedData.field = {};
1535         for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1536             changedData.primaryData[i].clear();
1537         }
1538     }
1539     if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1540         changedData.properties = {};
1541     }
1542 }
1543 
ClearAllTempSyncTrigger()1544 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1545 {
1546     int errCode = E_OK;
1547     auto *handle = GetHandle(true, errCode);
1548     if (handle == nullptr) {
1549         return errCode;
1550     }
1551     errCode = handle->ClearAllTempSyncTrigger();
1552     ReleaseHandle(handle);
1553     if (errCode != E_OK) {
1554         LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1555     }
1556     return errCode;
1557 }
1558 
FillReferenceData(CloudSyncData & syncData)1559 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1560 {
1561     std::map<int64_t, Entries> referenceGid;
1562     int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1563     if (errCode != E_OK) {
1564         LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1565         return errCode;
1566     }
1567     errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1568     if (errCode != E_OK) {
1569         return errCode;
1570     }
1571     referenceGid.clear();
1572     errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1573     if (errCode != E_OK) {
1574         LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1575         return errCode;
1576     }
1577     return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1578 }
1579 
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1580 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1581     const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1582 {
1583     if (referenceGid.empty()) {
1584         return E_OK;
1585     }
1586     int ignoredCount = 0;
1587     for (size_t index = 0u; index < rowid.size(); index++) {
1588         if (index >= extend.size()) {
1589             LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1590             return -E_UNEXPECTED_DATA;
1591         }
1592         int64_t rowId = rowid[index];
1593         if (referenceGid.find(rowId) == referenceGid.end()) {
1594             // current data miss match reference data, we ignored it
1595             ignoredCount++;
1596             continue;
1597         }
1598         extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1599     }
1600     if (ignoredCount != 0) {
1601         LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1602     }
1603     return E_OK;
1604 }
1605 
IsSharedTable(const std::string & tableName)1606 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1607 {
1608     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1609     return schemaMgr_.IsSharedTable(tableName);
1610 }
1611 
GetSharedTableOriginNames()1612 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1613 {
1614     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1615     return schemaMgr_.GetSharedTableOriginNames();
1616 }
1617 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1618 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1619     std::map<int64_t, Entries> &referenceGid)
1620 {
1621     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1622     int errCode = GetTableReference(tableName, tableReference);
1623     if (errCode != E_OK) {
1624         return errCode;
1625     }
1626     if (tableReference.empty()) {
1627         LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1628         return E_OK;
1629     }
1630     auto *handle = GetHandle(false, errCode);
1631     if (handle == nullptr) {
1632         return errCode;
1633     }
1634     errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1635     ReleaseHandle(handle);
1636     return errCode;
1637 }
1638 
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1639 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1640     std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1641 {
1642     if (storageEngine_ == nullptr) {
1643         LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1644         return -E_INVALID_DB;
1645     }
1646     RelationalSchemaObject schema = storageEngine_->GetSchema();
1647     auto referenceProperty = schema.GetReferenceProperty();
1648     if (referenceProperty.empty()) {
1649         return E_OK;
1650     }
1651     auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1652     if (errCode != E_OK) {
1653         return errCode;
1654     }
1655     for (const auto &property : referenceProperty) {
1656         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1657             if (!IsSharedTable(tableName)) {
1658                 reference[property.targetTableName].push_back(property);
1659                 continue;
1660             }
1661             TableReferenceProperty tableReference;
1662             tableReference.sourceTableName = tableName;
1663             tableReference.columns = property.columns;
1664             tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1665             auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1666             if (ret != E_OK) {
1667                 return ret;
1668             }
1669             tableReference.targetTableName = sharedTargetTable;
1670             reference[tableReference.targetTableName].push_back(tableReference);
1671         }
1672     }
1673     return E_OK;
1674 }
1675 
GetSourceTableName(const std::string & tableName)1676 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1677 {
1678     std::pair<std::string, int> res = { "", E_OK };
1679     std::shared_ptr<DataBaseSchema> cloudSchema;
1680     (void) GetCloudDbSchema(cloudSchema);
1681     if (cloudSchema == nullptr) {
1682         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1683         return { "", -E_INTERNAL_ERROR };
1684     }
1685     for (const auto &table : cloudSchema->tables) {
1686         if (CloudStorageUtils::IsSharedTable(table)) {
1687             continue;
1688         }
1689         if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1690             DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1691             res.first = table.name;
1692             break;
1693         }
1694     }
1695     if (res.first.empty()) {
1696         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1697         res.second = -E_SCHEMA_MISMATCH;
1698     }
1699     return res;
1700 }
1701 
GetSharedTargetTableName(const std::string & tableName)1702 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1703 {
1704     std::pair<std::string, int> res = { "", E_OK };
1705     std::shared_ptr<DataBaseSchema> cloudSchema;
1706     (void) GetCloudDbSchema(cloudSchema);
1707     if (cloudSchema == nullptr) {
1708         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1709         return { "", -E_INTERNAL_ERROR };
1710     }
1711     for (const auto &table : cloudSchema->tables) {
1712         if (CloudStorageUtils::IsSharedTable(table)) {
1713             continue;
1714         }
1715         if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1716             res.first = table.sharedTableName;
1717             break;
1718         }
1719     }
1720     if (res.first.empty()) {
1721         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1722         res.second = -E_SCHEMA_MISMATCH;
1723     }
1724     return res;
1725 }
1726 
SetLogicDelete(bool logicDelete)1727 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1728 {
1729     logicDelete_ = logicDelete;
1730     LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1731 }
1732 
IsCurrentLogicDelete() const1733 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1734 {
1735     return logicDelete_;
1736 }
1737 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1738 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1739     const std::string &gid, const Bytes &hashKey, VBucket &assets)
1740 {
1741     if (gid.empty() && hashKey.empty()) {
1742         LOGE("both gid and hashKey are empty.");
1743         return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1744     }
1745     if (transactionHandle_ == nullptr) {
1746         LOGE("the transaction has not been started");
1747         return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1748     }
1749     auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1750     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1751         LOGE("get assets by gid or hashKey failed. %d", errCode);
1752     }
1753     return { errCode, status };
1754 }
1755 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1756 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1757 {
1758     int errCode = E_OK;
1759     auto *wHandle = GetHandle(true, errCode);
1760     if (wHandle == nullptr) {
1761         return errCode;
1762     }
1763     wHandle->SetIAssetLoader(loader);
1764     ReleaseHandle(wHandle);
1765     return errCode;
1766 }
1767 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1768 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1769     const std::vector<VBucket> &records)
1770 {
1771     int errCode = E_OK;
1772     auto *handle = GetHandle(true, errCode);
1773     if (handle == nullptr || errCode != E_OK) {
1774         return errCode;
1775     }
1776     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1777     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1778     errCode = UpsertDataInner(handle, tableName, records);
1779     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1780     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1781     ReleaseHandle(handle);
1782     return errCode;
1783 }
1784 
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1785 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1786     const std::string &tableName, const std::vector<VBucket> &records)
1787 {
1788     int errCode = E_OK;
1789     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1790     if (errCode != E_OK) {
1791         LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1792         return errCode;
1793     }
1794     errCode = CreateTempSyncTriggerInner(handle, tableName);
1795     if (errCode == E_OK) {
1796         errCode = UpsertDataInTransaction(handle, tableName, records);
1797         (void) handle->ClearAllTempSyncTrigger();
1798     }
1799     if (errCode == E_OK) {
1800         errCode = handle->Commit();
1801         if (errCode != E_OK) {
1802             LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1803         }
1804     } else {
1805         int ret = handle->Rollback();
1806         if (ret != E_OK) {
1807             LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1808         }
1809     }
1810     return errCode;
1811 }
1812 
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1813 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1814     const std::string &tableName, const std::vector<VBucket> &records)
1815 {
1816     TableSchema tableSchema;
1817     int errCode = GetCloudTableSchema(tableName, tableSchema);
1818     if (errCode != E_OK) {
1819         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1820         return errCode;
1821     }
1822     TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1823     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1824     std::set<std::vector<uint8_t>> primaryKeys;
1825     DownloadData downloadData;
1826     for (const auto &record : records) {
1827         DataInfoWithLog dataInfoWithLog;
1828         VBucket assetInfo;
1829         auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1830             tableSchema, localTable, pkMap, false);
1831         if (errorCode != E_OK) {
1832             return errorCode;
1833         }
1834         errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1835         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1836             return errCode;
1837         }
1838         VBucket recordCopy = record;
1839         if ((errCode == -E_NOT_FOUND ||
1840             (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1841             primaryKeys.find(hashValue) == primaryKeys.end()) {
1842             downloadData.opType.push_back(OpType::INSERT);
1843             auto currentTime = TimeHelper::GetSysCurrentTime();
1844             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1845             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1846             primaryKeys.insert(hashValue);
1847         } else {
1848             downloadData.opType.push_back(OpType::UPDATE);
1849             recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1850             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1851             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1852             recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1853             recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1854         }
1855         downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1856         downloadData.data.push_back(std::move(recordCopy));
1857     }
1858     return PutCloudSyncDataInner(handle, tableName, downloadData);
1859 }
1860 
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1861 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1862     const LogInfo &logInfo)
1863 {
1864     if (transactionHandle_ == nullptr) {
1865         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1866         return -E_INVALID_DB;
1867     }
1868     TableSchema tableSchema;
1869     GetCloudTableSchema(tableName, tableSchema);
1870     std::vector<VBucket> assets;
1871     int errCode = transactionHandle_->GetDownloadAssetRecordsByGid(tableSchema, logInfo.cloudGid, assets);
1872     if (errCode != E_OK) {
1873         LOGE("[RelationalSyncAbleStorage] get download asset by gid %s failed %d",
1874             DBCommon::StringMiddleMasking(logInfo.cloudGid).c_str(), errCode);
1875         return errCode;
1876     }
1877     bool isInconsistency = !assets.empty();
1878     UpdateRecordFlagStruct updateRecordFlag = {
1879         .tableName = tableName,
1880         .isRecordConflict = recordConflict,
1881         .isInconsistency = isInconsistency
1882     };
1883     std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(updateRecordFlag, logInfo);
1884     return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1885 }
1886 
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1887 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1888     OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1889 {
1890     TableSchema tableSchema;
1891     int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1892     if (errCode != E_OK) {
1893         LOGE("get table schema failed when fill log and asset. %d", errCode);
1894         return errCode;
1895     }
1896     errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1897     if (errCode != E_OK) {
1898         return errCode;
1899     }
1900     if (opType == OpType::INSERT) {
1901         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1902             handle, {data.tableName, CloudWaterType::INSERT, tableSchema}, data.insData, uploadRecorder_);
1903     } else if (opType == OpType::UPDATE) {
1904         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1905             handle, {data.tableName, CloudWaterType::UPDATE, tableSchema}, data.updData, uploadRecorder_);
1906     } else if (opType == OpType::DELETE) {
1907         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1908             handle, {data.tableName, CloudWaterType::DELETE, tableSchema}, data.delData, uploadRecorder_);
1909     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1910         errCode = CloudStorageUtils::UpdateRecordFlagAfterUpload(
1911             handle, {data.tableName, CloudWaterType::BUTT, tableSchema}, data.lockData, uploadRecorder_, true);
1912     }
1913     return errCode;
1914 }
1915 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users,bool isQueryDownloadRecords)1916 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1917     std::vector<std::string> &users, bool isQueryDownloadRecords)
1918 {
1919     std::vector<TableSchema> tables;
1920     int errCode = GetCloudTableWithoutShared(tables);
1921     if (errCode != E_OK) {
1922         return errCode;
1923     }
1924     if (tables.empty()) {
1925         LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1926         return E_OK;
1927     }
1928     auto *handle = GetHandle(true, errCode);
1929     if (handle == nullptr || errCode != E_OK) {
1930         return errCode;
1931     }
1932     errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery, isQueryDownloadRecords);
1933     ReleaseHandle(handle);
1934     return errCode;
1935 }
1936 
ClearUnLockingNoNeedCompensated()1937 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1938 {
1939     std::vector<TableSchema> tables;
1940     int errCode = GetCloudTableWithoutShared(tables);
1941     if (errCode != E_OK) {
1942         return errCode;
1943     }
1944     if (tables.empty()) {
1945         LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1946         return E_OK;
1947     }
1948     auto *handle = GetHandle(true, errCode);
1949     if (handle == nullptr || errCode != E_OK) {
1950         return errCode;
1951     }
1952     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1953     if (errCode != E_OK) {
1954         ReleaseHandle(handle);
1955         return errCode;
1956     }
1957     for (const auto &table : tables) {
1958         errCode = handle->ClearUnLockingStatus(table.name);
1959         if (errCode != E_OK) {
1960             LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1961         }
1962     }
1963     errCode = handle->Commit();
1964     if (errCode != E_OK) {
1965         LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1966     }
1967     ReleaseHandle(handle);
1968     return errCode;
1969 }
1970 
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1971 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1972 {
1973     const auto tableInfos = GetSchemaInfo().GetTables();
1974     for (const auto &[tableName, info] : tableInfos) {
1975         if (info.GetSharedTableMark()) {
1976             continue;
1977         }
1978         TableSchema schema;
1979         int errCode = GetCloudTableSchema(tableName, schema);
1980         if (errCode == -E_NOT_FOUND) {
1981             continue;
1982         }
1983         if (errCode != E_OK) {
1984             LOGW("[RDBStorage] Get cloud table failed %d", errCode);
1985             return errCode;
1986         }
1987         tables.push_back(schema);
1988     }
1989     return E_OK;
1990 }
1991 
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery,bool isQueryDownloadRecords)1992 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1993     const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery, bool isQueryDownloadRecords)
1994 {
1995     int errCode = E_OK;
1996     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1997     if (errCode != E_OK) {
1998         return errCode;
1999     }
2000     for (const auto &table : tables) {
2001         if (!CheckTableSupportCompensatedSync(table)) {
2002             continue;
2003         }
2004 
2005         std::vector<VBucket> syncDataPk;
2006         errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk, isQueryDownloadRecords);
2007         if (errCode != E_OK) {
2008             LOGW("[RDBStorageEngine] Get wait compensated sync data failed, continue! errCode=%d", errCode);
2009             errCode = E_OK;
2010             continue;
2011         }
2012         if (syncDataPk.empty()) {
2013             // no data need to compensated sync
2014             continue;
2015         }
2016         errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncQuery);
2017         if (errCode != E_OK) {
2018             LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2019             errCode = E_OK;
2020             continue;
2021         }
2022     }
2023     if (errCode == E_OK) {
2024         errCode = handle->Commit();
2025         if (errCode != E_OK) {
2026             LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2027         }
2028     } else {
2029         int ret = handle->Rollback();
2030         if (ret != E_OK) {
2031             LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2032         }
2033     }
2034     return errCode;
2035 }
2036 
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2037 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2038     const std::string &tableName, bool flag)
2039 {
2040     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2041     if (trackerTable.IsEmpty()) {
2042         trackerTable.SetTableName(tableName);
2043     }
2044     return handle->CreateTempSyncTrigger(trackerTable, flag);
2045 }
2046 
CheckTableSupportCompensatedSync(const TableSchema & table)2047 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2048 {
2049     auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2050         return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2051             field.type == TYPE_INDEX<Bytes>);
2052     });
2053     if (it != table.fields.end()) {
2054         LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2055         return false;
2056     }
2057     // check whether reference exist
2058     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2059     int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2060     if (errCode != E_OK) {
2061         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2062         return false;
2063     }
2064     if (!tableReference.empty()) {
2065         LOGI("[RDBStorageEngine] current table exist reference property");
2066         return false;
2067     }
2068     return true;
2069 }
2070 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2071 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2072     const std::set<std::string> &gidFilters)
2073 {
2074     if (transactionHandle_ == nullptr) {
2075         LOGE("the transaction has not been started");
2076         return -E_INVALID_DB;
2077     }
2078     int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2079     if (errCode != E_OK) {
2080         LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2081     }
2082     return errCode;
2083 }
2084 
GetCloudSyncConfig() const2085 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2086 {
2087     std::lock_guard<std::mutex> autoLock(configMutex_);
2088     return cloudSyncConfig_;
2089 }
2090 
SetCloudSyncConfig(const CloudSyncConfig & config)2091 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2092 {
2093     std::lock_guard<std::mutex> autoLock(configMutex_);
2094     cloudSyncConfig_ = config;
2095 }
2096 
IsTableExistReference(const std::string & table)2097 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2098 {
2099     // check whether reference exist
2100     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2101     int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2102     if (errCode != E_OK) {
2103         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2104         return false;
2105     }
2106     return !tableReference.empty();
2107 }
2108 
IsTableExistReferenceOrReferenceBy(const std::string & table)2109 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2110 {
2111     // check whether reference or reference by exist
2112     if (storageEngine_ == nullptr) {
2113         LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2114         return false;
2115     }
2116     RelationalSchemaObject schema = storageEngine_->GetSchema();
2117     auto referenceProperty = schema.GetReferenceProperty();
2118     if (referenceProperty.empty()) {
2119         return false;
2120     }
2121     auto [sourceTableName, errCode] = GetSourceTableName(table);
2122     if (errCode != E_OK) {
2123         return false;
2124     }
2125     for (const auto &property : referenceProperty) {
2126         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2127             DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2128             return true;
2129         }
2130     }
2131     return false;
2132 }
2133 
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)2134 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
2135     Timestamp localMark)
2136 {
2137     uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
2138 }
2139 
ReviseLocalModTime(const std::string & tableName,const std::vector<ReviseModTimeInfo> & revisedData)2140 int RelationalSyncAbleStorage::ReviseLocalModTime(const std::string &tableName,
2141     const std::vector<ReviseModTimeInfo> &revisedData)
2142 {
2143     if (storageEngine_ == nullptr) {
2144         LOGE("[ReviseLocalModTime] Storage is null");
2145         return -E_INVALID_DB;
2146     }
2147     int errCode = E_OK;
2148     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
2149             storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
2150     if (writeHandle == nullptr) {
2151         LOGE("[ReviseLocalModTime] Get write handle fail: %d", errCode);
2152         return errCode;
2153     }
2154     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
2155     if (errCode != E_OK) {
2156         LOGE("[ReviseLocalModTime] Start Transaction fail: %d", errCode);
2157         ReleaseHandle(writeHandle);
2158         return errCode;
2159     }
2160     errCode = writeHandle->ReviseLocalModTime(tableName, revisedData);
2161     if (errCode != E_OK) {
2162         LOGE("[ReviseLocalModTime] Revise local modify time fail: %d", errCode);
2163         writeHandle->Rollback();
2164         ReleaseHandle(writeHandle);
2165         return errCode;
2166     }
2167     errCode = writeHandle->Commit();
2168     ReleaseHandle(writeHandle);
2169     return errCode;
2170 }
2171 
GetCursor(const std::string & tableName,uint64_t & cursor)2172 int RelationalSyncAbleStorage::GetCursor(const std::string &tableName, uint64_t &cursor)
2173 {
2174     if (transactionHandle_ == nullptr) {
2175         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
2176         return -E_INVALID_DB;
2177     }
2178     return transactionHandle_->GetCursor(tableName, cursor);
2179 }
2180 
GetLocalDataCount(const std::string & tableName,int & dataCount,int & logicDeleteDataCount)2181 int RelationalSyncAbleStorage::GetLocalDataCount(const std::string &tableName, int &dataCount,
2182     int &logicDeleteDataCount)
2183 {
2184     int errCode = E_OK;
2185     auto *handle = GetHandle(false, errCode);
2186     if (handle == nullptr || errCode != E_OK) {
2187         LOGE("[RelationalSyncAbleStorage] Get handle failed when get local data count: %d", errCode);
2188         return errCode;
2189     }
2190     errCode = handle->GetLocalDataCount(tableName, dataCount, logicDeleteDataCount);
2191     ReleaseHandle(handle);
2192     return errCode;
2193 }
2194 }
2195 #endif
2196