• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "query_utils.h"
29 #include "relational_remote_query_continue_token.h"
30 #include "relational_sync_data_inserter.h"
31 #include "res_finalizer.h"
32 #include "runtime_context.h"
33 #include "time_helper.h"
34 
35 namespace DistributedDB {
36 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)37 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
38 {
39     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
40     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
41         std::string(CLOSE_CONN_TASK),
42         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
43     );
44 }
45 }
46 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)47 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
48     : storageEngine_(std::move(engine)),
49       reusedHandle_(nullptr),
50       isCachedOption_(false)
51 {}
52 
~RelationalSyncAbleStorage()53 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
54 {
55     syncAbleEngine_ = nullptr;
56 }
57 
58 // Get interface type of this relational db.
GetInterfaceType() const59 int RelationalSyncAbleStorage::GetInterfaceType() const
60 {
61     return SYNC_RELATION;
62 }
63 
64 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()65 void RelationalSyncAbleStorage::IncRefCount()
66 {
67     LOGD("RelationalSyncAbleStorage ref +1");
68     IncObjRef(this);
69 }
70 
71 // Drop the interface ref-count.
DecRefCount()72 void RelationalSyncAbleStorage::DecRefCount()
73 {
74     LOGD("RelationalSyncAbleStorage ref -1");
75     DecObjRef(this);
76 }
77 
78 // Get the identifier of this rdb.
GetIdentifier() const79 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
80 {
81     std::string identifier = storageEngine_->GetIdentifier();
82     return std::vector<uint8_t>(identifier.begin(), identifier.end());
83 }
84 
GetDualTupleIdentifier() const85 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
86 {
87     std::string identifier = storageEngine_->GetProperties().GetStringProp(
88         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
89     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
90     return identifierVect;
91 }
92 
93 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const94 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
95 {
96     int errCode = E_OK;
97     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
98     if (handle == nullptr) {
99         return;
100     }
101     timestamp = 0;
102     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
103     if (errCode != E_OK) {
104         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
105         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
106     }
107     ReleaseHandle(handle);
108     return;
109 }
110 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const111 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
112 {
113     int errCode = E_OK;
114     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
115     if (handle == nullptr) {
116         return errCode;
117     }
118     timestamp = 0;
119     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
120     if (errCode != E_OK) {
121         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
122         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
123     }
124     ReleaseHandle(handle);
125     return errCode;
126 }
127 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const128 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
129     OperatePerm perm) const
130 {
131     if (storageEngine_ == nullptr) {
132         errCode = -E_INVALID_DB;
133         return nullptr;
134     }
135     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
136         storageEngine_->FindExecutor(isWrite, perm, errCode));
137     if (handle == nullptr) {
138         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
139     }
140     return handle;
141 }
142 
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const143 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
144     int &errCode, OperatePerm perm) const
145 {
146     if (storageEngine_ == nullptr) {
147         errCode = -E_INVALID_DB;
148         return nullptr;
149     }
150     if (transactionHandle_ != nullptr) {
151         return transactionHandle_;
152     }
153     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
154         storageEngine_->FindExecutor(isWrite, perm, errCode));
155     if (errCode != E_OK) {
156         ReleaseHandle(handle);
157         handle = nullptr;
158     }
159     return handle;
160 }
161 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const162 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
163 {
164     if (storageEngine_ == nullptr) {
165         return;
166     }
167     StorageExecutor *databaseHandle = handle;
168     storageEngine_->Recycle(databaseHandle);
169     std::function<void()> listener = nullptr;
170     {
171         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
172         listener = heartBeatListener_;
173     }
174     if (listener) {
175         listener();
176     }
177 }
178 
179 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const180 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
181 {
182     if (storageEngine_ == nullptr) {
183         return -E_INVALID_DB;
184     }
185     if (key.size() > DBConstant::MAX_KEY_SIZE) {
186         return -E_INVALID_ARGS;
187     }
188     int errCode = E_OK;
189     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
190     if (handle == nullptr) {
191         return errCode;
192     }
193     errCode = handle->GetKvData(key, value);
194     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
195         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
196     }
197     ReleaseHandle(handle);
198     return errCode;
199 }
200 
201 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)202 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
203 {
204     if (storageEngine_ == nullptr) {
205         return -E_INVALID_DB;
206     }
207     int errCode = E_OK;
208     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
209     if (handle == nullptr) {
210         return errCode;
211     }
212 
213     errCode = handle->PutKvData(key, value); // meta doesn't need time.
214     if (errCode != E_OK) {
215         LOGE("Put kv data err:%d", errCode);
216         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
217     }
218     ReleaseHandle(handle);
219     return errCode;
220 }
221 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)222 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
223 {
224     if (storageEngine_ == nullptr) {
225         return -E_INVALID_DB;
226     }
227     int errCode = E_OK;
228     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
229     std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
230 
231     // try to recycle using the handle
232     if (isInTransaction) {
233         handLock.lock();
234         if (reusedHandle_ != nullptr) {
235             handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
236         } else {
237             isInTransaction = false;
238             handLock.unlock();
239         }
240     }
241 
242     if (handle == nullptr) {
243         handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
244         if (handle == nullptr) {
245             return errCode;
246         }
247     }
248 
249     errCode = handle->PutKvData(key, value);
250     if (errCode != E_OK) {
251         LOGE("Put kv data err:%d", errCode);
252         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
253     }
254     if (!isInTransaction) {
255         ReleaseHandle(handle);
256     }
257     return errCode;
258 }
259 
260 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)261 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
262 {
263     if (storageEngine_ == nullptr) {
264         return -E_INVALID_DB;
265     }
266     for (const auto &key : keys) {
267         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
268             return -E_INVALID_ARGS;
269         }
270     }
271     int errCode = E_OK;
272     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
273     if (handle == nullptr) {
274         return errCode;
275     }
276 
277     handle->StartTransaction(TransactType::IMMEDIATE);
278     errCode = handle->DeleteMetaData(keys);
279     if (errCode != E_OK) {
280         handle->Rollback();
281         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
282         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283     } else {
284         handle->Commit();
285     }
286     ReleaseHandle(handle);
287     return errCode;
288 }
289 
290 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const291 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
292 {
293     if (storageEngine_ == nullptr) {
294         return -E_INVALID_DB;
295     }
296     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
297         return -E_INVALID_ARGS;
298     }
299 
300     int errCode = E_OK;
301     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
302     if (handle == nullptr) {
303         return errCode;
304     }
305 
306     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
307     if (errCode != E_OK) {
308         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
309         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
310     }
311     ReleaseHandle(handle);
312     return errCode;
313 }
314 
315 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const316 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
317 {
318     if (storageEngine_ == nullptr) {
319         return -E_INVALID_DB;
320     }
321     int errCode = E_OK;
322     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
323     if (handle == nullptr) {
324         return errCode;
325     }
326 
327     errCode = handle->GetAllMetaKeys(keys);
328     if (errCode != E_OK) {
329         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
330     }
331     ReleaseHandle(handle);
332     return errCode;
333 }
334 
GetDbProperties() const335 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
336 {
337     return storageEngine_->GetProperties();
338 }
339 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)340 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
341 {
342     int errCode = E_OK;
343     for (auto &item : dataItems) {
344         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
345         if (entry == nullptr) {
346             errCode = -E_OUT_OF_MEMORY;
347             LOGE("GetKvEntries failed, errCode:%d", errCode);
348             SingleVerKvEntry::Release(entries);
349             break;
350         }
351         entry->SetEntryData(std::move(item));
352         entries.push_back(entry);
353     }
354     return errCode;
355 }
356 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)357 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
358 {
359     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
360     // the size would not be very large.
361     static const size_t maxOrigDevLength = 40;
362     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
363     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
364                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
365     return dataSize;
366 }
367 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)368 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
369     size_t appendLen)
370 {
371     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
372     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
373         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
374         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
375     }
376     return !reachThreshold;
377 }
378 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)379 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
380     SQLiteSingleVerRelationalContinueToken *&token)
381 {
382     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
383         delete token;
384         token = nullptr;
385         return;
386     }
387 
388     if (dataItems.empty()) {
389         errCode = -E_INTERNAL_ERROR;
390         LOGE("Get data unfinished but data items is empty.");
391         delete token;
392         token = nullptr;
393         return;
394     }
395     token->SetNextBeginTime(dataItems.back());
396     token->UpdateNextSyncOffset(dataItems.size());
397 }
398 
399 /**
400  * Caller must ensure that parameter token is valid.
401  * If error happened, token will be deleted here.
402  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const403 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
404     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
405 {
406     if (storageEngine_ == nullptr) {
407         return -E_INVALID_DB;
408     }
409 
410     int errCode = E_OK;
411     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
412         OperatePerm::NORMAL_PERM, errCode));
413     if (handle == nullptr) {
414         goto ERROR;
415     }
416 
417     do {
418         errCode = handle->GetSyncDataByQuery(dataItems,
419             Parcel::GetAppendedLen(),
420             dataSizeInfo,
421             [token](sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt, bool &isGettingDeletedData) {
422                 return token->GetStatement(db, queryStmt, fullStmt, isGettingDeletedData);
423             }, storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
424         if (errCode == -E_FINISHED) {
425             token->FinishGetData();
426             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
427         }
428     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
429 
430 ERROR:
431     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
432         dataItems.clear();
433     }
434     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
435     ReleaseHandle(handle);
436     return errCode;
437 }
438 
439 // use kv struct data to sync
440 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const441 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
442     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
443     std::vector<SingleVerKvEntry *> &entries) const
444 {
445     if (!timeRange.IsValid()) {
446         return -E_INVALID_ARGS;
447     }
448     query.SetSchema(storageEngine_->GetSchema());
449     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
450     if (token == nullptr) {
451         LOGE("[SingleVerNStore] Allocate continue token failed.");
452         return -E_OUT_OF_MEMORY;
453     }
454 
455     continueStmtToken = static_cast<ContinueToken>(token);
456     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
457 }
458 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const459 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
460     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
461 {
462     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
463     if (!token->CheckValid()) {
464         return -E_INVALID_ARGS;
465     }
466     RelationalSchemaObject schema = storageEngine_->GetSchema();
467     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
468     std::vector<std::string> fieldNames;
469     fieldNames.reserve(fieldInfos.size());
470     for (const auto &fieldInfo : fieldInfos) { // order by cid
471         fieldNames.push_back(fieldInfo.GetFieldName());
472     }
473     token->SetFieldNames(fieldNames);
474 
475     std::vector<DataItem> dataItems;
476     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
477     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
478         continueStmtToken = static_cast<ContinueToken>(token);
479         return errCode;
480     }
481 
482     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
483     if (innerCode != E_OK) {
484         errCode = innerCode;
485         delete token;
486         token = nullptr;
487     }
488     continueStmtToken = static_cast<ContinueToken>(token);
489     return errCode;
490 }
491 
492 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)493 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
494 {
495     std::vector<DataItem> dataItems;
496     for (const auto &itemEntry : entries) {
497         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
498         if (entry != nullptr) {
499             DataItem item;
500             item.origDev = entry->GetOrigDevice();
501             item.flag = entry->GetFlag();
502             item.timestamp = entry->GetTimestamp();
503             item.writeTimestamp = entry->GetWriteTimestamp();
504             entry->GetKey(item.key);
505             entry->GetValue(item.value);
506             entry->GetHashKey(item.hashKey);
507             dataItems.push_back(item);
508         }
509     }
510     return dataItems;
511 }
512 }
513 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)514 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
515     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
516 {
517     std::vector<DataItem> dataItems = ConvertEntries(entries);
518     return PutSyncData(object, dataItems, deviceName);
519 }
520 
521 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)522 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
523 {
524     return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
525         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
526 }
527 
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)528 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
529 {
530     return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
531 }
532 }
533 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)534 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
535     const std::string &deviceName)
536 {
537     int errCode = E_OK;
538     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
539     QueryObject query = object;
540     query.SetSchema(storageEngine_->GetSchema());
541 
542     RelationalSchemaObject remoteSchema;
543     errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
544     if (errCode != E_OK) {
545         LOGE("Find remote schema failed. err=%d", errCode);
546         return errCode;
547     }
548 
549     StoreInfo info = GetStoreInfo();
550     auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
551         remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
552     inserter.SetEntries(dataItems);
553 
554     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
555     if (handle == nullptr) {
556         return errCode;
557     }
558 
559     DBDfxAdapter::StartTracing();
560 
561     errCode = handle->SaveSyncItems(inserter);
562 
563     DBDfxAdapter::FinishTracing();
564     if (errCode == E_OK) {
565         // dataItems size > 0 now because already check before
566         // all dataItems will write into db now, so need to observer notify here
567         // if some dataItems will not write into db in the future, observer notify here need change
568         ChangedData data;
569         TriggerObserverAction(deviceName, std::move(data), false);
570     }
571 
572     ReleaseHandle(handle);
573     return errCode;
574 }
575 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)576 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
577     const std::string &deviceName)
578 {
579     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
580         LOGW("Device length is invalid for sync put");
581         return -E_INVALID_ARGS;
582     }
583 
584     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
585     if (errCode != E_OK) {
586         LOGE("[Relational] PutSyncData errCode:%d", errCode);
587         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
588     }
589     return errCode;
590 }
591 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)592 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
593 {
594     (void) deviceName;
595     (void) isNeedNotify;
596     return -E_NOT_SUPPORT;
597 }
598 
GetSchemaInfo() const599 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
600 {
601     return storageEngine_->GetSchema();
602 }
603 
GetSecurityOption(SecurityOption & option) const604 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
605 {
606     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
607     if (isCachedOption_) {
608         option = securityOption_;
609         return E_OK;
610     }
611     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
612     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
613     if (errCode == E_OK) {
614         option = securityOption_;
615         isCachedOption_ = true;
616     }
617     return errCode;
618 }
619 
NotifyRemotePushFinished(const std::string & deviceId) const620 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
621 {
622     return;
623 }
624 
625 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const626 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
627 {
628     return OS::GetCurrentSysTimeInMicrosecond(outTime);
629 }
630 
GetTablesQuery()631 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
632 {
633     auto tableNames = storageEngine_->GetSchema().GetTableNames();
634     std::vector<QuerySyncObject> queries;
635     queries.reserve(tableNames.size());
636     for (const auto &it : tableNames) {
637         queries.emplace_back(Query::Select(it));
638     }
639     return queries;
640 }
641 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)642 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
643 {
644     (void) queryObj;
645     return -E_NOT_SUPPORT;
646 }
647 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const648 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
649     const std::string &targetID, bool isPush) const
650 {
651     return E_OK;
652 }
653 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)654 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
655     const RelationalSyncStrategy &syncStrategy)
656 {
657     auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
658         DistributedTableMode::SPLIT_BY_DEVICE);
659     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
660         LOGD("No need create device table in COLLABORATION mode.");
661         return E_OK;
662     }
663 
664     int errCode = E_OK;
665     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
666     if (handle == nullptr) {
667         return errCode;
668     }
669 
670     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
671     if (errCode != E_OK) {
672         LOGE("Start transaction failed:%d", errCode);
673         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
674         ReleaseHandle(handle);
675         return errCode;
676     }
677 
678     StoreInfo info = GetStoreInfo();
679     for (const auto &[table, strategy] : syncStrategy) {
680         if (!strategy.permitSync) {
681             continue;
682         }
683 
684         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
685         if (errCode != E_OK) {
686             LOGE("Create distributed device table failed. %d", errCode);
687             break;
688         }
689     }
690 
691     if (errCode == E_OK) {
692         errCode = handle->Commit();
693     } else {
694         (void)handle->Rollback();
695     }
696 
697     ReleaseHandle(handle);
698     return errCode;
699 }
700 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)701 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
702 {
703     std::lock_guard lock(onSchemaChangedMutex_);
704     onSchemaChanged_ = callback;
705     return E_OK;
706 }
707 
NotifySchemaChanged()708 void RelationalSyncAbleStorage::NotifySchemaChanged()
709 {
710     std::lock_guard lock(onSchemaChangedMutex_);
711     if (onSchemaChanged_) {
712         LOGD("Notify relational schema was changed");
713         onSchemaChanged_();
714     }
715 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const716 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
717 {
718     algorithmSet.clear();
719     DataCompression::GetCompressionAlgo(algorithmSet);
720     return E_OK;
721 }
722 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)723 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
724     const RelationalObserverAction &action)
725 {
726     int errCode = E_OK;
727     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, action, &errCode] () mutable {
728         ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
729         ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
730         auto it = dataChangeCallbackMap_.find(connectionId);
731         if (it != dataChangeCallbackMap_.end()) {
732             if (it->second.find(observer) != it->second.end()) {
733                 LOGE("obsever already registered");
734                 errCode = -E_ALREADY_SET;
735                 return;
736             }
737             if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
738                 LOGE("The number of relational observers has been over limit");
739                 errCode = -E_MAX_LIMITS;
740                 return;
741             }
742             it->second[observer] = action;
743         } else {
744             dataChangeCallbackMap_[connectionId][observer] = action;
745         }
746         LOGI("register relational observer ok");
747     }, nullptr, &dataChangeCallbackMap_);
748     ADAPTER_WAIT(handle);
749     return errCode;
750 }
751 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)752 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
753 {
754     if (observer == nullptr) {
755         EraseDataChangeCallback(connectionId);
756         return E_OK;
757     }
758     int errCode = -E_NOT_FOUND;
759     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, &errCode] () mutable {
760         ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
761         ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
762         auto it = dataChangeCallbackMap_.find(connectionId);
763         if (it == dataChangeCallbackMap_.end()) {
764             return;
765         }
766         auto action = it->second.find(observer);
767         if (action == it->second.end()) {
768             return;
769         }
770         it->second.erase(action);
771         LOGI("unregister relational observer.");
772         if (it->second.empty()) {
773             dataChangeCallbackMap_.erase(it);
774             LOGI("observer for this delegate is zero now");
775         }
776         errCode = E_OK;
777     }, nullptr, &dataChangeCallbackMap_);
778     ADAPTER_WAIT(handle);
779     return errCode;
780 }
781 
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,const std::string & deviceName,const ChangedData & changedData,bool isChangedData,int & observerCnt)782 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
783     const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item,
784     const std::string &deviceName, const ChangedData &changedData, bool isChangedData, int &observerCnt)
785 {
786     for (auto &action : item.second) {
787         if (action.second == nullptr) {
788             continue;
789         }
790         observerCnt++;
791         ChangedData observerChangeData = changedData;
792         if (action.first != nullptr) {
793             FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
794         }
795         action.second(deviceName, std::move(observerChangeData), isChangedData);
796     }
797 }
798 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)799 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
800     ChangedData &&changedData, bool isChangedData)
801 {
802     IncObjRef(this);
803     int taskErrCode =
804         ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
805             LOGD("begin to trigger relational observer.");
806             int observerCnt = 0;
807             ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
808             ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
809             for (const auto &item : dataChangeCallbackMap_) {
810                 ExecuteDataChangeCallback(item, deviceName, changedData, isChangedData, observerCnt);
811             }
812             LOGD("relational observer size = %d", observerCnt);
813             DecObjRef(this);
814         }, &dataChangeCallbackMap_);
815     if (taskErrCode != E_OK) {
816         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
817         DecObjRef(this);
818     }
819 }
820 
RegisterHeartBeatListener(const std::function<void ()> & listener)821 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
822 {
823     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
824     heartBeatListener_ = listener;
825 }
826 
CheckAndInitQueryCondition(QueryObject & query) const827 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
828 {
829     RelationalSchemaObject schema = storageEngine_->GetSchema();
830     TableInfo table = schema.GetTable(query.GetTableName());
831     if (!table.IsValid()) {
832         LOGE("Query table is not a distributed table.");
833         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
834     }
835     if (table.GetTableSyncType() == CLOUD_COOPERATION) {
836         LOGE("cloud table mode is not support");
837         return -E_NOT_SUPPORT;
838     }
839     query.SetSchema(schema);
840 
841     int errCode = E_OK;
842     auto *handle = GetHandle(true, errCode);
843     if (handle == nullptr) {
844         return errCode;
845     }
846 
847     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
848     if (errCode != E_OK) {
849         LOGE("Check relational query condition failed. %d", errCode);
850         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
851     }
852 
853     ReleaseHandle(handle);
854     return errCode;
855 }
856 
CheckCompatible(const std::string & schema,uint8_t type) const857 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
858 {
859     // return true if is relational schema.
860     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
861 }
862 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const863 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
864     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
865 {
866     if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
867         return -E_NOT_SUPPORT;
868     }
869     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
870         LOGE("[ExecuteQuery] invalid args");
871         return -E_INVALID_ARGS;
872     }
873     int errCode = E_OK;
874     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
875     if (handle == nullptr) {
876         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
877         return errCode;
878     }
879     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
880     if (errCode != E_OK) {
881         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
882     }
883     ReleaseHandle(handle);
884     return errCode;
885 }
886 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const887 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
888     RelationalRowDataSet &dataSet, ContinueToken &token) const
889 {
890     dataSet.Clear();
891     if (token == nullptr) {
892         // start query
893         std::vector<std::string> colNames;
894         std::vector<RelationalRowData *> data;
895         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
896 
897         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
898         if (errCode != E_OK) {
899             return errCode;
900         }
901 
902         // create one token
903         token = static_cast<ContinueToken>(
904             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
905         if (token == nullptr) {
906             LOGE("ExecuteQuery OOM");
907             return -E_OUT_OF_MEMORY;
908         }
909     }
910 
911     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
912     if (!remoteToken->CheckValid()) {
913         LOGE("ExecuteQuery invalid token");
914         return -E_INVALID_ARGS;
915     }
916 
917     int errCode = remoteToken->GetData(packetSize, dataSet);
918     if (errCode == -E_UNFINISHED) {
919         errCode = E_OK;
920     } else {
921         if (errCode != E_OK) {
922             dataSet.Clear();
923         }
924         delete remoteToken;
925         remoteToken = nullptr;
926         token = nullptr;
927     }
928     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
929     return errCode;
930 }
931 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)932 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
933     uint8_t type)
934 {
935     if (ReadSchemaType(type) != SchemaType::RELATIVE) {
936         return -E_INVALID_ARGS;
937     }
938 
939     RelationalSchemaObject schemaObj;
940     int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
941     if (errCode != E_OK) {
942         LOGE("Parse remote schema failed. err=%d", errCode);
943         return errCode;
944     }
945 
946     std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
947     Key remoteSchemaKey(keyStr.begin(), keyStr.end());
948     Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
949     errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
950     if (errCode != E_OK) {
951         LOGE("Save remote schema failed. err=%d", errCode);
952         return errCode;
953     }
954 
955     return remoteDeviceSchema_.Put(deviceId, remoteSchema);
956 }
957 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)958 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
959 {
960     if (schemaObj.IsSchemaValid()) {
961         return -E_INVALID_ARGS;
962     }
963 
964     std::string remoteSchema;
965     int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
966     if (errCode == -E_NOT_FOUND) {
967         LOGW("Get remote device schema miss cached.");
968         std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
969         Key remoteSchemaKey(keyStr.begin(), keyStr.end());
970         Value remoteSchemaBuff;
971         errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
972         if (errCode != E_OK) {
973             LOGE("Get remote device schema from meta failed. err=%d", errCode);
974             return errCode;
975         }
976         remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
977         errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
978     }
979 
980     if (errCode != E_OK) {
981         LOGE("Get remote device schema failed. err=%d", errCode);
982         return errCode;
983     }
984 
985     errCode = schemaObj.ParseFromSchemaString(remoteSchema);
986     if (errCode != E_OK) {
987         LOGE("Parse remote schema failed. err=%d", errCode);
988     }
989     return errCode;
990 }
991 
SetReusedHandle(StorageExecutor * handle)992 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
993 {
994     std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
995     reusedHandle_ = handle;
996 }
997 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const998 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
999 {
1000     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
1001     delete remoteToken;
1002     remoteToken = nullptr;
1003     token = nullptr;
1004 }
1005 
GetStoreInfo() const1006 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
1007 {
1008     StoreInfo info = {
1009         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
1010         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1011         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1012     };
1013     return info;
1014 }
1015 
StartTransaction(TransactType type)1016 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1017 {
1018     if (storageEngine_ == nullptr) {
1019         return -E_INVALID_DB;
1020     }
1021     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1022     if (transactionHandle_ != nullptr) {
1023         LOGD("Transaction started already.");
1024         return -E_TRANSACT_STATE;
1025     }
1026     int errCode = E_OK;
1027     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1028         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1029     if (handle == nullptr) {
1030         ReleaseHandle(handle);
1031         return errCode;
1032     }
1033     errCode = handle->StartTransaction(type);
1034     if (errCode != E_OK) {
1035         ReleaseHandle(handle);
1036         return errCode;
1037     }
1038     transactionHandle_ = handle;
1039     return errCode;
1040 }
1041 
Commit()1042 int RelationalSyncAbleStorage::Commit()
1043 {
1044     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1045     if (transactionHandle_ == nullptr) {
1046         LOGE("relation database is null or the transaction has not been started");
1047         return -E_INVALID_DB;
1048     }
1049     int errCode = transactionHandle_->Commit();
1050     ReleaseHandle(transactionHandle_);
1051     transactionHandle_ = nullptr;
1052     LOGD("connection commit transaction!");
1053     return errCode;
1054 }
1055 
Rollback()1056 int RelationalSyncAbleStorage::Rollback()
1057 {
1058     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1059     if (transactionHandle_ == nullptr) {
1060         LOGE("Invalid handle for rollback or the transaction has not been started.");
1061         return -E_INVALID_DB;
1062     }
1063 
1064     int errCode = transactionHandle_->Rollback();
1065     ReleaseHandle(transactionHandle_);
1066     transactionHandle_ = nullptr;
1067     LOGI("connection rollback transaction!");
1068     return errCode;
1069 }
1070 
GetAllUploadCount(const QuerySyncObject & query,const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1071 int RelationalSyncAbleStorage::GetAllUploadCount(const QuerySyncObject &query,
1072     const std::vector<Timestamp> &timestampVec, bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1073 {
1074     int errCode = E_OK;
1075     auto *handle = GetHandleExpectTransaction(false, errCode);
1076     if (handle == nullptr) {
1077         return errCode;
1078     }
1079     QuerySyncObject queryObj = query;
1080     queryObj.SetSchema(GetSchemaInfo());
1081     errCode = handle->GetAllUploadCount(timestampVec, isCloudForcePush, isCompensatedTask, queryObj, count);
1082     if (transactionHandle_ == nullptr) {
1083         ReleaseHandle(handle);
1084     }
1085     return errCode;
1086 }
1087 
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)1088 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp,
1089     bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
1090 {
1091     int errCode = E_OK;
1092     auto *handle = GetHandleExpectTransaction(false, errCode);
1093     if (handle == nullptr) {
1094         return errCode;
1095     }
1096     QuerySyncObject queryObj = query;
1097     queryObj.SetSchema(GetSchemaInfo());
1098     errCode = handle->GetUploadCount(timestamp, isCloudForcePush, isCompensatedTask, queryObj, count);
1099     if (transactionHandle_ == nullptr) {
1100         ReleaseHandle(handle);
1101     }
1102     return errCode;
1103 }
1104 
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1105 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1106     const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1107 {
1108     if (transactionHandle_ == nullptr) {
1109         LOGE("the transaction has not been started");
1110         return -E_INVALID_DB;
1111     }
1112     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1113     QuerySyncObject query = querySyncObject;
1114     query.SetSchema(GetSchemaInfo());
1115     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1116     if (token == nullptr) {
1117         LOGE("[SingleVerNStore] Allocate continue token failed.");
1118         return -E_OUT_OF_MEMORY;
1119     }
1120     token->SetCloudTableSchema(tableSchema);
1121     continueStmtToken = static_cast<ContinueToken>(token);
1122     return GetCloudDataNext(continueStmtToken, cloudDataResult);
1123 }
1124 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1125 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1126     CloudSyncData &cloudDataResult)
1127 {
1128     if (continueStmtToken == nullptr) {
1129         return -E_INVALID_ARGS;
1130     }
1131     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1132     if (!token->CheckValid()) {
1133         return -E_INVALID_ARGS;
1134     }
1135     if (transactionHandle_ == nullptr) {
1136         LOGE("the transaction has not been started, release the token");
1137         ReleaseCloudDataToken(continueStmtToken);
1138         return -E_INVALID_DB;
1139     }
1140     cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1141     auto config = GetCloudSyncConfig();
1142     transactionHandle_->SetUploadConfig(config.maxUploadCount, config.maxUploadSize);
1143     int errCode = transactionHandle_->GetSyncCloudData(uploadRecorder_, cloudDataResult, *token);
1144     LOGI("mode:%d upload data, ins:%zu, upd:%zu, del:%zu, lock:%zu", cloudDataResult.mode,
1145         cloudDataResult.insData.extend.size(), cloudDataResult.updData.extend.size(),
1146         cloudDataResult.delData.extend.size(), cloudDataResult.lockData.extend.size());
1147     if (errCode != -E_UNFINISHED) {
1148         delete token;
1149         token = nullptr;
1150     }
1151     continueStmtToken = static_cast<ContinueToken>(token);
1152     if (errCode != E_OK && errCode != -E_UNFINISHED) {
1153         return errCode;
1154     }
1155     int fillRefGidCode = FillReferenceData(cloudDataResult);
1156     return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1157 }
1158 
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)1159 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1160     bool isCloudForcePush, bool isCompensatedTask, std::vector<std::string> &cloudGid)
1161 {
1162     int errCode = E_OK;
1163     auto *handle = GetHandle(false, errCode);
1164     if (handle == nullptr) {
1165         return errCode;
1166     }
1167     Timestamp beginTime = 0u;
1168     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1169     QuerySyncObject query = querySyncObject;
1170     query.SetSchema(GetSchemaInfo());
1171     handle->SetTableSchema(tableSchema);
1172     errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, isCompensatedTask, cloudGid);
1173     ReleaseHandle(handle);
1174     if (errCode != E_OK) {
1175         LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1176     }
1177     return errCode;
1178 }
1179 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1180 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1181 {
1182     if (continueStmtToken == nullptr) {
1183         return E_OK;
1184     }
1185     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1186     if (!token->CheckValid()) {
1187         return E_OK;
1188     }
1189     int errCode = token->ReleaseCloudStatement();
1190     delete token;
1191     token = nullptr;
1192     return errCode;
1193 }
1194 
GetSchemaFromDB(RelationalSchemaObject & schema)1195 int RelationalSyncAbleStorage::GetSchemaFromDB(RelationalSchemaObject &schema)
1196 {
1197     Key schemaKey;
1198     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
1199     Value schemaVal;
1200     int errCode = GetMetaData(schemaKey, schemaVal);
1201     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1202         LOGE("Get relational schema from DB failed. %d", errCode);
1203         return errCode;
1204     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
1205         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
1206         return -E_NOT_FOUND;
1207     }
1208     std::string schemaStr;
1209     DBCommon::VectorToString(schemaVal, schemaStr);
1210     errCode = schema.ParseFromSchemaString(schemaStr);
1211     if (errCode != E_OK) {
1212         LOGE("Parse schema string from DB failed.");
1213         return errCode;
1214     }
1215     storageEngine_->SetSchema(schema);
1216     return errCode;
1217 }
1218 
ChkSchema(const TableName & tableName)1219 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1220 {
1221     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1222     RelationalSchemaObject localSchema = GetSchemaInfo();
1223     int errCode = schemaMgr_.ChkSchema(tableName, localSchema);
1224     if (errCode == -E_SCHEMA_MISMATCH) {
1225         LOGI("Get schema by tableName %s failed.", DBCommon::STR_MASK(tableName));
1226         RelationalSchemaObject newSchema;
1227         errCode = GetSchemaFromDB(newSchema);
1228         if (errCode != E_OK) {
1229             LOGE("Get schema from db when check schema. err: %d", errCode);
1230             return -E_SCHEMA_MISMATCH;
1231         }
1232         errCode = schemaMgr_.ChkSchema(tableName, newSchema);
1233     }
1234     return errCode;
1235 }
1236 
SetCloudDbSchema(const DataBaseSchema & schema)1237 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1238 {
1239     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1240     RelationalSchemaObject localSchema = GetSchemaInfo();
1241     schemaMgr_.SetCloudDbSchema(schema, localSchema);
1242     return E_OK;
1243 }
1244 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1245 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1246     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1247 {
1248     if (transactionHandle_ == nullptr) {
1249         LOGE(" the transaction has not been started");
1250         return -E_INVALID_DB;
1251     }
1252 
1253     return GetInfoByPrimaryKeyOrGidInner(transactionHandle_, tableName, vBucket, dataInfoWithLog, assetInfo);
1254 }
1255 
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1256 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1257     const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1258 {
1259     if (handle == nullptr) {
1260         return -E_INVALID_DB;
1261     }
1262     TableSchema tableSchema;
1263     int errCode = GetCloudTableSchema(tableName, tableSchema);
1264     if (errCode != E_OK) {
1265         LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1266         return errCode;
1267     }
1268     RelationalSchemaObject localSchema = GetSchemaInfo();
1269     handle->SetLocalSchema(localSchema);
1270     return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1271 }
1272 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1273 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1274 {
1275     if (transactionHandle_ == nullptr) {
1276         LOGE(" the transaction has not been started");
1277         return -E_INVALID_DB;
1278     }
1279     return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1280 }
1281 
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1282 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1283     const std::string &tableName, DownloadData &downloadData)
1284 {
1285     TableSchema tableSchema;
1286     int errCode = GetCloudTableSchema(tableName, tableSchema);
1287     if (errCode != E_OK) {
1288         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1289         return errCode;
1290     }
1291     RelationalSchemaObject localSchema = GetSchemaInfo();
1292     handle->SetLocalSchema(localSchema);
1293     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1294     handle->SetLogicDelete(IsCurrentLogicDelete());
1295     errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1296     handle->SetLogicDelete(false);
1297     return errCode;
1298 }
1299 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1300 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1301 {
1302     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1303     cloudSchema = schemaMgr_.GetCloudDbSchema();
1304     return E_OK;
1305 }
1306 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1307 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1308     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1309 {
1310     if (transactionHandle_ == nullptr) {
1311         LOGE("the transaction has not been started");
1312         return -E_INVALID_DB;
1313     }
1314     transactionHandle_->SetLogicDelete(logicDelete_);
1315     std::vector<std::string> notifyTableList;
1316     int errCode = transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets, notifyTableList);
1317     if (!notifyTableList.empty()) {
1318         for (auto notifyTableName : notifyTableList) {
1319             ChangedData changedData;
1320             changedData.type = ChangedDataType::DATA;
1321             changedData.tableName = notifyTableName;
1322             std::vector<DistributedDB::Type> dataVec;
1323             DistributedDB::Type type;
1324             if (mode == FLAG_ONLY) {
1325                 type = std::string(CloudDbConstant::FLAG_ONLY_MODE_NOTIFY);
1326             } else {
1327                 type = std::string(CloudDbConstant::FLAG_AND_DATA_MODE_NOTIFY);
1328             }
1329             dataVec.push_back(type);
1330             changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec);
1331             TriggerObserverAction("CLOUD", std::move(changedData), true);
1332         }
1333     }
1334     transactionHandle_->SetLogicDelete(false);
1335     return errCode;
1336 }
1337 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1338 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1339 {
1340     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1341     return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1342 }
1343 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1344 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1345     bool isDownloadSuccess)
1346 {
1347     if (storageEngine_ == nullptr) {
1348         return -E_INVALID_DB;
1349     }
1350     if (transactionHandle_ == nullptr) {
1351         LOGE("the transaction has not been started when fill asset for download.");
1352         return -E_INVALID_DB;
1353     }
1354     TableSchema tableSchema;
1355     int errCode = GetCloudTableSchema(tableName, tableSchema);
1356     if (errCode != E_OK) {
1357         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1358         return errCode;
1359     }
1360     errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1361     if (errCode != E_OK) {
1362         LOGE("fill cloud asset for download failed.%d", errCode);
1363     }
1364     return errCode;
1365 }
1366 
SetLogTriggerStatus(bool status)1367 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1368 {
1369     int errCode = E_OK;
1370     auto *handle = GetHandleExpectTransaction(false, errCode);
1371     if (handle == nullptr) {
1372         return errCode;
1373     }
1374     errCode = handle->SetLogTriggerStatus(status);
1375     if (transactionHandle_ == nullptr) {
1376         ReleaseHandle(handle);
1377     }
1378     return errCode;
1379 }
1380 
SetCursorIncFlag(bool flag)1381 int RelationalSyncAbleStorage::SetCursorIncFlag(bool flag)
1382 {
1383     int errCode = E_OK;
1384     auto *handle = GetHandleExpectTransaction(false, errCode);
1385     if (handle == nullptr) {
1386         return errCode;
1387     }
1388     errCode = handle->SetCursorIncFlag(flag);
1389     if (transactionHandle_ == nullptr) {
1390         ReleaseHandle(handle);
1391     }
1392     return errCode;
1393 }
1394 
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1395 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1396     bool ignoreEmptyGid)
1397 {
1398     if (storageEngine_ == nullptr) {
1399         return -E_INVALID_DB;
1400     }
1401     int errCode = E_OK;
1402     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1403         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1404     if (writeHandle == nullptr) {
1405         return errCode;
1406     }
1407     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1408     if (errCode != E_OK) {
1409         ReleaseHandle(writeHandle);
1410         return errCode;
1411     }
1412     errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1413     if (errCode != E_OK) {
1414         LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1415         writeHandle->Rollback();
1416         ReleaseHandle(writeHandle);
1417         return errCode;
1418     }
1419     errCode = writeHandle->Commit();
1420     ReleaseHandle(writeHandle);
1421     return errCode;
1422 }
1423 
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1424 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1425 {
1426     syncAbleEngine_ = syncAbleEngine;
1427 }
1428 
GetIdentify() const1429 std::string RelationalSyncAbleStorage::GetIdentify() const
1430 {
1431     if (storageEngine_ == nullptr) {
1432         LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1433         return "";
1434     }
1435     return storageEngine_->GetIdentifier();
1436 }
1437 
EraseDataChangeCallback(uint64_t connectionId)1438 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1439 {
1440     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1441         ConcurrentAdapter::AdapterAutoLock(dataChangeDeviceMutex_);
1442         ResFinalizer finalizer([this]() { ConcurrentAdapter::AdapterAutoUnLock(dataChangeDeviceMutex_); });
1443         auto it = dataChangeCallbackMap_.find(connectionId);
1444         if (it != dataChangeCallbackMap_.end()) {
1445             dataChangeCallbackMap_.erase(it);
1446             LOGI("erase all observer for this delegate.");
1447         }
1448     }, nullptr, &dataChangeCallbackMap_);
1449     ADAPTER_WAIT(handle);
1450 }
1451 
ReleaseContinueToken(ContinueToken & continueStmtToken) const1452 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1453 {
1454     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1455     if (token == nullptr || !(token->CheckValid())) {
1456         LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1457         return;
1458     }
1459     delete token;
1460     continueStmtToken = nullptr;
1461 }
1462 
CheckQueryValid(const QuerySyncObject & query)1463 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1464 {
1465     int errCode = E_OK;
1466     auto *handle = GetHandle(false, errCode);
1467     if (handle == nullptr) {
1468         return errCode;
1469     }
1470     errCode = handle->CheckQueryObjectLegal(query);
1471     if (errCode != E_OK) {
1472         ReleaseHandle(handle);
1473         return errCode;
1474     }
1475     QuerySyncObject queryObj = query;
1476     queryObj.SetSchema(GetSchemaInfo());
1477     int64_t count = 0;
1478     errCode = handle->GetUploadCount(UINT64_MAX, false, false, queryObj, count);
1479     ReleaseHandle(handle);
1480     if (errCode != E_OK) {
1481         LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1482         return -E_INVALID_ARGS;
1483     }
1484     return errCode;
1485 }
1486 
CreateTempSyncTrigger(const std::string & tableName)1487 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1488 {
1489     int errCode = E_OK;
1490     auto *handle = GetHandle(true, errCode);
1491     if (handle == nullptr) {
1492         return errCode;
1493     }
1494     errCode = CreateTempSyncTriggerInner(handle, tableName, true);
1495     ReleaseHandle(handle);
1496     if (errCode != E_OK) {
1497         LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1498     }
1499     return errCode;
1500 }
1501 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1502 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1503     ChangeProperties &changeProperties)
1504 {
1505     int errCode = E_OK;
1506     auto *handle = GetHandle(false, errCode);
1507     if (handle == nullptr) {
1508         return errCode;
1509     }
1510     errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1511     ReleaseHandle(handle);
1512     if (errCode != E_OK) {
1513         LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1514     }
1515     return errCode;
1516 }
1517 
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1518 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1519 {
1520     if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1521         changedData.field = {};
1522         for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1523             changedData.primaryData[i].clear();
1524         }
1525     }
1526     if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1527         changedData.properties = {};
1528     }
1529 }
1530 
ClearAllTempSyncTrigger()1531 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1532 {
1533     int errCode = E_OK;
1534     auto *handle = GetHandle(true, errCode);
1535     if (handle == nullptr) {
1536         return errCode;
1537     }
1538     errCode = handle->ClearAllTempSyncTrigger();
1539     ReleaseHandle(handle);
1540     if (errCode != E_OK) {
1541         LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1542     }
1543     return errCode;
1544 }
1545 
FillReferenceData(CloudSyncData & syncData)1546 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1547 {
1548     std::map<int64_t, Entries> referenceGid;
1549     int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1550     if (errCode != E_OK) {
1551         LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1552         return errCode;
1553     }
1554     errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1555     if (errCode != E_OK) {
1556         return errCode;
1557     }
1558     referenceGid.clear();
1559     errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1560     if (errCode != E_OK) {
1561         LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1562         return errCode;
1563     }
1564     return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1565 }
1566 
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1567 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1568     const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1569 {
1570     if (referenceGid.empty()) {
1571         return E_OK;
1572     }
1573     int ignoredCount = 0;
1574     for (size_t index = 0u; index < rowid.size(); index++) {
1575         if (index >= extend.size()) {
1576             LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1577             return -E_UNEXPECTED_DATA;
1578         }
1579         int64_t rowId = rowid[index];
1580         if (referenceGid.find(rowId) == referenceGid.end()) {
1581             // current data miss match reference data, we ignored it
1582             ignoredCount++;
1583             continue;
1584         }
1585         extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1586     }
1587     if (ignoredCount != 0) {
1588         LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1589     }
1590     return E_OK;
1591 }
1592 
IsSharedTable(const std::string & tableName)1593 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1594 {
1595     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1596     return schemaMgr_.IsSharedTable(tableName);
1597 }
1598 
GetSharedTableOriginNames()1599 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1600 {
1601     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1602     return schemaMgr_.GetSharedTableOriginNames();
1603 }
1604 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1605 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1606     std::map<int64_t, Entries> &referenceGid)
1607 {
1608     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1609     int errCode = GetTableReference(tableName, tableReference);
1610     if (errCode != E_OK) {
1611         return errCode;
1612     }
1613     if (tableReference.empty()) {
1614         LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1615         return E_OK;
1616     }
1617     auto *handle = GetHandle(false, errCode);
1618     if (handle == nullptr) {
1619         return errCode;
1620     }
1621     errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1622     ReleaseHandle(handle);
1623     return errCode;
1624 }
1625 
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1626 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1627     std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1628 {
1629     if (storageEngine_ == nullptr) {
1630         LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1631         return -E_INVALID_DB;
1632     }
1633     RelationalSchemaObject schema = storageEngine_->GetSchema();
1634     auto referenceProperty = schema.GetReferenceProperty();
1635     if (referenceProperty.empty()) {
1636         return E_OK;
1637     }
1638     auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1639     if (errCode != E_OK) {
1640         return errCode;
1641     }
1642     for (const auto &property : referenceProperty) {
1643         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1644             if (!IsSharedTable(tableName)) {
1645                 reference[property.targetTableName].push_back(property);
1646                 continue;
1647             }
1648             TableReferenceProperty tableReference;
1649             tableReference.sourceTableName = tableName;
1650             tableReference.columns = property.columns;
1651             tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1652             auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1653             if (ret != E_OK) {
1654                 return ret;
1655             }
1656             tableReference.targetTableName = sharedTargetTable;
1657             reference[tableReference.targetTableName].push_back(tableReference);
1658         }
1659     }
1660     return E_OK;
1661 }
1662 
GetSourceTableName(const std::string & tableName)1663 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1664 {
1665     std::pair<std::string, int> res = { "", E_OK };
1666     std::shared_ptr<DataBaseSchema> cloudSchema;
1667     (void) GetCloudDbSchema(cloudSchema);
1668     if (cloudSchema == nullptr) {
1669         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1670         return { "", -E_INTERNAL_ERROR };
1671     }
1672     for (const auto &table : cloudSchema->tables) {
1673         if (CloudStorageUtils::IsSharedTable(table)) {
1674             continue;
1675         }
1676         if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1677             DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1678             res.first = table.name;
1679             break;
1680         }
1681     }
1682     if (res.first.empty()) {
1683         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1684         res.second = -E_SCHEMA_MISMATCH;
1685     }
1686     return res;
1687 }
1688 
GetSharedTargetTableName(const std::string & tableName)1689 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1690 {
1691     std::pair<std::string, int> res = { "", E_OK };
1692     std::shared_ptr<DataBaseSchema> cloudSchema;
1693     (void) GetCloudDbSchema(cloudSchema);
1694     if (cloudSchema == nullptr) {
1695         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1696         return { "", -E_INTERNAL_ERROR };
1697     }
1698     for (const auto &table : cloudSchema->tables) {
1699         if (CloudStorageUtils::IsSharedTable(table)) {
1700             continue;
1701         }
1702         if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1703             res.first = table.sharedTableName;
1704             break;
1705         }
1706     }
1707     if (res.first.empty()) {
1708         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1709         res.second = -E_SCHEMA_MISMATCH;
1710     }
1711     return res;
1712 }
1713 
SetLogicDelete(bool logicDelete)1714 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1715 {
1716     logicDelete_ = logicDelete;
1717     LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1718 }
1719 
SetCloudTaskConfig(const CloudTaskConfig & config)1720 void RelationalSyncAbleStorage::SetCloudTaskConfig(const CloudTaskConfig &config)
1721 {
1722     allowLogicDelete_ = config.allowLogicDelete;
1723     LOGD("[RelationalSyncAbleStorage] allow logic delete %d", static_cast<int>(config.allowLogicDelete));
1724 }
1725 
IsCurrentLogicDelete() const1726 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1727 {
1728     return allowLogicDelete_ && logicDelete_;
1729 }
1730 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1731 std::pair<int, uint32_t> RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema,
1732     const std::string &gid, const Bytes &hashKey, VBucket &assets)
1733 {
1734     if (gid.empty() && hashKey.empty()) {
1735         LOGE("both gid and hashKey are empty.");
1736         return { -E_INVALID_ARGS, static_cast<uint32_t>(LockStatus::UNLOCK) };
1737     }
1738     if (transactionHandle_ == nullptr) {
1739         LOGE("the transaction has not been started");
1740         return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
1741     }
1742     auto [errCode, status] = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1743     if (errCode != E_OK && errCode != -E_NOT_FOUND && errCode != -E_CLOUD_GID_MISMATCH) {
1744         LOGE("get assets by gid or hashKey failed. %d", errCode);
1745     }
1746     return { errCode, status };
1747 }
1748 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1749 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1750 {
1751     int errCode = E_OK;
1752     auto *wHandle = GetHandle(true, errCode);
1753     if (wHandle == nullptr) {
1754         return errCode;
1755     }
1756     wHandle->SetIAssetLoader(loader);
1757     ReleaseHandle(wHandle);
1758     return errCode;
1759 }
1760 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1761 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1762     const std::vector<VBucket> &records)
1763 {
1764     int errCode = E_OK;
1765     auto *handle = GetHandle(true, errCode);
1766     if (errCode != E_OK) {
1767         return errCode;
1768     }
1769     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1770     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1771     errCode = UpsertDataInner(handle, tableName, records);
1772     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1773     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1774     ReleaseHandle(handle);
1775     return errCode;
1776 }
1777 
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1778 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1779     const std::string &tableName, const std::vector<VBucket> &records)
1780 {
1781     int errCode = E_OK;
1782     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1783     if (errCode != E_OK) {
1784         LOGE("[RDBStorageEngine] start transaction failed %d when upsert data", errCode);
1785         return errCode;
1786     }
1787     errCode = CreateTempSyncTriggerInner(handle, tableName);
1788     if (errCode == E_OK) {
1789         errCode = UpsertDataInTransaction(handle, tableName, records);
1790         (void) handle->ClearAllTempSyncTrigger();
1791     }
1792     if (errCode == E_OK) {
1793         errCode = handle->Commit();
1794         if (errCode != E_OK) {
1795             LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1796         }
1797     } else {
1798         int ret = handle->Rollback();
1799         if (ret != E_OK) {
1800             LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1801         }
1802     }
1803     return errCode;
1804 }
1805 
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1806 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1807     const std::string &tableName, const std::vector<VBucket> &records)
1808 {
1809     TableSchema tableSchema;
1810     int errCode = GetCloudTableSchema(tableName, tableSchema);
1811     if (errCode != E_OK) {
1812         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1813         return errCode;
1814     }
1815     TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1816     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1817     std::set<std::vector<uint8_t>> primaryKeys;
1818     DownloadData downloadData;
1819     for (const auto &record : records) {
1820         DataInfoWithLog dataInfoWithLog;
1821         VBucket assetInfo;
1822         auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1823             tableSchema, localTable, pkMap, false);
1824         if (errorCode != E_OK) {
1825             return errorCode;
1826         }
1827         errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1828         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1829             return errCode;
1830         }
1831         VBucket recordCopy = record;
1832         if ((errCode == -E_NOT_FOUND ||
1833             (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1834             primaryKeys.find(hashValue) == primaryKeys.end()) {
1835             downloadData.opType.push_back(OpType::INSERT);
1836             auto currentTime = TimeHelper::GetSysCurrentTime();
1837             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1838             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1839             primaryKeys.insert(hashValue);
1840         } else {
1841             downloadData.opType.push_back(OpType::UPDATE);
1842             recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1843             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1844             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1845             recordCopy[CloudDbConstant::SHARING_RESOURCE_FIELD] = dataInfoWithLog.logInfo.sharingResource;
1846             recordCopy[CloudDbConstant::VERSION_FIELD] = dataInfoWithLog.logInfo.version;
1847         }
1848         downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1849         downloadData.data.push_back(std::move(recordCopy));
1850     }
1851     return PutCloudSyncDataInner(handle, tableName, downloadData);
1852 }
1853 
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)1854 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, bool recordConflict,
1855     const LogInfo &logInfo)
1856 {
1857     if (transactionHandle_ == nullptr) {
1858         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1859         return -E_INVALID_DB;
1860     }
1861     std::string sql = CloudStorageUtils::GetUpdateRecordFlagSql(tableName, recordConflict, logInfo);
1862     return transactionHandle_->UpdateRecordFlag(tableName, sql, logInfo);
1863 }
1864 
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1865 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1866     OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1867 {
1868     TableSchema tableSchema;
1869     int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1870     if (errCode != E_OK) {
1871         LOGE("get table schema failed when fill log and asset. %d", errCode);
1872         return errCode;
1873     }
1874     errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1875     if (errCode != E_OK) {
1876         return errCode;
1877     }
1878     if (opType == OpType::INSERT) {
1879         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.insData, CloudWaterType::INSERT);
1880     } else if (opType == OpType::UPDATE) {
1881         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.updData, CloudWaterType::UPDATE);
1882     } else if (opType == OpType::DELETE) {
1883         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.delData, CloudWaterType::DELETE);
1884     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1885         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.lockData, CloudWaterType::BUTT, true);
1886     }
1887     return errCode;
1888 }
1889 
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const CloudSyncBatch & updateData,const CloudWaterType & type,bool isLock)1890 int RelationalSyncAbleStorage::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1891     const std::string &tableName, const CloudSyncBatch &updateData, const CloudWaterType &type, bool isLock)
1892 {
1893     if (updateData.timestamp.size() != updateData.extend.size()) {
1894         LOGE("the num of extend:%zu and timestamp:%zu is not equal.",
1895             updateData.extend.size(), updateData.timestamp.size());
1896         return -E_INVALID_ARGS;
1897     }
1898     for (size_t i = 0; i < updateData.extend.size(); ++i) {
1899         const auto &record = updateData.extend[i];
1900         if (DBCommon::IsRecordError(record) || DBCommon::IsRecordAssetsMissing(record) ||
1901             DBCommon::IsRecordVersionConflict(record) || isLock) {
1902             if (DBCommon::IsRecordAssetsMissing(record)) {
1903                 LOGI("[RDBStorage][UpdateRecordFlagAfterUpload] Record assets missing, skip update.");
1904             }
1905             int errCode = handle->UpdateRecordStatus(tableName, CloudDbConstant::TO_LOCAL_CHANGE,
1906                 updateData.hashKey[i]);
1907             if (errCode != E_OK) {
1908                 LOGE("[RDBStorage] Update record status failed in index %zu", i);
1909                 return errCode;
1910             }
1911             continue;
1912         }
1913         const auto &rowId = updateData.rowid[i];
1914         std::string cloudGid;
1915         (void)CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, record, cloudGid);
1916         LogInfo logInfo;
1917         logInfo.cloudGid = cloudGid;
1918         logInfo.timestamp = updateData.timestamp[i];
1919         logInfo.dataKey = rowId;
1920         logInfo.hashKey = updateData.hashKey[i];
1921         std::string sql = CloudStorageUtils::GetUpdateRecordFlagSqlUpload(tableName, DBCommon::IsRecordIgnored(record),
1922             logInfo, record, type);
1923         int errCode = handle->UpdateRecordFlag(tableName, sql, logInfo);
1924         if (errCode != E_OK) {
1925             LOGE("[RDBStorage] Update record flag failed in index %zu", i);
1926             return errCode;
1927         }
1928         handle->MarkFlagAsUploadFinished(tableName, updateData.hashKey[i], updateData.timestamp[i]);
1929         uploadRecorder_.RecordUploadRecord(tableName, logInfo.hashKey, type, updateData.timestamp[i]);
1930     }
1931     return E_OK;
1932 }
1933 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery,std::vector<std::string> & users)1934 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery,
1935     std::vector<std::string> &users)
1936 {
1937     std::vector<TableSchema> tables;
1938     int errCode = GetCloudTableWithoutShared(tables);
1939     if (errCode != E_OK) {
1940         return errCode;
1941     }
1942     if (tables.empty()) {
1943         LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1944         return E_OK;
1945     }
1946     auto *handle = GetHandle(true, errCode);
1947     if (errCode != E_OK) {
1948         return errCode;
1949     }
1950     errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery);
1951     ReleaseHandle(handle);
1952     return errCode;
1953 }
1954 
ClearUnLockingNoNeedCompensated()1955 int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated()
1956 {
1957     std::vector<TableSchema> tables;
1958     int errCode = GetCloudTableWithoutShared(tables);
1959     if (errCode != E_OK) {
1960         return errCode;
1961     }
1962     if (tables.empty()) {
1963         LOGI("[RDBStorage] Table is empty, no need to clear unlocking status");
1964         return E_OK;
1965     }
1966     auto *handle = GetHandle(true, errCode);
1967     if (errCode != E_OK) {
1968         return errCode;
1969     }
1970     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1971     if (errCode != E_OK) {
1972         ReleaseHandle(handle);
1973         return errCode;
1974     }
1975     for (const auto &table : tables) {
1976         errCode = handle->ClearUnLockingStatus(table.name);
1977         if (errCode != E_OK) {
1978             LOGW("[ClearUnLockingNoNeedCompensated] clear unlocking status failed, continue! errCode=%d", errCode);
1979         }
1980     }
1981     errCode = handle->Commit();
1982     if (errCode != E_OK) {
1983         LOGE("[ClearUnLockingNoNeedCompensated] commit failed %d when clear unlocking status", errCode);
1984     }
1985     ReleaseHandle(handle);
1986     return errCode;
1987 }
1988 
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1989 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1990 {
1991     const auto tableInfos = GetSchemaInfo().GetTables();
1992     for (const auto &[tableName, info] : tableInfos) {
1993         if (info.GetSharedTableMark()) {
1994             continue;
1995         }
1996         TableSchema schema;
1997         int errCode = GetCloudTableSchema(tableName, schema);
1998         if (errCode == -E_NOT_FOUND) {
1999             continue;
2000         }
2001         if (errCode != E_OK) {
2002             LOGW("[RDBStorage] Get cloud table failed %d", errCode);
2003             return errCode;
2004         }
2005         tables.push_back(schema);
2006     }
2007     return E_OK;
2008 }
2009 
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery)2010 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2011     const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery)
2012 {
2013     int errCode = E_OK;
2014     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2015     if (errCode != E_OK) {
2016         return errCode;
2017     }
2018     for (const auto &table : tables) {
2019         if (!CheckTableSupportCompensatedSync(table)) {
2020             continue;
2021         }
2022 
2023         std::vector<VBucket> syncDataPk;
2024         errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk);
2025         if (errCode != E_OK) {
2026             LOGW("[RDBStorageEngine] Get wait compensated sync date failed, continue! errCode=%d", errCode);
2027             errCode = E_OK;
2028             continue;
2029         }
2030         if (syncDataPk.empty()) {
2031             // no data need to compensated sync
2032             continue;
2033         }
2034         QuerySyncObject syncObject;
2035         errCode = CloudStorageUtils::GetSyncQueryByPk(table.name, syncDataPk, false, syncObject);
2036         if (errCode != E_OK) {
2037             LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
2038             errCode = E_OK;
2039             continue;
2040         }
2041         syncQuery.push_back(syncObject);
2042     }
2043     if (errCode == E_OK) {
2044         errCode = handle->Commit();
2045         if (errCode != E_OK) {
2046             LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
2047         }
2048     } else {
2049         int ret = handle->Rollback();
2050         if (ret != E_OK) {
2051             LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
2052         }
2053     }
2054     return errCode;
2055 }
2056 
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,bool flag)2057 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
2058     const std::string &tableName, bool flag)
2059 {
2060     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
2061     if (trackerTable.IsEmpty()) {
2062         trackerTable.SetTableName(tableName);
2063     }
2064     return handle->CreateTempSyncTrigger(trackerTable, flag);
2065 }
2066 
CheckTableSupportCompensatedSync(const TableSchema & table)2067 bool RelationalSyncAbleStorage::CheckTableSupportCompensatedSync(const TableSchema &table)
2068 {
2069     auto it = std::find_if(table.fields.begin(), table.fields.end(), [](const auto &field) {
2070         return field.primary && (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets> ||
2071             field.type == TYPE_INDEX<Bytes>);
2072     });
2073     if (it != table.fields.end()) {
2074         LOGI("[RDBStorageEngine] Table contain not support pk field type, ignored");
2075         return false;
2076     }
2077     // check whether reference exist
2078     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2079     int errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
2080     if (errCode != E_OK) {
2081         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2082         return false;
2083     }
2084     if (!tableReference.empty()) {
2085         LOGI("[RDBStorageEngine] current table exist reference property");
2086         return false;
2087     }
2088     return true;
2089 }
2090 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)2091 int RelationalSyncAbleStorage::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
2092     const std::set<std::string> &gidFilters)
2093 {
2094     if (transactionHandle_ == nullptr) {
2095         LOGE("the transaction has not been started");
2096         return -E_INVALID_DB;
2097     }
2098     int errCode = transactionHandle_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
2099     if (errCode != E_OK) {
2100         LOGE("[RelationalSyncAbleStorage] mark flag as consistent failed.%d", errCode);
2101     }
2102     return errCode;
2103 }
2104 
GetCloudSyncConfig() const2105 CloudSyncConfig RelationalSyncAbleStorage::GetCloudSyncConfig() const
2106 {
2107     std::lock_guard<std::mutex> autoLock(configMutex_);
2108     return cloudSyncConfig_;
2109 }
2110 
SetCloudSyncConfig(const CloudSyncConfig & config)2111 void RelationalSyncAbleStorage::SetCloudSyncConfig(const CloudSyncConfig &config)
2112 {
2113     std::lock_guard<std::mutex> autoLock(configMutex_);
2114     cloudSyncConfig_ = config;
2115 }
2116 
IsTableExistReference(const std::string & table)2117 bool RelationalSyncAbleStorage::IsTableExistReference(const std::string &table)
2118 {
2119     // check whether reference exist
2120     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
2121     int errCode = RelationalSyncAbleStorage::GetTableReference(table, tableReference);
2122     if (errCode != E_OK) {
2123         LOGW("[RDBStorageEngine] Get table reference failed! errCode = %d", errCode);
2124         return false;
2125     }
2126     return !tableReference.empty();
2127 }
2128 
IsTableExistReferenceOrReferenceBy(const std::string & table)2129 bool RelationalSyncAbleStorage::IsTableExistReferenceOrReferenceBy(const std::string &table)
2130 {
2131     // check whether reference or reference by exist
2132     if (storageEngine_ == nullptr) {
2133         LOGE("[IsTableExistReferenceOrReferenceBy] storage is null when get reference gid");
2134         return false;
2135     }
2136     RelationalSchemaObject schema = storageEngine_->GetSchema();
2137     auto referenceProperty = schema.GetReferenceProperty();
2138     if (referenceProperty.empty()) {
2139         return false;
2140     }
2141     auto [sourceTableName, errCode] = GetSourceTableName(table);
2142     if (errCode != E_OK) {
2143         return false;
2144     }
2145     for (const auto &property : referenceProperty) {
2146         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName) ||
2147             DBCommon::CaseInsensitiveCompare(property.targetTableName, sourceTableName)) {
2148             return true;
2149         }
2150     }
2151     return false;
2152 }
2153 
ReleaseUploadRecord(const std::string & tableName,const CloudWaterType & type,Timestamp localMark)2154 void RelationalSyncAbleStorage::ReleaseUploadRecord(const std::string &tableName, const CloudWaterType &type,
2155     Timestamp localMark)
2156 {
2157     uploadRecorder_.ReleaseUploadRecord(tableName, type, localMark);
2158 }
2159 }
2160 #endif