• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "cloud/cloud_db_constant.h"
21 #include "cloud/cloud_storage_utils.h"
22 #include "concurrent_adapter.h"
23 #include "data_compression.h"
24 #include "db_common.h"
25 #include "db_dfx_adapter.h"
26 #include "generic_single_ver_kv_entry.h"
27 #include "platform_specific.h"
28 #include "relational_remote_query_continue_token.h"
29 #include "relational_sync_data_inserter.h"
30 #include "res_finalizer.h"
31 #include "runtime_context.h"
32 #include "time_helper.h"
33 
34 namespace DistributedDB {
35 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)36 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
37 {
38     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
39     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
40         std::string(CLOSE_CONN_TASK),
41         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
42     );
43 }
44 }
45 
// Guard for member functions that return an int error code: leaves the enclosing function
// with -E_INVALID_DB when storageEngine_ has not been set.
#define CHECK_STORAGE_ENGINE \
    do { \
        if (storageEngine_ == nullptr) { \
            return -E_INVALID_DB; \
        } \
    } while (0)
51 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)52 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
53     : storageEngine_(std::move(engine)),
54       reusedHandle_(nullptr),
55       isCachedOption_(false)
56 {}
57 
~RelationalSyncAbleStorage()58 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
59 {
60     syncAbleEngine_ = nullptr;
61 }
62 
63 // Get interface type of this relational db.
GetInterfaceType() const64 int RelationalSyncAbleStorage::GetInterfaceType() const
65 {
66     return SYNC_RELATION;
67 }
68 
69 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()70 void RelationalSyncAbleStorage::IncRefCount()
71 {
72     LOGD("RelationalSyncAbleStorage ref +1");
73     IncObjRef(this);
74 }
75 
76 // Drop the interface ref-count.
DecRefCount()77 void RelationalSyncAbleStorage::DecRefCount()
78 {
79     LOGD("RelationalSyncAbleStorage ref -1");
80     DecObjRef(this);
81 }
82 
83 // Get the identifier of this rdb.
GetIdentifier() const84 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
85 {
86     std::string identifier = storageEngine_->GetIdentifier();
87     return std::vector<uint8_t>(identifier.begin(), identifier.end());
88 }
89 
GetDualTupleIdentifier() const90 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
91 {
92     std::string identifier = storageEngine_->GetProperties().GetStringProp(
93         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
94     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
95     return identifierVect;
96 }
97 
98 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const99 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
100 {
101     int errCode = E_OK;
102     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
103     if (handle == nullptr) {
104         return;
105     }
106     timestamp = 0;
107     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
108     if (errCode != E_OK) {
109         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
110         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
111     }
112     ReleaseHandle(handle);
113     return;
114 }
115 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const116 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
117 {
118     int errCode = E_OK;
119     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
120     if (handle == nullptr) {
121         return errCode;
122     }
123     timestamp = 0;
124     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
125     if (errCode != E_OK) {
126         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
127         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
128     }
129     ReleaseHandle(handle);
130     return errCode;
131 }
132 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const133 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
134     OperatePerm perm) const
135 {
136     if (storageEngine_ == nullptr) {
137         errCode = -E_INVALID_DB;
138         return nullptr;
139     }
140     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
141         storageEngine_->FindExecutor(isWrite, perm, errCode));
142     if (handle == nullptr) {
143         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
144     }
145     return handle;
146 }
147 
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const148 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
149     int &errCode, OperatePerm perm) const
150 {
151     if (storageEngine_ == nullptr) {
152         errCode = -E_INVALID_DB;
153         return nullptr;
154     }
155     if (transactionHandle_ != nullptr) {
156         return transactionHandle_;
157     }
158     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
159         storageEngine_->FindExecutor(isWrite, perm, errCode));
160     if (errCode != E_OK) {
161         ReleaseHandle(handle);
162         handle = nullptr;
163     }
164     return handle;
165 }
166 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const167 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
168 {
169     if (storageEngine_ == nullptr) {
170         return;
171     }
172     StorageExecutor *databaseHandle = handle;
173     storageEngine_->Recycle(databaseHandle);
174     std::function<void()> listener = nullptr;
175     {
176         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
177         listener = heartBeatListener_;
178     }
179     if (listener) {
180         listener();
181     }
182 }
183 
184 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const185 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
186 {
187     CHECK_STORAGE_ENGINE;
188     if (key.size() > DBConstant::MAX_KEY_SIZE) {
189         return -E_INVALID_ARGS;
190     }
191     int errCode = E_OK;
192     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
193     if (handle == nullptr) {
194         return errCode;
195     }
196     errCode = handle->GetKvData(key, value);
197     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
198         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
199     }
200     ReleaseHandle(handle);
201     return errCode;
202 }
203 
204 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)205 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
206 {
207     CHECK_STORAGE_ENGINE;
208     int errCode = E_OK;
209     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
210     if (handle == nullptr) {
211         return errCode;
212     }
213 
214     errCode = handle->PutKvData(key, value); // meta doesn't need time.
215     if (errCode != E_OK) {
216         LOGE("Put kv data err:%d", errCode);
217         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
218     }
219     ReleaseHandle(handle);
220     return errCode;
221 }
222 
PutMetaData(const Key & key,const Value & value,bool isInTransaction)223 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value, bool isInTransaction)
224 {
225     CHECK_STORAGE_ENGINE;
226     int errCode = E_OK;
227     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
228     std::unique_lock<std::mutex> handLock(reusedHandleMutex_, std::defer_lock);
229 
230     // try to recycle using the handle
231     if (isInTransaction) {
232         handLock.lock();
233         if (reusedHandle_ != nullptr) {
234             handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(reusedHandle_);
235         } else {
236             isInTransaction = false;
237             handLock.unlock();
238         }
239     }
240 
241     if (handle == nullptr) {
242         handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
243         if (handle == nullptr) {
244             return errCode;
245         }
246     }
247 
248     errCode = handle->PutKvData(key, value);
249     if (errCode != E_OK) {
250         LOGE("Put kv data err:%d", errCode);
251         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
252     }
253     if (!isInTransaction) {
254         ReleaseHandle(handle);
255     }
256     return errCode;
257 }
258 
259 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)260 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
261 {
262     CHECK_STORAGE_ENGINE;
263     for (const auto &key : keys) {
264         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
265             return -E_INVALID_ARGS;
266         }
267     }
268     int errCode = E_OK;
269     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
270     if (handle == nullptr) {
271         return errCode;
272     }
273 
274     handle->StartTransaction(TransactType::IMMEDIATE);
275     errCode = handle->DeleteMetaData(keys);
276     if (errCode != E_OK) {
277         handle->Rollback();
278         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
279         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
280     } else {
281         handle->Commit();
282     }
283     ReleaseHandle(handle);
284     return errCode;
285 }
286 
287 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const288 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
289 {
290     CHECK_STORAGE_ENGINE;
291     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
292         return -E_INVALID_ARGS;
293     }
294 
295     int errCode = E_OK;
296     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
297     if (handle == nullptr) {
298         return errCode;
299     }
300 
301     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
302     if (errCode != E_OK) {
303         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
304         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
305     }
306     ReleaseHandle(handle);
307     return errCode;
308 }
309 
310 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const311 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
312 {
313     CHECK_STORAGE_ENGINE;
314     int errCode = E_OK;
315     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
316     if (handle == nullptr) {
317         return errCode;
318     }
319 
320     errCode = handle->GetAllMetaKeys(keys);
321     if (errCode != E_OK) {
322         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
323     }
324     ReleaseHandle(handle);
325     return errCode;
326 }
327 
GetDbProperties() const328 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
329 {
330     return storageEngine_->GetProperties();
331 }
332 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)333 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
334 {
335     int errCode = E_OK;
336     for (auto &item : dataItems) {
337         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
338         if (entry == nullptr) {
339             errCode = -E_OUT_OF_MEMORY;
340             LOGE("GetKvEntries failed, errCode:%d", errCode);
341             SingleVerKvEntry::Release(entries);
342             break;
343         }
344         entry->SetEntryData(std::move(item));
345         entries.push_back(entry);
346     }
347     return errCode;
348 }
349 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)350 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
351 {
352     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
353     // the size would not be very large.
354     static const size_t maxOrigDevLength = 40;
355     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
356     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
357                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
358     return dataSize;
359 }
360 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)361 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
362     size_t appendLen)
363 {
364     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
365     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
366         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
367         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
368     }
369     return !reachThreshold;
370 }
371 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)372 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
373     SQLiteSingleVerRelationalContinueToken *&token)
374 {
375     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
376         delete token;
377         token = nullptr;
378         return;
379     }
380 
381     if (dataItems.empty()) {
382         errCode = -E_INTERNAL_ERROR;
383         LOGE("Get data unfinished but data items is empty.");
384         delete token;
385         token = nullptr;
386         return;
387     }
388     token->SetNextBeginTime(dataItems.back());
389     token->UpdateNextSyncOffset(dataItems.size());
390 }
391 
392 /**
393  * Caller must ensure that parameter token is valid.
394  * If error happened, token will be deleted here.
395  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const396 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
397     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
398 {
399     if (storageEngine_ == nullptr) {
400         return -E_INVALID_DB;
401     }
402 
403     int errCode = E_OK;
404     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
405         OperatePerm::NORMAL_PERM, errCode));
406     if (handle == nullptr) {
407         goto ERROR;
408     }
409 
410     do {
411         errCode = handle->GetSyncDataByQuery(dataItems,
412             Parcel::GetAppendedLen(),
413             dataSizeInfo,
414             std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
415                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
416             storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
417         if (errCode == -E_FINISHED) {
418             token->FinishGetData();
419             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
420         }
421     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
422 
423 ERROR:
424     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
425         dataItems.clear();
426     }
427     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
428     ReleaseHandle(handle);
429     return errCode;
430 }
431 
432 // use kv struct data to sync
433 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const434 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
435     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
436     std::vector<SingleVerKvEntry *> &entries) const
437 {
438     if (!timeRange.IsValid()) {
439         return -E_INVALID_ARGS;
440     }
441     query.SetSchema(storageEngine_->GetSchema());
442     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
443     if (token == nullptr) {
444         LOGE("[SingleVerNStore] Allocate continue token failed.");
445         return -E_OUT_OF_MEMORY;
446     }
447 
448     continueStmtToken = static_cast<ContinueToken>(token);
449     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
450 }
451 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const452 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
453     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
454 {
455     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
456     if (!token->CheckValid()) {
457         return -E_INVALID_ARGS;
458     }
459     RelationalSchemaObject schema = storageEngine_->GetSchema();
460     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
461     std::vector<std::string> fieldNames;
462     fieldNames.reserve(fieldInfos.size());
463     for (const auto &fieldInfo : fieldInfos) { // order by cid
464         fieldNames.push_back(fieldInfo.GetFieldName());
465     }
466     token->SetFieldNames(fieldNames);
467 
468     std::vector<DataItem> dataItems;
469     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
470     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
471         continueStmtToken = static_cast<ContinueToken>(token);
472         return errCode;
473     }
474 
475     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
476     if (innerCode != E_OK) {
477         errCode = innerCode;
478         delete token;
479         token = nullptr;
480     }
481     continueStmtToken = static_cast<ContinueToken>(token);
482     return errCode;
483 }
484 
485 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)486 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
487 {
488     std::vector<DataItem> dataItems;
489     for (const auto &itemEntry : entries) {
490         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
491         if (entry != nullptr) {
492             DataItem item;
493             item.origDev = entry->GetOrigDevice();
494             item.flag = entry->GetFlag();
495             item.timestamp = entry->GetTimestamp();
496             item.writeTimestamp = entry->GetWriteTimestamp();
497             entry->GetKey(item.key);
498             entry->GetValue(item.value);
499             entry->GetHashKey(item.hashKey);
500             dataItems.push_back(item);
501         }
502     }
503     return dataItems;
504 }
505 }
506 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)507 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
508     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
509 {
510     std::vector<DataItem> dataItems = ConvertEntries(entries);
511     return PutSyncData(object, dataItems, deviceName);
512 }
513 
514 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)515 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
516 {
517     return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
518         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
519 }
520 
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)521 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
522 {
523     return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
524 }
525 }
526 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)527 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
528     const std::string &deviceName)
529 {
530     int errCode = E_OK;
531     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
532     QueryObject query = object;
533     query.SetSchema(storageEngine_->GetSchema());
534 
535     RelationalSchemaObject remoteSchema;
536     errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
537     if (errCode != E_OK) {
538         LOGE("Find remote schema failed. err=%d", errCode);
539         return errCode;
540     }
541 
542     StoreInfo info = GetStoreInfo();
543     auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
544         remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
545     inserter.SetEntries(dataItems);
546 
547     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
548     if (handle == nullptr) {
549         return errCode;
550     }
551 
552     DBDfxAdapter::StartTracing();
553 
554     errCode = handle->SaveSyncItems(inserter);
555 
556     DBDfxAdapter::FinishTracing();
557     if (errCode == E_OK) {
558         // dataItems size > 0 now because already check before
559         // all dataItems will write into db now, so need to observer notify here
560         // if some dataItems will not write into db in the future, observer notify here need change
561         ChangedData data;
562         TriggerObserverAction(deviceName, std::move(data), false);
563     }
564 
565     ReleaseHandle(handle);
566     return errCode;
567 }
568 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)569 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
570     const std::string &deviceName)
571 {
572     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
573         LOGW("Device length is invalid for sync put");
574         return -E_INVALID_ARGS;
575     }
576 
577     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
578     if (errCode != E_OK) {
579         LOGE("[Relational] PutSyncData errCode:%d", errCode);
580         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
581     }
582     return errCode;
583 }
584 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)585 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
586 {
587     (void) deviceName;
588     (void) isNeedNotify;
589     return -E_NOT_SUPPORT;
590 }
591 
GetSchemaInfo() const592 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
593 {
594     return storageEngine_->GetSchema();
595 }
596 
GetSecurityOption(SecurityOption & option) const597 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
598 {
599     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
600     if (isCachedOption_) {
601         option = securityOption_;
602         return E_OK;
603     }
604     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
605     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
606     if (errCode == E_OK) {
607         option = securityOption_;
608         isCachedOption_ = true;
609     }
610     return errCode;
611 }
612 
NotifyRemotePushFinished(const std::string & deviceId) const613 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
614 {
615     return;
616 }
617 
618 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const619 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
620 {
621     return OS::GetCurrentSysTimeInMicrosecond(outTime);
622 }
623 
GetTablesQuery()624 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
625 {
626     auto tableNames = storageEngine_->GetSchema().GetTableNames();
627     std::vector<QuerySyncObject> queries;
628     queries.reserve(tableNames.size());
629     for (const auto &it : tableNames) {
630         queries.emplace_back(Query::Select(it));
631     }
632     return queries;
633 }
634 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)635 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
636 {
637     (void) queryObj;
638     return -E_NOT_SUPPORT;
639 }
640 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const641 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
642     const std::string &targetID) const
643 {
644     return E_OK;
645 }
646 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)647 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
648     const RelationalSyncStrategy &syncStrategy)
649 {
650     auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
651         DistributedTableMode::SPLIT_BY_DEVICE);
652     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
653         LOGD("No need create device table in COLLABORATION mode.");
654         return E_OK;
655     }
656 
657     int errCode = E_OK;
658     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
659     if (handle == nullptr) {
660         return errCode;
661     }
662 
663     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
664     if (errCode != E_OK) {
665         LOGE("Start transaction failed:%d", errCode);
666         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
667         ReleaseHandle(handle);
668         return errCode;
669     }
670 
671     StoreInfo info = GetStoreInfo();
672     for (const auto &[table, strategy] : syncStrategy) {
673         if (!strategy.permitSync) {
674             continue;
675         }
676 
677         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
678         if (errCode != E_OK) {
679             LOGE("Create distributed device table failed. %d", errCode);
680             break;
681         }
682     }
683 
684     if (errCode == E_OK) {
685         errCode = handle->Commit();
686     } else {
687         (void)handle->Rollback();
688     }
689 
690     ReleaseHandle(handle);
691     return errCode;
692 }
693 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)694 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
695 {
696     std::lock_guard lock(onSchemaChangedMutex_);
697     onSchemaChanged_ = callback;
698     return E_OK;
699 }
700 
NotifySchemaChanged()701 void RelationalSyncAbleStorage::NotifySchemaChanged()
702 {
703     std::lock_guard lock(onSchemaChangedMutex_);
704     if (onSchemaChanged_) {
705         LOGD("Notify relational schema was changed");
706         onSchemaChanged_();
707     }
708 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const709 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
710 {
711     algorithmSet.clear();
712     DataCompression::GetCompressionAlgo(algorithmSet);
713     return E_OK;
714 }
715 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)716 int RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
717     const RelationalObserverAction &action)
718 {
719     int errCode = E_OK;
720     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, action, &errCode] () mutable {
721         ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
722         auto it = dataChangeCallbackMap_.find(connectionId);
723         if (it != dataChangeCallbackMap_.end()) {
724             if (it->second.find(observer) != it->second.end()) {
725                 LOGE("obsever already registered");
726                 errCode = -E_ALREADY_SET;
727                 return;
728             }
729             if (it->second.size() >= DBConstant::MAX_OBSERVER_COUNT) {
730                 LOGE("The number of relational observers has been over limit");
731                 errCode = -E_MAX_LIMITS;
732                 return;
733             }
734             it->second[observer] = action;
735         } else {
736             dataChangeCallbackMap_[connectionId][observer] = action;
737         }
738         LOGI("register relational observer ok");
739     }, nullptr, &dataChangeCallbackMap_);
740     ADAPTER_WAIT(handle);
741     return errCode;
742 }
743 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)744 int RelationalSyncAbleStorage::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
745 {
746     if (observer == nullptr) {
747         EraseDataChangeCallback(connectionId);
748         return E_OK;
749     }
750     int errCode = -E_NOT_FOUND;
751     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId, observer, &errCode] () mutable {
752         ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
753         auto it = dataChangeCallbackMap_.find(connectionId);
754         if (it == dataChangeCallbackMap_.end()) {
755             return;
756         }
757         auto action = it->second.find(observer);
758         if (action != it->second.end()) {
759             it->second.erase(action);
760             LOGI("unregister relational observer.");
761             if (it->second.empty()) {
762                 dataChangeCallbackMap_.erase(it);
763                 LOGI("observer for this delegate is zero now");
764             }
765             errCode = E_OK;
766         }
767     }, nullptr, &dataChangeCallbackMap_);
768     ADAPTER_WAIT(handle);
769     return errCode;
770 }
771 
ExecuteDataChangeCallback(const std::pair<uint64_t,std::map<const StoreObserver *,RelationalObserverAction>> & item,int & observerCnt,const std::string & deviceName,const ChangedData & changedData,bool isChangedData)772 void RelationalSyncAbleStorage::ExecuteDataChangeCallback(
773     const std::pair<uint64_t, std::map<const StoreObserver *, RelationalObserverAction>> &item, int &observerCnt,
774     const std::string &deviceName, const ChangedData &changedData, bool isChangedData)
775 {
776     for (auto &action : item.second) {
777         if (action.second == nullptr) {
778             continue;
779         }
780         observerCnt++;
781         ChangedData observerChangeData = changedData;
782         if (action.first != nullptr) {
783             FilterChangeDataByDetailsType(observerChangeData, action.first->GetCallbackDetailsType());
784         }
785         action.second(deviceName, std::move(observerChangeData), isChangedData);
786     }
787 }
788 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)789 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
790     ChangedData &&changedData, bool isChangedData)
791 {
792     IncObjRef(this);
793     int taskErrCode =
794         ConcurrentAdapter::ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
795             LOGD("begin to trigger relational observer.");
796             int observerCnt = 0;
797             ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
798             for (const auto &item : dataChangeCallbackMap_) {
799                 ExecuteDataChangeCallback(item, observerCnt, deviceName, changedData, isChangedData);
800             }
801             LOGD("relational observer size = %d", observerCnt);
802             DecObjRef(this);
803         }, &dataChangeCallbackMap_);
804     if (taskErrCode != E_OK) {
805         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
806         DecObjRef(this);
807     }
808 }
809 
RegisterHeartBeatListener(const std::function<void ()> & listener)810 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
811 {
812     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
813     heartBeatListener_ = listener;
814 }
815 
CheckAndInitQueryCondition(QueryObject & query) const816 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
817 {
818     RelationalSchemaObject schema = storageEngine_->GetSchema();
819     TableInfo table = schema.GetTable(query.GetTableName());
820     if (!table.IsValid()) {
821         LOGE("Query table is not a distributed table.");
822         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
823     }
824     if (table.GetTableSyncType() == CLOUD_COOPERATION) {
825         LOGE("cloud table mode is not support");
826         return -E_NOT_SUPPORT;
827     }
828     query.SetSchema(schema);
829 
830     int errCode = E_OK;
831     auto *handle = GetHandle(true, errCode);
832     if (handle == nullptr) {
833         return errCode;
834     }
835 
836     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
837     if (errCode != E_OK) {
838         LOGE("Check relational query condition failed. %d", errCode);
839         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
840     }
841 
842     ReleaseHandle(handle);
843     return errCode;
844 }
845 
CheckCompatible(const std::string & schema,uint8_t type) const846 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
847 {
848     // return true if is relational schema.
849     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
850 }
851 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const852 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
853     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
854 {
855     if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
856         return -E_NOT_SUPPORT;
857     }
858     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
859         LOGE("[ExecuteQuery] invalid args");
860         return -E_INVALID_ARGS;
861     }
862     int errCode = E_OK;
863     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
864     if (handle == nullptr) {
865         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
866         return errCode;
867     }
868     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
869     if (errCode != E_OK) {
870         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
871     }
872     ReleaseHandle(handle);
873     return errCode;
874 }
875 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const876 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
877     RelationalRowDataSet &dataSet, ContinueToken &token) const
878 {
879     dataSet.Clear();
880     if (token == nullptr) {
881         // start query
882         std::vector<std::string> colNames;
883         std::vector<RelationalRowData *> data;
884         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
885 
886         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
887         if (errCode != E_OK) {
888             return errCode;
889         }
890 
891         // create one token
892         token = static_cast<ContinueToken>(
893             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
894         if (token == nullptr) {
895             LOGE("ExecuteQuery OOM");
896             return -E_OUT_OF_MEMORY;
897         }
898     }
899 
900     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
901     if (!remoteToken->CheckValid()) {
902         LOGE("ExecuteQuery invalid token");
903         return -E_INVALID_ARGS;
904     }
905 
906     int errCode = remoteToken->GetData(packetSize, dataSet);
907     if (errCode == -E_UNFINISHED) {
908         errCode = E_OK;
909     } else {
910         if (errCode != E_OK) {
911             dataSet.Clear();
912         }
913         delete remoteToken;
914         remoteToken = nullptr;
915         token = nullptr;
916     }
917     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
918     return errCode;
919 }
920 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)921 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
922     uint8_t type)
923 {
924     if (ReadSchemaType(type) != SchemaType::RELATIVE) {
925         return -E_INVALID_ARGS;
926     }
927 
928     RelationalSchemaObject schemaObj;
929     int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
930     if (errCode != E_OK) {
931         LOGE("Parse remote schema failed. err=%d", errCode);
932         return errCode;
933     }
934 
935     std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
936     Key remoteSchemaKey(keyStr.begin(), keyStr.end());
937     Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
938     errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
939     if (errCode != E_OK) {
940         LOGE("Save remote schema failed. err=%d", errCode);
941         return errCode;
942     }
943 
944     return remoteDeviceSchema_.Put(deviceId, remoteSchema);
945 }
946 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)947 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
948 {
949     if (schemaObj.IsSchemaValid()) {
950         return -E_INVALID_ARGS;
951     }
952 
953     std::string remoteSchema;
954     int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
955     if (errCode == -E_NOT_FOUND) {
956         LOGW("Get remote device schema miss cached.");
957         std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
958         Key remoteSchemaKey(keyStr.begin(), keyStr.end());
959         Value remoteSchemaBuff;
960         errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
961         if (errCode != E_OK) {
962             LOGE("Get remote device schema from meta failed. err=%d", errCode);
963             return errCode;
964         }
965         remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
966         errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
967     }
968 
969     if (errCode != E_OK) {
970         LOGE("Get remote device schema failed. err=%d", errCode);
971         return errCode;
972     }
973 
974     errCode = schemaObj.ParseFromSchemaString(remoteSchema);
975     if (errCode != E_OK) {
976         LOGE("Parse remote schema failed. err=%d", errCode);
977     }
978     return errCode;
979 }
980 
SetReusedHandle(StorageExecutor * handle)981 void RelationalSyncAbleStorage::SetReusedHandle(StorageExecutor *handle)
982 {
983     std::lock_guard<std::mutex> autoLock(reusedHandleMutex_);
984     reusedHandle_ = handle;
985 }
986 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const987 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
988 {
989     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
990     delete remoteToken;
991     remoteToken = nullptr;
992     token = nullptr;
993 }
994 
GetStoreInfo() const995 StoreInfo RelationalSyncAbleStorage::GetStoreInfo() const
996 {
997     StoreInfo info = {
998         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
999         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
1000         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
1001     };
1002     return info;
1003 }
1004 
StartTransaction(TransactType type)1005 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
1006 {
1007     CHECK_STORAGE_ENGINE;
1008     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1009     if (transactionHandle_ != nullptr) {
1010         LOGD("Transaction started already.");
1011         return -E_TRANSACT_STATE;
1012     }
1013     int errCode = E_OK;
1014     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1015         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
1016     if (handle == nullptr) {
1017         ReleaseHandle(handle);
1018         return errCode;
1019     }
1020     errCode = handle->StartTransaction(type);
1021     if (errCode != E_OK) {
1022         ReleaseHandle(handle);
1023         return errCode;
1024     }
1025     transactionHandle_ = handle;
1026     return errCode;
1027 }
1028 
Commit()1029 int RelationalSyncAbleStorage::Commit()
1030 {
1031     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1032     if (transactionHandle_ == nullptr) {
1033         LOGE("relation database is null or the transaction has not been started");
1034         return -E_INVALID_DB;
1035     }
1036     int errCode = transactionHandle_->Commit();
1037     ReleaseHandle(transactionHandle_);
1038     transactionHandle_ = nullptr;
1039     LOGD("connection commit transaction!");
1040     return errCode;
1041 }
1042 
Rollback()1043 int RelationalSyncAbleStorage::Rollback()
1044 {
1045     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
1046     if (transactionHandle_ == nullptr) {
1047         LOGE("Invalid handle for rollback or the transaction has not been started.");
1048         return -E_INVALID_DB;
1049     }
1050 
1051     int errCode = transactionHandle_->Rollback();
1052     ReleaseHandle(transactionHandle_);
1053     transactionHandle_ = nullptr;
1054     LOGI("connection rollback transaction!");
1055     return errCode;
1056 }
1057 
GetUploadCount(const QuerySyncObject & query,const Timestamp & timestamp,bool isCloudForcePush,int64_t & count)1058 int RelationalSyncAbleStorage::GetUploadCount(const QuerySyncObject &query, const Timestamp &timestamp,
1059     bool isCloudForcePush, int64_t &count)
1060 {
1061     int errCode = E_OK;
1062     auto *handle = GetHandleExpectTransaction(false, errCode);
1063     if (handle == nullptr) {
1064         return errCode;
1065     }
1066     QuerySyncObject queryObj = query;
1067     queryObj.SetSchema(GetSchemaInfo());
1068     errCode = handle->GetUploadCount(timestamp, isCloudForcePush, queryObj, count);
1069     if (transactionHandle_ == nullptr) {
1070         ReleaseHandle(handle);
1071     }
1072     return errCode;
1073 }
1074 
GetCloudData(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1075 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1076     const Timestamp &beginTime, ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
1077 {
1078     if (transactionHandle_ == nullptr) {
1079         LOGE("the transaction has not been started");
1080         return -E_INVALID_DB;
1081     }
1082     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1083     QuerySyncObject query = querySyncObject;
1084     query.SetSchema(GetSchemaInfo());
1085     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, query);
1086     if (token == nullptr) {
1087         LOGE("[SingleVerNStore] Allocate continue token failed.");
1088         return -E_OUT_OF_MEMORY;
1089     }
1090     token->SetCloudTableSchema(tableSchema);
1091     continueStmtToken = static_cast<ContinueToken>(token);
1092     return GetCloudDataNext(continueStmtToken, cloudDataResult);
1093 }
1094 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1095 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1096     CloudSyncData &cloudDataResult)
1097 {
1098     if (continueStmtToken == nullptr) {
1099         return -E_INVALID_ARGS;
1100     }
1101     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1102     if (!token->CheckValid()) {
1103         return -E_INVALID_ARGS;
1104     }
1105     if (transactionHandle_ == nullptr) {
1106         LOGE("the transaction has not been started, release the token");
1107         ReleaseCloudDataToken(continueStmtToken);
1108         return -E_INVALID_DB;
1109     }
1110     cloudDataResult.isShared = IsSharedTable(cloudDataResult.tableName);
1111     int errCode = transactionHandle_->GetSyncCloudData(cloudDataResult, CloudDbConstant::MAX_UPLOAD_SIZE, *token);
1112     if (errCode != -E_UNFINISHED) {
1113         delete token;
1114         token = nullptr;
1115     }
1116     continueStmtToken = static_cast<ContinueToken>(token);
1117     if (errCode != E_OK && errCode != -E_UNFINISHED) {
1118         return errCode;
1119     }
1120     int fillRefGidCode = FillReferenceData(cloudDataResult);
1121     return fillRefGidCode == E_OK ? errCode : fillRefGidCode;
1122 }
1123 
GetCloudGid(const TableSchema & tableSchema,const QuerySyncObject & querySyncObject,bool isCloudForcePush,std::vector<std::string> & cloudGid)1124 int RelationalSyncAbleStorage::GetCloudGid(const TableSchema &tableSchema, const QuerySyncObject &querySyncObject,
1125     bool isCloudForcePush, std::vector<std::string> &cloudGid)
1126 {
1127     int errCode = E_OK;
1128     auto *handle = GetHandle(false, errCode);
1129     if (handle == nullptr) {
1130         return errCode;
1131     }
1132     Timestamp beginTime = 0u;
1133     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
1134     QuerySyncObject query = querySyncObject;
1135     query.SetSchema(GetSchemaInfo());
1136     errCode = handle->GetSyncCloudGid(query, syncTimeRange, isCloudForcePush, cloudGid);
1137     ReleaseHandle(handle);
1138     if (errCode != E_OK) {
1139         LOGE("[RelationalSyncAbleStorage] GetCloudGid failed %d", errCode);
1140     }
1141     return errCode;
1142 }
1143 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1144 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1145 {
1146     if (continueStmtToken == nullptr) {
1147         return E_OK;
1148     }
1149     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1150     if (!token->CheckValid()) {
1151         return E_OK;
1152     }
1153     int errCode = token->ReleaseCloudStatement();
1154     delete token;
1155     token = nullptr;
1156     return errCode;
1157 }
1158 
ChkSchema(const TableName & tableName)1159 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1160 {
1161     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1162     RelationalSchemaObject localSchema = GetSchemaInfo();
1163     return schemaMgr_.ChkSchema(tableName, localSchema);
1164 }
1165 
SetCloudDbSchema(const DataBaseSchema & schema)1166 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1167 {
1168     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1169     schemaMgr_.SetCloudDbSchema(schema);
1170     return E_OK;
1171 }
1172 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1173 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1174     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1175 {
1176     if (transactionHandle_ == nullptr) {
1177         LOGE(" the transaction has not been started");
1178         return -E_INVALID_DB;
1179     }
1180 
1181     return GetInfoByPrimaryKeyOrGidInner(transactionHandle_, tableName, vBucket, dataInfoWithLog, assetInfo);
1182 }
1183 
GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1184 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGidInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1185     const std::string &tableName, const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1186 {
1187     TableSchema tableSchema;
1188     int errCode = GetCloudTableSchema(tableName, tableSchema);
1189     if (errCode != E_OK) {
1190         LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1191         return errCode;
1192     }
1193     RelationalSchemaObject localSchema = GetSchemaInfo();
1194     handle->SetLocalSchema(localSchema);
1195     return handle->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1196 }
1197 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1198 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1199 {
1200     if (transactionHandle_ == nullptr) {
1201         LOGE(" the transaction has not been started");
1202         return -E_INVALID_DB;
1203     }
1204     return PutCloudSyncDataInner(transactionHandle_, tableName, downloadData);
1205 }
1206 
PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,DownloadData & downloadData)1207 int RelationalSyncAbleStorage::PutCloudSyncDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1208     const std::string &tableName, DownloadData &downloadData)
1209 {
1210     TableSchema tableSchema;
1211     int errCode = GetCloudTableSchema(tableName, tableSchema);
1212     if (errCode != E_OK) {
1213         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1214         return errCode;
1215     }
1216     RelationalSchemaObject localSchema = GetSchemaInfo();
1217     handle->SetLocalSchema(localSchema);
1218     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1219     handle->SetLogicDelete(IsCurrentLogicDelete());
1220     errCode = handle->PutCloudSyncData(tableName, tableSchema, trackerTable, downloadData);
1221     handle->SetLogicDelete(false);
1222     return errCode;
1223 }
1224 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)1225 int RelationalSyncAbleStorage::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
1226 {
1227     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1228     cloudSchema = schemaMgr_.GetCloudDbSchema();
1229     return E_OK;
1230 }
1231 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1232 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1233     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1234 {
1235     if (transactionHandle_ == nullptr) {
1236         LOGE("the transaction has not been started");
1237         return -E_INVALID_DB;
1238     }
1239     return transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets);
1240 }
1241 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1242 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1243 {
1244     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1245     return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1246 }
1247 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1248 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1249     bool isDownloadSuccess)
1250 {
1251     if (storageEngine_ == nullptr) {
1252         return -E_INVALID_DB;
1253     }
1254     if (transactionHandle_ == nullptr) {
1255         LOGE("the transaction has not been started when fill asset for download.");
1256         return -E_INVALID_DB;
1257     }
1258     TableSchema tableSchema;
1259     int errCode = GetCloudTableSchema(tableName, tableSchema);
1260     if (errCode != E_OK) {
1261         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1262         return errCode;
1263     }
1264     errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1265     if (errCode != E_OK) {
1266         LOGE("fill cloud asset for download failed.%d", errCode);
1267     }
1268     return errCode;
1269 }
1270 
SetLogTriggerStatus(bool status)1271 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1272 {
1273     int errCode = E_OK;
1274     auto *handle = GetHandleExpectTransaction(false, errCode);
1275     if (handle == nullptr) {
1276         return errCode;
1277     }
1278     errCode = handle->SetLogTriggerStatus(status);
1279     if (transactionHandle_ == nullptr) {
1280         ReleaseHandle(handle);
1281     }
1282     return errCode;
1283 }
1284 
FillCloudLogAndAsset(const OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1285 int RelationalSyncAbleStorage::FillCloudLogAndAsset(const OpType opType, const CloudSyncData &data, bool fillAsset,
1286     bool ignoreEmptyGid)
1287 {
1288     CHECK_STORAGE_ENGINE;
1289     if (opType == OpType::UPDATE && data.updData.assets.empty()) {
1290         return E_OK;
1291     }
1292     int errCode = E_OK;
1293     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1294         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1295     if (writeHandle == nullptr) {
1296         return errCode;
1297     }
1298     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1299     if (errCode != E_OK) {
1300         ReleaseHandle(writeHandle);
1301         return errCode;
1302     }
1303     errCode = FillCloudLogAndAssetInner(writeHandle, opType, data, fillAsset, ignoreEmptyGid);
1304     if (errCode != E_OK) {
1305         LOGE("Failed to fill version or cloud asset, opType:%d ret:%d.", opType, errCode);
1306         writeHandle->Rollback();
1307         ReleaseHandle(writeHandle);
1308         return errCode;
1309     }
1310     errCode = writeHandle->Commit();
1311     ReleaseHandle(writeHandle);
1312     return errCode;
1313 }
1314 
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1315 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1316 {
1317     syncAbleEngine_ = syncAbleEngine;
1318 }
1319 
GetIdentify() const1320 std::string RelationalSyncAbleStorage::GetIdentify() const
1321 {
1322     if (storageEngine_ == nullptr) {
1323         LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1324         return "";
1325     }
1326     return storageEngine_->GetIdentifier();
1327 }
1328 
EraseDataChangeCallback(uint64_t connectionId)1329 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1330 {
1331     TaskHandle handle = ConcurrentAdapter::ScheduleTaskH([this, connectionId] () mutable {
1332         ADAPTER_AUTO_LOCK(lock, dataChangeDeviceMutex_);
1333         auto it = dataChangeCallbackMap_.find(connectionId);
1334         if (it != dataChangeCallbackMap_.end()) {
1335             dataChangeCallbackMap_.erase(it);
1336             LOGI("erase all observer for this delegate.");
1337         }
1338     }, nullptr, &dataChangeCallbackMap_);
1339     ADAPTER_WAIT(handle);
1340 }
1341 
ReleaseContinueToken(ContinueToken & continueStmtToken) const1342 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1343 {
1344     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1345     if (token == nullptr || !(token->CheckValid())) {
1346         LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1347         return;
1348     }
1349     delete token;
1350     continueStmtToken = nullptr;
1351 }
1352 
GetCloudDataGid(const QuerySyncObject & query,Timestamp beginTime,std::vector<std::string> & gid)1353 int RelationalSyncAbleStorage::GetCloudDataGid(const QuerySyncObject &query, Timestamp beginTime,
1354     std::vector<std::string> &gid)
1355 {
1356     return E_OK;
1357 }
1358 
CheckQueryValid(const QuerySyncObject & query)1359 int RelationalSyncAbleStorage::CheckQueryValid(const QuerySyncObject &query)
1360 {
1361     int errCode = E_OK;
1362     auto *handle = GetHandle(false, errCode);
1363     if (handle == nullptr) {
1364         return errCode;
1365     }
1366     errCode = handle->CheckQueryObjectLegal(query);
1367     if (errCode != E_OK) {
1368         ReleaseHandle(handle);
1369         return errCode;
1370     }
1371     QuerySyncObject queryObj = query;
1372     queryObj.SetSchema(GetSchemaInfo());
1373     int64_t count = 0;
1374     errCode = handle->GetUploadCount(UINT64_MAX, false, queryObj, count);
1375     ReleaseHandle(handle);
1376     if (errCode != E_OK) {
1377         LOGE("[RelationalSyncAbleStorage] CheckQueryValid failed %d", errCode);
1378         return -E_INVALID_ARGS;
1379     }
1380     return errCode;
1381 }
1382 
CreateTempSyncTrigger(const std::string & tableName)1383 int RelationalSyncAbleStorage::CreateTempSyncTrigger(const std::string &tableName)
1384 {
1385     int errCode = E_OK;
1386     auto *handle = GetHandle(true, errCode);
1387     if (handle == nullptr) {
1388         return errCode;
1389     }
1390     errCode = CreateTempSyncTriggerInner(handle, tableName);
1391     ReleaseHandle(handle);
1392     if (errCode != E_OK) {
1393         LOGE("[RelationalSyncAbleStorage] Create temp sync trigger failed %d", errCode);
1394     }
1395     return errCode;
1396 }
1397 
GetAndResetServerObserverData(const std::string & tableName,ChangeProperties & changeProperties)1398 int RelationalSyncAbleStorage::GetAndResetServerObserverData(const std::string &tableName,
1399     ChangeProperties &changeProperties)
1400 {
1401     int errCode = E_OK;
1402     auto *handle = GetHandle(false, errCode);
1403     if (handle == nullptr) {
1404         return errCode;
1405     }
1406     errCode = handle->GetAndResetServerObserverData(tableName, changeProperties);
1407     ReleaseHandle(handle);
1408     if (errCode != E_OK) {
1409         LOGE("[RelationalSyncAbleStorage] get server observer data failed %d", errCode);
1410     }
1411     return errCode;
1412 }
1413 
FilterChangeDataByDetailsType(ChangedData & changedData,uint32_t type)1414 void RelationalSyncAbleStorage::FilterChangeDataByDetailsType(ChangedData &changedData, uint32_t type)
1415 {
1416     if ((type & static_cast<uint32_t>(CallbackDetailsType::DEFAULT)) == 0) {
1417         changedData.field = {};
1418         for (size_t i = ChangeType::OP_INSERT; i < ChangeType::OP_BUTT; ++i) {
1419             changedData.primaryData[i].clear();
1420         }
1421     }
1422     if ((type & static_cast<uint32_t>(CallbackDetailsType::BRIEF)) == 0) {
1423         changedData.properties = {};
1424     }
1425 }
1426 
ClearAllTempSyncTrigger()1427 int RelationalSyncAbleStorage::ClearAllTempSyncTrigger()
1428 {
1429     int errCode = E_OK;
1430     auto *handle = GetHandle(true, errCode);
1431     if (handle == nullptr) {
1432         return errCode;
1433     }
1434     errCode = handle->ClearAllTempSyncTrigger();
1435     ReleaseHandle(handle);
1436     if (errCode != E_OK) {
1437         LOGE("[RelationalSyncAbleStorage] clear all temp sync trigger failed %d", errCode);
1438     }
1439     return errCode;
1440 }
1441 
FillReferenceData(CloudSyncData & syncData)1442 int RelationalSyncAbleStorage::FillReferenceData(CloudSyncData &syncData)
1443 {
1444     std::map<int64_t, Entries> referenceGid;
1445     int errCode = GetReferenceGid(syncData.tableName, syncData.insData, referenceGid);
1446     if (errCode != E_OK) {
1447         LOGE("[RelationalSyncAbleStorage] get insert reference data failed %d", errCode);
1448         return errCode;
1449     }
1450     errCode = FillReferenceDataIntoExtend(syncData.insData.rowid, referenceGid, syncData.insData.extend);
1451     if (errCode != E_OK) {
1452         return errCode;
1453     }
1454     referenceGid.clear();
1455     errCode = GetReferenceGid(syncData.tableName, syncData.updData, referenceGid);
1456     if (errCode != E_OK) {
1457         LOGE("[RelationalSyncAbleStorage] get update reference data failed %d", errCode);
1458         return errCode;
1459     }
1460     return FillReferenceDataIntoExtend(syncData.updData.rowid, referenceGid, syncData.updData.extend);
1461 }
1462 
FillReferenceDataIntoExtend(const std::vector<int64_t> & rowid,const std::map<int64_t,Entries> & referenceGid,std::vector<VBucket> & extend)1463 int RelationalSyncAbleStorage::FillReferenceDataIntoExtend(const std::vector<int64_t> &rowid,
1464     const std::map<int64_t, Entries> &referenceGid, std::vector<VBucket> &extend)
1465 {
1466     if (referenceGid.empty()) {
1467         return E_OK;
1468     }
1469     int ignoredCount = 0;
1470     for (size_t index = 0u; index < rowid.size(); index++) {
1471         if (index >= extend.size()) {
1472             LOGE("[RelationalSyncAbleStorage] index out of range when fill reference gid into extend!");
1473             return -E_UNEXPECTED_DATA;
1474         }
1475         int64_t rowId = rowid[index];
1476         if (referenceGid.find(rowId) == referenceGid.end()) {
1477             // current data miss match reference data, we ignored it
1478             ignoredCount++;
1479             continue;
1480         }
1481         extend[index].insert({ CloudDbConstant::REFERENCE_FIELD, referenceGid.at(rowId) });
1482     }
1483     if (ignoredCount != 0) {
1484         LOGD("[RelationalSyncAbleStorage] ignored %d data when fill reference data", ignoredCount);
1485     }
1486     return E_OK;
1487 }
1488 
IsSharedTable(const std::string & tableName)1489 bool RelationalSyncAbleStorage::IsSharedTable(const std::string &tableName)
1490 {
1491     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1492     return schemaMgr_.IsSharedTable(tableName);
1493 }
1494 
GetSharedTableOriginNames()1495 std::map<std::string, std::string> RelationalSyncAbleStorage::GetSharedTableOriginNames()
1496 {
1497     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1498     return schemaMgr_.GetSharedTableOriginNames();
1499 }
1500 
GetReferenceGid(const std::string & tableName,const CloudSyncBatch & syncBatch,std::map<int64_t,Entries> & referenceGid)1501 int RelationalSyncAbleStorage::GetReferenceGid(const std::string &tableName, const CloudSyncBatch &syncBatch,
1502     std::map<int64_t, Entries> &referenceGid)
1503 {
1504     std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1505     int errCode = GetTableReference(tableName, tableReference);
1506     if (errCode != E_OK) {
1507         return errCode;
1508     }
1509     if (tableReference.empty()) {
1510         LOGD("[RelationalSyncAbleStorage] currentTable not exist reference property");
1511         return E_OK;
1512     }
1513     auto *handle = GetHandle(false, errCode);
1514     if (handle == nullptr) {
1515         return errCode;
1516     }
1517     errCode = handle->GetReferenceGid(tableName, syncBatch, tableReference, referenceGid);
1518     ReleaseHandle(handle);
1519     return errCode;
1520 }
1521 
GetTableReference(const std::string & tableName,std::map<std::string,std::vector<TableReferenceProperty>> & reference)1522 int RelationalSyncAbleStorage::GetTableReference(const std::string &tableName,
1523     std::map<std::string, std::vector<TableReferenceProperty>> &reference)
1524 {
1525     if (storageEngine_ == nullptr) {
1526         LOGE("[RelationalSyncAbleStorage] storage is null when get reference gid");
1527         return -E_INVALID_DB;
1528     }
1529     RelationalSchemaObject schema = storageEngine_->GetSchema();
1530     auto referenceProperty = schema.GetReferenceProperty();
1531     if (referenceProperty.empty()) {
1532         return E_OK;
1533     }
1534     auto [sourceTableName, errCode] = GetSourceTableName(tableName);
1535     if (errCode != E_OK) {
1536         return errCode;
1537     }
1538     for (const auto &property : referenceProperty) {
1539         if (DBCommon::CaseInsensitiveCompare(property.sourceTableName, sourceTableName)) {
1540             if (!IsSharedTable(tableName)) {
1541                 reference[property.targetTableName].push_back(property);
1542                 continue;
1543             }
1544             TableReferenceProperty tableReference;
1545             tableReference.sourceTableName = tableName;
1546             tableReference.columns = property.columns;
1547             tableReference.columns[CloudDbConstant::CLOUD_OWNER] = CloudDbConstant::CLOUD_OWNER;
1548             auto [sharedTargetTable, ret] = GetSharedTargetTableName(property.targetTableName);
1549             if (ret != E_OK) {
1550                 return ret;
1551             }
1552             tableReference.targetTableName = sharedTargetTable;
1553             reference[tableReference.targetTableName].push_back(tableReference);
1554         }
1555     }
1556     return E_OK;
1557 }
1558 
GetSourceTableName(const std::string & tableName)1559 std::pair<std::string, int> RelationalSyncAbleStorage::GetSourceTableName(const std::string &tableName)
1560 {
1561     std::pair<std::string, int> res = { "", E_OK };
1562     std::shared_ptr<DataBaseSchema> cloudSchema;
1563     (void) GetCloudDbSchema(cloudSchema);
1564     if (cloudSchema == nullptr) {
1565         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get source table");
1566         return { "", -E_INTERNAL_ERROR };
1567     }
1568     for (const auto &table : cloudSchema->tables) {
1569         if (CloudStorageUtils::IsSharedTable(table)) {
1570             continue;
1571         }
1572         if (DBCommon::CaseInsensitiveCompare(table.name, tableName) ||
1573             DBCommon::CaseInsensitiveCompare(table.sharedTableName, tableName)) {
1574             res.first = table.name;
1575             break;
1576         }
1577     }
1578     if (res.first.empty()) {
1579         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1580         res.second = -E_SCHEMA_MISMATCH;
1581     }
1582     return res;
1583 }
1584 
GetSharedTargetTableName(const std::string & tableName)1585 std::pair<std::string, int> RelationalSyncAbleStorage::GetSharedTargetTableName(const std::string &tableName)
1586 {
1587     std::pair<std::string, int> res = { "", E_OK };
1588     std::shared_ptr<DataBaseSchema> cloudSchema;
1589     (void) GetCloudDbSchema(cloudSchema);
1590     if (cloudSchema == nullptr) {
1591         LOGE("[RelationalSyncAbleStorage] cloud schema is null when get shared target table");
1592         return { "", -E_INTERNAL_ERROR };
1593     }
1594     for (const auto &table : cloudSchema->tables) {
1595         if (CloudStorageUtils::IsSharedTable(table)) {
1596             continue;
1597         }
1598         if (DBCommon::CaseInsensitiveCompare(table.name, tableName)) {
1599             res.first = table.sharedTableName;
1600             break;
1601         }
1602     }
1603     if (res.first.empty()) {
1604         LOGE("[RelationalSyncAbleStorage] not found table in cloud schema");
1605         res.second = -E_SCHEMA_MISMATCH;
1606     }
1607     return res;
1608 }
1609 
SetLogicDelete(bool logicDelete)1610 void RelationalSyncAbleStorage::SetLogicDelete(bool logicDelete)
1611 {
1612     logicDelete_ = logicDelete;
1613     LOGI("[RelationalSyncAbleStorage] set logic delete %d", static_cast<int>(logicDelete));
1614 }
1615 
SetCloudTaskConfig(const CloudTaskConfig & config)1616 void RelationalSyncAbleStorage::SetCloudTaskConfig(const CloudTaskConfig &config)
1617 {
1618     allowLogicDelete_ = config.allowLogicDelete;
1619     LOGD("[RelationalSyncAbleStorage] allow logic delete %d", static_cast<int>(config.allowLogicDelete));
1620 }
1621 
IsCurrentLogicDelete() const1622 bool RelationalSyncAbleStorage::IsCurrentLogicDelete() const
1623 {
1624     return allowLogicDelete_ && logicDelete_;
1625 }
1626 
GetAssetsByGidOrHashKey(const TableSchema & tableSchema,const std::string & gid,const Bytes & hashKey,VBucket & assets)1627 int RelationalSyncAbleStorage::GetAssetsByGidOrHashKey(const TableSchema &tableSchema, const std::string &gid,
1628     const Bytes &hashKey, VBucket &assets)
1629 {
1630     if (gid.empty() && hashKey.empty()) {
1631         LOGE("both gid and hashKey are empty.");
1632         return -E_INVALID_ARGS;
1633     }
1634     if (transactionHandle_ == nullptr) {
1635         LOGE("the transaction has not been started");
1636         return -E_INVALID_DB;
1637     }
1638     int errCode = transactionHandle_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
1639     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1640         LOGE("get assets by gid or hashKey failed. %d", errCode);
1641     }
1642     return errCode;
1643 }
1644 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1645 int RelationalSyncAbleStorage::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1646 {
1647     int errCode = E_OK;
1648     auto *wHandle = GetHandle(true, errCode);
1649     if (wHandle == nullptr) {
1650         return errCode;
1651     }
1652     wHandle->SetIAssetLoader(loader);
1653     ReleaseHandle(wHandle);
1654     return errCode;
1655 }
1656 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1657 int RelationalSyncAbleStorage::UpsertData(RecordStatus status, const std::string &tableName,
1658     const std::vector<VBucket> &records)
1659 {
1660     int errCode = E_OK;
1661     auto *handle = GetHandle(true, errCode);
1662     if (errCode != E_OK) {
1663         return errCode;
1664     }
1665     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::USER);
1666     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::SET_WAIT_COMPENSATED_SYNC);
1667     errCode = UpsertDataInner(handle, tableName, records);
1668     handle->SetPutDataMode(SQLiteSingleVerRelationalStorageExecutor::PutDataMode::SYNC);
1669     handle->SetMarkFlagOption(SQLiteSingleVerRelationalStorageExecutor::MarkFlagOption::DEFAULT);
1670     ReleaseHandle(handle);
1671     return errCode;
1672 }
1673 
UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1674 int RelationalSyncAbleStorage::UpsertDataInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1675     const std::string &tableName, const std::vector<VBucket> &records)
1676 {
1677     int errCode = E_OK;
1678     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1679     if (errCode != E_OK) {
1680         return errCode;
1681     }
1682     errCode = CreateTempSyncTriggerInner(handle, tableName);
1683     if (errCode == E_OK) {
1684         errCode = UpsertDataInTransaction(handle, tableName, records);
1685         (void) handle->ClearAllTempSyncTrigger();
1686     }
1687     if (errCode == E_OK) {
1688         errCode = handle->Commit();
1689         if (errCode != E_OK) {
1690             LOGE("[RDBStorageEngine] commit failed %d when upsert data", errCode);
1691         }
1692     } else {
1693         int ret = handle->Rollback();
1694         if (ret != E_OK) {
1695             LOGW("[RDBStorageEngine] rollback failed %d when upsert data", ret);
1696         }
1697     }
1698     return errCode;
1699 }
1700 
UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const std::vector<VBucket> & records)1701 int RelationalSyncAbleStorage::UpsertDataInTransaction(SQLiteSingleVerRelationalStorageExecutor *handle,
1702     const std::string &tableName, const std::vector<VBucket> &records)
1703 {
1704     TableSchema tableSchema;
1705     int errCode = GetCloudTableSchema(tableName, tableSchema);
1706     if (errCode != E_OK) {
1707         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1708         return errCode;
1709     }
1710     TableInfo localTable = GetSchemaInfo().GetTable(tableName); // for upsert, the table must exist in local
1711     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema, true);
1712     std::set<std::vector<uint8_t>> primaryKeys;
1713     DownloadData downloadData;
1714     for (const auto &record : records) {
1715         DataInfoWithLog dataInfoWithLog;
1716         VBucket assetInfo;
1717         auto [errorCode, hashValue] = CloudStorageUtils::GetHashValueWithPrimaryKeyMap(record,
1718             tableSchema, localTable, pkMap, false);
1719         if (errorCode != E_OK) {
1720             return errorCode;
1721         }
1722         errCode = GetInfoByPrimaryKeyOrGidInner(handle, tableName, record, dataInfoWithLog, assetInfo);
1723         if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1724             return errCode;
1725         }
1726         VBucket recordCopy = record;
1727         if ((errCode == -E_NOT_FOUND ||
1728             (dataInfoWithLog.logInfo.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) != 0) &&
1729             primaryKeys.find(hashValue) == primaryKeys.end()) {
1730             downloadData.opType.push_back(OpType::INSERT);
1731             auto currentTime = TimeHelper::GetSysCurrentTime();
1732             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(currentTime);
1733             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(currentTime);
1734             primaryKeys.insert(hashValue);
1735         } else {
1736             downloadData.opType.push_back(OpType::UPDATE);
1737             recordCopy[CloudDbConstant::GID_FIELD] = dataInfoWithLog.logInfo.cloudGid;
1738             recordCopy[CloudDbConstant::MODIFY_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.timestamp);
1739             recordCopy[CloudDbConstant::CREATE_FIELD] = static_cast<int64_t>(dataInfoWithLog.logInfo.wTimestamp);
1740         }
1741         downloadData.existDataKey.push_back(dataInfoWithLog.logInfo.dataKey);
1742         downloadData.data.push_back(std::move(recordCopy));
1743     }
1744     return PutCloudSyncDataInner(handle, tableName, downloadData);
1745 }
1746 
UpdateRecordFlag(const std::string & tableName,const std::string & gid,bool recordConflict)1747 int RelationalSyncAbleStorage::UpdateRecordFlag(const std::string &tableName, const std::string &gid,
1748     bool recordConflict)
1749 {
1750     if (transactionHandle_ == nullptr) {
1751         LOGE("[RelationalSyncAbleStorage] the transaction has not been started");
1752         return -E_INVALID_DB;
1753     }
1754     if (gid.empty()) {
1755         LOGE("[RelationalSyncAbleStorage] gid is empty.");
1756         return -E_INVALID_ARGS;
1757     }
1758     return transactionHandle_->UpdateRecordFlag(tableName, recordConflict, gid);
1759 }
1760 
FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor * handle,OpType opType,const CloudSyncData & data,bool fillAsset,bool ignoreEmptyGid)1761 int RelationalSyncAbleStorage::FillCloudLogAndAssetInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1762     OpType opType, const CloudSyncData &data, bool fillAsset, bool ignoreEmptyGid)
1763 {
1764     TableSchema tableSchema;
1765     int errCode = GetCloudTableSchema(data.tableName, tableSchema);
1766     if (errCode != E_OK) {
1767         LOGE("get table schema failed when fill log and asset. %d", errCode);
1768         return errCode;
1769     }
1770     errCode = handle->FillHandleWithOpType(opType, data, fillAsset, ignoreEmptyGid, tableSchema);
1771     if (errCode != E_OK) {
1772         return errCode;
1773     }
1774     if (opType == OpType::INSERT) {
1775         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.insData);
1776     } else if (opType == OpType::UPDATE) {
1777         errCode = UpdateRecordFlagAfterUpload(handle, data.tableName, data.updData);
1778     }
1779     return errCode;
1780 }
1781 
UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,const CloudSyncBatch & updateData)1782 int RelationalSyncAbleStorage::UpdateRecordFlagAfterUpload(SQLiteSingleVerRelationalStorageExecutor *handle,
1783     const std::string &tableName, const CloudSyncBatch &updateData)
1784 {
1785     for (size_t i = 0; i < updateData.extend.size(); ++i) {
1786         const auto &record = updateData.extend[i];
1787         if (DBCommon::IsRecordError(record)) {
1788             continue;
1789         }
1790         const auto &rowId = updateData.rowid[i];
1791         int errCode = handle->UpdateRecordFlag(tableName, DBCommon::IsRecordIgnored(record), "", rowId);
1792         if (errCode != E_OK) {
1793             LOGE("[RDBStorage] Update record flag failed in index %zu", i);
1794             return errCode;
1795         }
1796     }
1797     return E_OK;
1798 }
1799 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery)1800 int RelationalSyncAbleStorage::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery)
1801 {
1802     std::vector<TableSchema> tables;
1803     int errCode = GetCloudTableWithoutShared(tables);
1804     if (errCode != E_OK) {
1805         return errCode;
1806     }
1807     if (tables.empty()) {
1808         LOGD("[RDBStorage] Table is empty, no need to compensated sync");
1809         return E_OK;
1810     }
1811     auto *handle = GetHandle(true, errCode);
1812     if (errCode != E_OK) {
1813         return errCode;
1814     }
1815     errCode = GetCompensatedSyncQueryInner(handle, tables, syncQuery);
1816     ReleaseHandle(handle);
1817     return errCode;
1818 }
1819 
GetCloudTableWithoutShared(std::vector<TableSchema> & tables)1820 int RelationalSyncAbleStorage::GetCloudTableWithoutShared(std::vector<TableSchema> &tables)
1821 {
1822     const auto tableInfos = GetSchemaInfo().GetTables();
1823     for (const auto &[tableName, info] : tableInfos) {
1824         if (info.GetSharedTableMark()) {
1825             continue;
1826         }
1827         TableSchema schema;
1828         int errCode = GetCloudTableSchema(tableName, schema);
1829         if (errCode == -E_NOT_FOUND) {
1830             continue;
1831         }
1832         if (errCode != E_OK) {
1833             LOGW("[RDBStorage] Get cloud table failed %d", errCode);
1834             return errCode;
1835         }
1836         tables.push_back(schema);
1837     }
1838     return E_OK;
1839 }
1840 
GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::vector<TableSchema> & tables,std::vector<QuerySyncObject> & syncQuery)1841 int RelationalSyncAbleStorage::GetCompensatedSyncQueryInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1842     const std::vector<TableSchema> &tables, std::vector<QuerySyncObject> &syncQuery)
1843 {
1844     int errCode = E_OK;
1845     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1846     if (errCode != E_OK) {
1847         return errCode;
1848     }
1849     for (const auto &table : tables) {
1850         // check whether reference exist
1851         std::map<std::string, std::vector<TableReferenceProperty>> tableReference;
1852         errCode = RelationalSyncAbleStorage::GetTableReference(table.name, tableReference);
1853         if (errCode != E_OK) {
1854             LOGW("[RelationalSyncAbleStorage] Get table reference failed, continue next! errCode = %d", errCode);
1855             errCode = E_OK;
1856             continue;
1857         }
1858         if (!tableReference.empty()) {
1859             LOGD("[RelationalSyncAbleStorage] current table exist reference property");
1860             continue;
1861         }
1862 
1863         std::vector<VBucket> syncDataPk;
1864         errCode = handle->GetWaitCompensatedSyncDataPk(table, syncDataPk);
1865         if (errCode != E_OK) {
1866             LOGW("[GetWaitCompensatedSyncDataPk] Get wait compensated sync date failed, continue! errCode=%d", errCode);
1867             errCode = E_OK;
1868             continue;
1869         }
1870         if (syncDataPk.empty()) {
1871             // no data need to compensated sync
1872             continue;
1873         }
1874         QuerySyncObject syncObject;
1875         errCode = GetSyncQueryByPk(table.name, syncDataPk, syncObject);
1876         if (errCode != E_OK) {
1877             LOGW("[RDBStorageEngine] Get compensated sync query happen error, ignore it! errCode = %d", errCode);
1878             errCode = E_OK;
1879             continue;
1880         }
1881         syncQuery.push_back(syncObject);
1882     }
1883     if (errCode == E_OK) {
1884         errCode = handle->Commit();
1885         if (errCode != E_OK) {
1886             LOGE("[RDBStorageEngine] commit failed %d when get compensated sync query", errCode);
1887         }
1888     } else {
1889         int ret = handle->Rollback();
1890         if (ret != E_OK) {
1891             LOGW("[RDBStorageEngine] rollback failed %d when get compensated sync query", ret);
1892         }
1893     }
1894     return errCode;
1895 }
1896 
GetSyncQueryByPk(const std::string & tableName,const std::vector<VBucket> & data,QuerySyncObject & querySyncObject)1897 int RelationalSyncAbleStorage::GetSyncQueryByPk(const std::string &tableName,
1898     const std::vector<VBucket> &data, QuerySyncObject &querySyncObject)
1899 {
1900     std::map<std::string, size_t> dataIndex;
1901     std::map<std::string, std::vector<Type>> syncPk;
1902     int ignoreCount = 0;
1903     for (const auto &oneRow : data) {
1904         if (oneRow.size() >= 2u) { // mean this data has more than 2 pk
1905             LOGW("[RDBStorageEngine] Not support compensated sync with composite primary key now");
1906             return -E_NOT_SUPPORT;
1907         }
1908         for (const auto &[col, value] : oneRow) {
1909             if (dataIndex.find(col) == dataIndex.end()) {
1910                 dataIndex[col] = value.index();
1911             } else if (dataIndex[col] != value.index()) {
1912                 ignoreCount++;
1913                 continue;
1914             }
1915             syncPk[col].push_back(value);
1916         }
1917     }
1918     LOGD("[RDBStorageEngine] match %zu data for compensated sync, ignore %d", data.size(), ignoreCount);
1919     Query query = Query::Select().From(tableName);
1920     for (const auto &[col, pkList] : syncPk) {
1921         FillQueryInKeys(col, pkList, dataIndex[col], query);
1922     }
1923     auto objectList = QuerySyncObject::GetQuerySyncObject(query);
1924     if (objectList.size() != 1u) {
1925         return -E_INTERNAL_ERROR;
1926     }
1927     querySyncObject = objectList[0];
1928     return E_OK;
1929 }
1930 
FillQueryInKeys(const std::string & col,const std::vector<Type> & data,size_t valueType,Query & query)1931 void RelationalSyncAbleStorage::FillQueryInKeys(const std::string &col, const std::vector<Type> &data, size_t valueType,
1932     Query &query)
1933 {
1934     switch (valueType) {
1935         case TYPE_INDEX<int64_t>: {
1936             std::vector<int64_t> pkList;
1937             for (const auto &pk : data) {
1938                 pkList.push_back(std::get<int64_t>(pk));
1939             }
1940             query.In(col, pkList);
1941             break;
1942         }
1943         case TYPE_INDEX<std::string>: {
1944             std::vector<std::string> pkList;
1945             for (const auto &pk : data) {
1946                 pkList.push_back(std::get<std::string>(pk));
1947             }
1948             query.In(col, pkList);
1949             break;
1950         }
1951         case TYPE_INDEX<double>: {
1952             std::vector<double> pkList;
1953             for (const auto &pk : data) {
1954                 pkList.push_back(std::get<double>(pk));
1955             }
1956             query.In(col, pkList);
1957             break;
1958         }
1959         case TYPE_INDEX<bool>: {
1960             std::vector<bool> pkList;
1961             for (const auto &pk : data) {
1962                 pkList.push_back(std::get<bool>(pk));
1963             }
1964             query.In(col, pkList);
1965             break;
1966         }
1967         default:
1968             break;
1969     }
1970 }
1971 
CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName)1972 int RelationalSyncAbleStorage::CreateTempSyncTriggerInner(SQLiteSingleVerRelationalStorageExecutor *handle,
1973     const std::string &tableName)
1974 {
1975     TrackerTable trackerTable = storageEngine_->GetTrackerSchema().GetTrackerTable(tableName);
1976     if (trackerTable.IsEmpty()) {
1977         trackerTable.SetTableName(tableName);
1978     }
1979     return handle->CreateTempSyncTrigger(trackerTable);
1980 }
1981 }
1982 #endif