• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_sync_able_storage.h"
17 
18 #include <utility>
19 
20 #include "cloud/cloud_db_constant.h"
21 #include "data_compression.h"
22 #include "db_common.h"
23 #include "db_dfx_adapter.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "platform_specific.h"
26 #include "relational_remote_query_continue_token.h"
27 #include "relational_sync_data_inserter.h"
28 #include "res_finalizer.h"
29 #include "runtime_context.h"
30 
31 namespace DistributedDB {
32 namespace {
TriggerCloseAutoLaunchConn(const RelationalDBProperties & properties)33 void TriggerCloseAutoLaunchConn(const RelationalDBProperties &properties)
34 {
35     static constexpr const char *CLOSE_CONN_TASK = "auto launch close relational connection";
36     (void)RuntimeContext::GetInstance()->ScheduleQueuedTask(
37         std::string(CLOSE_CONN_TASK),
38         [properties] { RuntimeContext::GetInstance()->CloseAutoLaunchConnection(DBTypeInner::DB_RELATION, properties); }
39     );
40 }
41 }
42 
// Guard for member functions returning an int error code:
// leaves the enclosing function with -E_INVALID_DB when storageEngine_ is null.
#define CHECK_STORAGE_ENGINE \
    do { \
        if (storageEngine_ == nullptr) { \
            return -E_INVALID_DB; \
        } \
    } while (0)
48 
RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)49 RelationalSyncAbleStorage::RelationalSyncAbleStorage(std::shared_ptr<SQLiteSingleRelationalStorageEngine> engine)
50     : storageEngine_(std::move(engine)),
51       isCachedOption_(false)
52 {}
53 
~RelationalSyncAbleStorage()54 RelationalSyncAbleStorage::~RelationalSyncAbleStorage()
55 {
56     syncAbleEngine_ = nullptr;
57 }
58 
59 // Get interface type of this relational db.
GetInterfaceType() const60 int RelationalSyncAbleStorage::GetInterfaceType() const
61 {
62     return SYNC_RELATION;
63 }
64 
65 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()66 void RelationalSyncAbleStorage::IncRefCount()
67 {
68     LOGD("RelationalSyncAbleStorage ref +1");
69     IncObjRef(this);
70 }
71 
72 // Drop the interface ref-count.
DecRefCount()73 void RelationalSyncAbleStorage::DecRefCount()
74 {
75     LOGD("RelationalSyncAbleStorage ref -1");
76     DecObjRef(this);
77 }
78 
79 // Get the identifier of this rdb.
GetIdentifier() const80 std::vector<uint8_t> RelationalSyncAbleStorage::GetIdentifier() const
81 {
82     std::string identifier = storageEngine_->GetIdentifier();
83     return std::vector<uint8_t>(identifier.begin(), identifier.end());
84 }
85 
GetDualTupleIdentifier() const86 std::vector<uint8_t> RelationalSyncAbleStorage::GetDualTupleIdentifier() const
87 {
88     std::string identifier = storageEngine_->GetProperties().GetStringProp(
89         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
90     std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
91     return identifierVect;
92 }
93 
94 // Get the max timestamp of all entries in database.
GetMaxTimestamp(Timestamp & timestamp) const95 void RelationalSyncAbleStorage::GetMaxTimestamp(Timestamp &timestamp) const
96 {
97     int errCode = E_OK;
98     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
99     if (handle == nullptr) {
100         return;
101     }
102     timestamp = 0;
103     errCode = handle->GetMaxTimestamp(storageEngine_->GetSchema().GetTableNames(), timestamp);
104     if (errCode != E_OK) {
105         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
106         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
107     }
108     ReleaseHandle(handle);
109     return;
110 }
111 
GetMaxTimestamp(const std::string & tableName,Timestamp & timestamp) const112 int RelationalSyncAbleStorage::GetMaxTimestamp(const std::string &tableName, Timestamp &timestamp) const
113 {
114     int errCode = E_OK;
115     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
116     if (handle == nullptr) {
117         return errCode;
118     }
119     timestamp = 0;
120     errCode = handle->GetMaxTimestamp({ tableName }, timestamp);
121     if (errCode != E_OK) {
122         LOGE("GetMaxTimestamp failed, errCode:%d", errCode);
123         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
124     }
125     ReleaseHandle(handle);
126     return errCode;
127 }
128 
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const129 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandle(bool isWrite, int &errCode,
130     OperatePerm perm) const
131 {
132     if (storageEngine_ == nullptr) {
133         errCode = -E_INVALID_DB;
134         return nullptr;
135     }
136     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
137         storageEngine_->FindExecutor(isWrite, perm, errCode));
138     if (handle == nullptr) {
139         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
140     }
141     return handle;
142 }
143 
GetHandleExpectTransaction(bool isWrite,int & errCode,OperatePerm perm) const144 SQLiteSingleVerRelationalStorageExecutor *RelationalSyncAbleStorage::GetHandleExpectTransaction(bool isWrite,
145     int &errCode, OperatePerm perm) const
146 {
147     if (storageEngine_ == nullptr) {
148         errCode = -E_INVALID_DB;
149         return nullptr;
150     }
151     if (transactionHandle_ != nullptr) {
152         return transactionHandle_;
153     }
154     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
155         storageEngine_->FindExecutor(isWrite, perm, errCode));
156     if (errCode != E_OK) {
157         ReleaseHandle(handle);
158         handle = nullptr;
159     }
160     return handle;
161 }
162 
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const163 void RelationalSyncAbleStorage::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
164 {
165     if (storageEngine_ == nullptr) {
166         return;
167     }
168     StorageExecutor *databaseHandle = handle;
169     storageEngine_->Recycle(databaseHandle);
170     std::function<void()> listener = nullptr;
171     {
172         std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
173         listener = heartBeatListener_;
174     }
175     if (listener) {
176         listener();
177     }
178 }
179 
180 // Get meta data associated with the given key.
GetMetaData(const Key & key,Value & value) const181 int RelationalSyncAbleStorage::GetMetaData(const Key &key, Value &value) const
182 {
183     CHECK_STORAGE_ENGINE;
184     if (key.size() > DBConstant::MAX_KEY_SIZE) {
185         return -E_INVALID_ARGS;
186     }
187     int errCode = E_OK;
188     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
189     if (handle == nullptr) {
190         return errCode;
191     }
192     errCode = handle->GetKvData(key, value);
193     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
194         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
195     }
196     ReleaseHandle(handle);
197     return errCode;
198 }
199 
200 // Put meta data as a key-value entry.
PutMetaData(const Key & key,const Value & value)201 int RelationalSyncAbleStorage::PutMetaData(const Key &key, const Value &value)
202 {
203     CHECK_STORAGE_ENGINE;
204     int errCode = E_OK;
205     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
206     if (handle == nullptr) {
207         return errCode;
208     }
209 
210     errCode = handle->PutKvData(key, value); // meta doesn't need time.
211     if (errCode != E_OK) {
212         LOGE("Put kv data err:%d", errCode);
213         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
214     }
215     ReleaseHandle(handle);
216     return errCode;
217 }
218 
219 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)220 int RelationalSyncAbleStorage::DeleteMetaData(const std::vector<Key> &keys)
221 {
222     CHECK_STORAGE_ENGINE;
223     for (const auto &key : keys) {
224         if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
225             return -E_INVALID_ARGS;
226         }
227     }
228     int errCode = E_OK;
229     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
230     if (handle == nullptr) {
231         return errCode;
232     }
233 
234     handle->StartTransaction(TransactType::IMMEDIATE);
235     errCode = handle->DeleteMetaData(keys);
236     if (errCode != E_OK) {
237         handle->Rollback();
238         LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
239         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
240     } else {
241         handle->Commit();
242     }
243     ReleaseHandle(handle);
244     return errCode;
245 }
246 
247 // Delete multiple meta data records with key prefix in a transaction.
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const248 int RelationalSyncAbleStorage::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
249 {
250     CHECK_STORAGE_ENGINE;
251     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
252         return -E_INVALID_ARGS;
253     }
254 
255     int errCode = E_OK;
256     auto handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
257     if (handle == nullptr) {
258         return errCode;
259     }
260 
261     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
262     if (errCode != E_OK) {
263         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
264         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
265     }
266     ReleaseHandle(handle);
267     return errCode;
268 }
269 
270 // Get all meta data keys.
GetAllMetaKeys(std::vector<Key> & keys) const271 int RelationalSyncAbleStorage::GetAllMetaKeys(std::vector<Key> &keys) const
272 {
273     CHECK_STORAGE_ENGINE;
274     int errCode = E_OK;
275     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
276     if (handle == nullptr) {
277         return errCode;
278     }
279 
280     errCode = handle->GetAllMetaKeys(keys);
281     if (errCode != E_OK) {
282         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
283     }
284     ReleaseHandle(handle);
285     return errCode;
286 }
287 
GetDbProperties() const288 const RelationalDBProperties &RelationalSyncAbleStorage::GetDbProperties() const
289 {
290     return storageEngine_->GetProperties();
291 }
292 
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)293 static int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
294 {
295     int errCode = E_OK;
296     for (auto &item : dataItems) {
297         auto entry = new (std::nothrow) GenericSingleVerKvEntry();
298         if (entry == nullptr) {
299             errCode = -E_OUT_OF_MEMORY;
300             LOGE("GetKvEntries failed, errCode:%d", errCode);
301             SingleVerKvEntry::Release(entries);
302             break;
303         }
304         entry->SetEntryData(std::move(item));
305         entries.push_back(entry);
306     }
307     return errCode;
308 }
309 
GetDataItemSerialSize(const DataItem & item,size_t appendLen)310 static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen)
311 {
312     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
313     // the size would not be very large.
314     static const size_t maxOrigDevLength = 40;
315     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
316     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
317                        Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
318     return dataSize;
319 }
320 
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)321 static bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
322     size_t appendLen)
323 {
324     bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize);
325     for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) {
326         blockSize += GetDataItemSerialSize(dataItems[i], appendLen);
327         reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
328     }
329     return !reachThreshold;
330 }
331 
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerRelationalContinueToken * & token)332 static void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
333     SQLiteSingleVerRelationalContinueToken *&token)
334 {
335     if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
336         delete token;
337         token = nullptr;
338         return;
339     }
340 
341     if (dataItems.empty()) {
342         errCode = -E_INTERNAL_ERROR;
343         LOGE("Get data unfinished but data items is empty.");
344         delete token;
345         token = nullptr;
346         return;
347     }
348     token->SetNextBeginTime(dataItems.back());
349     token->UpdateNextSyncOffset(dataItems.size());
350 }
351 
352 /**
353  * Caller must ensure that parameter token is valid.
354  * If error happened, token will be deleted here.
355  */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerRelationalContinueToken * & token,const DataSizeSpecInfo & dataSizeInfo) const356 int RelationalSyncAbleStorage::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
357     SQLiteSingleVerRelationalContinueToken *&token, const DataSizeSpecInfo &dataSizeInfo) const
358 {
359     if (storageEngine_ == nullptr) {
360         return -E_INVALID_DB;
361     }
362 
363     int errCode = E_OK;
364     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(storageEngine_->FindExecutor(false,
365         OperatePerm::NORMAL_PERM, errCode));
366     if (handle == nullptr) {
367         goto ERROR;
368     }
369 
370     do {
371         errCode = handle->GetSyncDataByQuery(dataItems,
372             Parcel::GetAppendedLen(),
373             dataSizeInfo,
374             std::bind(&SQLiteSingleVerRelationalContinueToken::GetStatement, *token,
375                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4),
376             storageEngine_->GetSchema().GetTable(token->GetQuery().GetTableName()));
377         if (errCode == -E_FINISHED) {
378             token->FinishGetData();
379             errCode = token->IsGetAllDataFinished() ? E_OK : -E_UNFINISHED;
380         }
381     } while (errCode == -E_UNFINISHED && CanHoldDeletedData(dataItems, dataSizeInfo, Parcel::GetAppendedLen()));
382 
383 ERROR:
384     if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
385         dataItems.clear();
386     }
387     ProcessContinueTokenForQuerySync(dataItems, errCode, token);
388     ReleaseHandle(handle);
389     return errCode;
390 }
391 
392 // use kv struct data to sync
393 // Get the data which would be synced with query condition
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const394 int RelationalSyncAbleStorage::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
395     const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
396     std::vector<SingleVerKvEntry *> &entries) const
397 {
398     if (!timeRange.IsValid()) {
399         return -E_INVALID_ARGS;
400     }
401     query.SetSchema(storageEngine_->GetSchema());
402     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(timeRange, query);
403     if (token == nullptr) {
404         LOGE("[SingleVerNStore] Allocate continue token failed.");
405         return -E_OUT_OF_MEMORY;
406     }
407 
408     continueStmtToken = static_cast<ContinueToken>(token);
409     return GetSyncDataNext(entries, continueStmtToken, dataSizeInfo);
410 }
411 
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const412 int RelationalSyncAbleStorage::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
413     ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
414 {
415     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
416     if (!token->CheckValid()) {
417         return -E_INVALID_ARGS;
418     }
419     RelationalSchemaObject schema = storageEngine_->GetSchema();
420     const auto fieldInfos = schema.GetTable(token->GetQuery().GetTableName()).GetFieldInfos();
421     std::vector<std::string> fieldNames;
422     fieldNames.reserve(fieldInfos.size());
423     for (const auto &fieldInfo : fieldInfos) { // order by cid
424         fieldNames.push_back(fieldInfo.GetFieldName());
425     }
426     token->SetFieldNames(fieldNames);
427 
428     std::vector<DataItem> dataItems;
429     int errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
430     if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
431         continueStmtToken = static_cast<ContinueToken>(token);
432         return errCode;
433     }
434 
435     int innerCode = GetKvEntriesByDataItems(entries, dataItems);
436     if (innerCode != E_OK) {
437         errCode = innerCode;
438         delete token;
439         token = nullptr;
440     }
441     continueStmtToken = static_cast<ContinueToken>(token);
442     return errCode;
443 }
444 
445 namespace {
ConvertEntries(std::vector<SingleVerKvEntry * > entries)446 std::vector<DataItem> ConvertEntries(std::vector<SingleVerKvEntry *> entries)
447 {
448     std::vector<DataItem> dataItems;
449     for (const auto &itemEntry : entries) {
450         GenericSingleVerKvEntry *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
451         if (entry != nullptr) {
452             DataItem item;
453             item.origDev = entry->GetOrigDevice();
454             item.flag = entry->GetFlag();
455             item.timestamp = entry->GetTimestamp();
456             item.writeTimestamp = entry->GetWriteTimestamp();
457             entry->GetKey(item.key);
458             entry->GetValue(item.value);
459             entry->GetHashKey(item.hashKey);
460             dataItems.push_back(item);
461         }
462     }
463     return dataItems;
464 }
465 }
466 
PutSyncDataWithQuery(const QueryObject & object,const std::vector<SingleVerKvEntry * > & entries,const DeviceID & deviceName)467 int RelationalSyncAbleStorage::PutSyncDataWithQuery(const QueryObject &object,
468     const std::vector<SingleVerKvEntry *> &entries, const DeviceID &deviceName)
469 {
470     std::vector<DataItem> dataItems = ConvertEntries(entries);
471     return PutSyncData(object, dataItems, deviceName);
472 }
473 
474 namespace {
GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)475 inline DistributedTableMode GetCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
476 {
477     return static_cast<DistributedTableMode>(engine->GetProperties().GetIntProp(
478         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
479 }
480 
IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> & engine)481 inline bool IsCollaborationMode(const std::shared_ptr<SQLiteSingleRelationalStorageEngine> &engine)
482 {
483     return GetCollaborationMode(engine) == DistributedTableMode::COLLABORATION;
484 }
485 }
486 
SaveSyncDataItems(const QueryObject & object,std::vector<DataItem> & dataItems,const std::string & deviceName)487 int RelationalSyncAbleStorage::SaveSyncDataItems(const QueryObject &object, std::vector<DataItem> &dataItems,
488     const std::string &deviceName)
489 {
490     int errCode = E_OK;
491     LOGD("[RelationalSyncAbleStorage::SaveSyncDataItems] Get write handle.");
492     QueryObject query = object;
493     query.SetSchema(storageEngine_->GetSchema());
494 
495     RelationalSchemaObject remoteSchema;
496     errCode = GetRemoteDeviceSchema(deviceName, remoteSchema);
497     if (errCode != E_OK) {
498         LOGE("Find remote schema failed. err=%d", errCode);
499         return errCode;
500     }
501 
502     StoreInfo info = {
503         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
504         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
505         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
506     };
507     auto inserter = RelationalSyncDataInserter::CreateInserter(deviceName, query, storageEngine_->GetSchema(),
508         remoteSchema.GetTable(query.GetTableName()).GetFieldInfos(), info);
509     inserter.SetEntries(dataItems);
510 
511     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
512     if (handle == nullptr) {
513         return errCode;
514     }
515 
516     DBDfxAdapter::StartTraceSQL();
517 
518     errCode = handle->SaveSyncItems(inserter);
519 
520     DBDfxAdapter::FinishTraceSQL();
521     if (errCode == E_OK) {
522         // dataItems size > 0 now because already check before
523         // all dataItems will write into db now, so need to observer notify here
524         // if some dataItems will not write into db in the future, observer notify here need change
525         ChangedData data;
526         TriggerObserverAction(deviceName, std::move(data), false);
527     }
528 
529     ReleaseHandle(handle);
530     return errCode;
531 }
532 
PutSyncData(const QueryObject & query,std::vector<DataItem> & dataItems,const std::string & deviceName)533 int RelationalSyncAbleStorage::PutSyncData(const QueryObject &query, std::vector<DataItem> &dataItems,
534     const std::string &deviceName)
535 {
536     if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
537         LOGW("Device length is invalid for sync put");
538         return -E_INVALID_ARGS;
539     }
540 
541     int errCode = SaveSyncDataItems(query, dataItems, deviceName); // Currently true to check value content
542     if (errCode != E_OK) {
543         LOGE("[Relational] PutSyncData errCode:%d", errCode);
544         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
545     }
546     return errCode;
547 }
548 
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)549 int RelationalSyncAbleStorage::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
550 {
551     (void) deviceName;
552     (void) isNeedNotify;
553     return -E_NOT_SUPPORT;
554 }
555 
GetSchemaInfo() const556 RelationalSchemaObject RelationalSyncAbleStorage::GetSchemaInfo() const
557 {
558     return storageEngine_->GetSchema();
559 }
560 
GetSecurityOption(SecurityOption & option) const561 int RelationalSyncAbleStorage::GetSecurityOption(SecurityOption &option) const
562 {
563     std::lock_guard<std::mutex> autoLock(securityOptionMutex_);
564     if (isCachedOption_) {
565         option = securityOption_;
566         return E_OK;
567     }
568     std::string dbPath = storageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
569     int errCode = RuntimeContext::GetInstance()->GetSecurityOption(dbPath, securityOption_);
570     if (errCode == E_OK) {
571         option = securityOption_;
572         isCachedOption_ = true;
573     }
574     return errCode;
575 }
576 
NotifyRemotePushFinished(const std::string & deviceId) const577 void RelationalSyncAbleStorage::NotifyRemotePushFinished(const std::string &deviceId) const
578 {
579     return;
580 }
581 
582 // Get the timestamp when database created or imported
GetDatabaseCreateTimestamp(Timestamp & outTime) const583 int RelationalSyncAbleStorage::GetDatabaseCreateTimestamp(Timestamp &outTime) const
584 {
585     return OS::GetCurrentSysTimeInMicrosecond(outTime);
586 }
587 
GetTablesQuery()588 std::vector<QuerySyncObject> RelationalSyncAbleStorage::GetTablesQuery()
589 {
590     auto tableNames = storageEngine_->GetSchema().GetTableNames();
591     std::vector<QuerySyncObject> queries;
592     queries.reserve(tableNames.size());
593     for (const auto &it : tableNames) {
594         queries.emplace_back(Query::Select(it));
595     }
596     return queries;
597 }
598 
LocalDataChanged(int notifyEvent,std::vector<QuerySyncObject> & queryObj)599 int RelationalSyncAbleStorage::LocalDataChanged(int notifyEvent, std::vector<QuerySyncObject> &queryObj)
600 {
601     (void) queryObj;
602     return -E_NOT_SUPPORT;
603 }
604 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const605 int RelationalSyncAbleStorage::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
606     const std::string &targetID) const
607 {
608     return E_OK;
609 }
610 
CreateDistributedDeviceTable(const std::string & device,const RelationalSyncStrategy & syncStrategy)611 int RelationalSyncAbleStorage::CreateDistributedDeviceTable(const std::string &device,
612     const RelationalSyncStrategy &syncStrategy)
613 {
614     auto mode = storageEngine_->GetProperties().GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE,
615         DistributedTableMode::SPLIT_BY_DEVICE);
616     if (mode != DistributedTableMode::SPLIT_BY_DEVICE) {
617         LOGD("No need create device table in COLLABORATION mode.");
618         return E_OK;
619     }
620 
621     int errCode = E_OK;
622     auto *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
623     if (handle == nullptr) {
624         return errCode;
625     }
626 
627     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
628     if (errCode != E_OK) {
629         LOGE("Start transaction failed:%d", errCode);
630         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
631         ReleaseHandle(handle);
632         return errCode;
633     }
634 
635     StoreInfo info = {
636         storageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
637         storageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
638         storageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "")
639     };
640     for (const auto &[table, strategy] : syncStrategy) {
641         if (!strategy.permitSync) {
642             continue;
643         }
644 
645         errCode = handle->CreateDistributedDeviceTable(device, storageEngine_->GetSchema().GetTable(table), info);
646         if (errCode != E_OK) {
647             LOGE("Create distributed device table failed. %d", errCode);
648             break;
649         }
650     }
651 
652     if (errCode == E_OK) {
653         errCode = handle->Commit();
654     } else {
655         (void)handle->Rollback();
656     }
657 
658     ReleaseHandle(handle);
659     return errCode;
660 }
661 
RegisterSchemaChangedCallback(const std::function<void ()> & callback)662 int RelationalSyncAbleStorage::RegisterSchemaChangedCallback(const std::function<void()> &callback)
663 {
664     std::lock_guard lock(onSchemaChangedMutex_);
665     onSchemaChanged_ = callback;
666     return E_OK;
667 }
668 
NotifySchemaChanged()669 void RelationalSyncAbleStorage::NotifySchemaChanged()
670 {
671     std::lock_guard lock(onSchemaChangedMutex_);
672     if (onSchemaChanged_) {
673         LOGD("Notify relational schema was changed");
674         onSchemaChanged_();
675     }
676 }
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const677 int RelationalSyncAbleStorage::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
678 {
679     algorithmSet.clear();
680     DataCompression::GetCompressionAlgo(algorithmSet);
681     return E_OK;
682 }
683 
RegisterObserverAction(uint64_t connectionId,const RelationalObserverAction & action)684 void RelationalSyncAbleStorage::RegisterObserverAction(uint64_t connectionId, const RelationalObserverAction &action)
685 {
686     std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
687     dataChangeCallbackMap_[connectionId] = action;
688 }
689 
TriggerObserverAction(const std::string & deviceName,ChangedData && changedData,bool isChangedData)690 void RelationalSyncAbleStorage::TriggerObserverAction(const std::string &deviceName,
691     ChangedData &&changedData, bool isChangedData)
692 {
693     IncObjRef(this);
694     int taskErrCode =
695         RuntimeContext::GetInstance()->ScheduleTask([this, deviceName, changedData, isChangedData] () mutable {
696         std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
697         if (!dataChangeCallbackMap_.empty()) {
698             auto it = dataChangeCallbackMap_.rbegin(); // call the last valid observer
699             if (it->second != nullptr) {
700                 it->second(deviceName, std::move(changedData), isChangedData);
701             }
702         }
703         DecObjRef(this);
704     });
705     if (taskErrCode != E_OK) {
706         LOGE("TriggerObserverAction scheduletask retCode=%d", taskErrCode);
707         DecObjRef(this);
708     }
709 }
710 
RegisterHeartBeatListener(const std::function<void ()> & listener)711 void RelationalSyncAbleStorage::RegisterHeartBeatListener(const std::function<void()> &listener)
712 {
713     std::lock_guard<std::mutex> autoLock(heartBeatMutex_);
714     heartBeatListener_ = listener;
715 }
716 
CheckAndInitQueryCondition(QueryObject & query) const717 int RelationalSyncAbleStorage::CheckAndInitQueryCondition(QueryObject &query) const
718 {
719     RelationalSchemaObject schema = storageEngine_->GetSchema();
720     TableInfo table = schema.GetTable(query.GetTableName());
721     if (!table.IsValid()) {
722         LOGE("Query table is not a distributed table.");
723         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
724     }
725     if (table.GetTableSyncType() == CLOUD_COOPERATION) {
726         LOGE("cloud table mode is not support");
727         return -E_NOT_SUPPORT;
728     }
729     query.SetSchema(schema);
730 
731     int errCode = E_OK;
732     auto *handle = GetHandle(true, errCode);
733     if (handle == nullptr) {
734         return errCode;
735     }
736 
737     errCode = handle->CheckQueryObjectLegal(table, query, schema.GetSchemaVersion());
738     if (errCode != E_OK) {
739         LOGE("Check relational query condition failed. %d", errCode);
740         TriggerCloseAutoLaunchConn(storageEngine_->GetProperties());
741     }
742 
743     ReleaseHandle(handle);
744     return errCode;
745 }
746 
CheckCompatible(const std::string & schema,uint8_t type) const747 bool RelationalSyncAbleStorage::CheckCompatible(const std::string &schema, uint8_t type) const
748 {
749     // return true if is relational schema.
750     return !schema.empty() && ReadSchemaType(type) == SchemaType::RELATIVE;
751 }
752 
GetRemoteQueryData(const PreparedStmt & prepStmt,size_t packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data) const753 int RelationalSyncAbleStorage::GetRemoteQueryData(const PreparedStmt &prepStmt, size_t packetSize,
754     std::vector<std::string> &colNames, std::vector<RelationalRowData *> &data) const
755 {
756     if (IsCollaborationMode(storageEngine_) || !storageEngine_->GetSchema().IsSchemaValid()) {
757         return -E_NOT_SUPPORT;
758     }
759     if (prepStmt.GetOpCode() != PreparedStmt::ExecutorOperation::QUERY || !prepStmt.IsValid()) {
760         LOGE("[ExecuteQuery] invalid args");
761         return -E_INVALID_ARGS;
762     }
763     int errCode = E_OK;
764     auto handle = GetHandle(false, errCode, OperatePerm::NORMAL_PERM);
765     if (handle == nullptr) {
766         LOGE("[ExecuteQuery] get handle fail:%d", errCode);
767         return errCode;
768     }
769     errCode = handle->ExecuteQueryBySqlStmt(prepStmt.GetSql(), prepStmt.GetBindArgs(), packetSize, colNames, data);
770     if (errCode != E_OK) {
771         LOGE("[ExecuteQuery] ExecuteQueryBySqlStmt failed:%d", errCode);
772     }
773     ReleaseHandle(handle);
774     return errCode;
775 }
776 
ExecuteQuery(const PreparedStmt & prepStmt,size_t packetSize,RelationalRowDataSet & dataSet,ContinueToken & token) const777 int RelationalSyncAbleStorage::ExecuteQuery(const PreparedStmt &prepStmt, size_t packetSize,
778     RelationalRowDataSet &dataSet, ContinueToken &token) const
779 {
780     dataSet.Clear();
781     if (token == nullptr) {
782         // start query
783         std::vector<std::string> colNames;
784         std::vector<RelationalRowData *> data;
785         ResFinalizer finalizer([&data] { RelationalRowData::Release(data); });
786 
787         int errCode = GetRemoteQueryData(prepStmt, packetSize, colNames, data);
788         if (errCode != E_OK) {
789             return errCode;
790         }
791 
792         // create one token
793         token = static_cast<ContinueToken>(
794             new (std::nothrow) RelationalRemoteQueryContinueToken(std::move(colNames), std::move(data)));
795         if (token == nullptr) {
796             LOGE("ExecuteQuery OOM");
797             return -E_OUT_OF_MEMORY;
798         }
799     }
800 
801     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
802     if (!remoteToken->CheckValid()) {
803         LOGE("ExecuteQuery invalid token");
804         return -E_INVALID_ARGS;
805     }
806 
807     int errCode = remoteToken->GetData(packetSize, dataSet);
808     if (errCode == -E_UNFINISHED) {
809         errCode = E_OK;
810     } else {
811         if (errCode != E_OK) {
812             dataSet.Clear();
813         }
814         delete remoteToken;
815         remoteToken = nullptr;
816         token = nullptr;
817     }
818     LOGI("ExecuteQuery finished, errCode:%d, size:%d", errCode, dataSet.GetSize());
819     return errCode;
820 }
821 
SaveRemoteDeviceSchema(const std::string & deviceId,const std::string & remoteSchema,uint8_t type)822 int RelationalSyncAbleStorage::SaveRemoteDeviceSchema(const std::string &deviceId, const std::string &remoteSchema,
823     uint8_t type)
824 {
825     if (ReadSchemaType(type) != SchemaType::RELATIVE) {
826         return -E_INVALID_ARGS;
827     }
828 
829     RelationalSchemaObject schemaObj;
830     int errCode = schemaObj.ParseFromSchemaString(remoteSchema);
831     if (errCode != E_OK) {
832         LOGE("Parse remote schema failed. err=%d", errCode);
833         return errCode;
834     }
835 
836     std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
837     Key remoteSchemaKey(keyStr.begin(), keyStr.end());
838     Value remoteSchemaBuff(remoteSchema.begin(), remoteSchema.end());
839     errCode = PutMetaData(remoteSchemaKey, remoteSchemaBuff);
840     if (errCode != E_OK) {
841         LOGE("Save remote schema failed. err=%d", errCode);
842         return errCode;
843     }
844 
845     return remoteDeviceSchema_.Put(deviceId, remoteSchema);
846 }
847 
GetRemoteDeviceSchema(const std::string & deviceId,RelationalSchemaObject & schemaObj)848 int RelationalSyncAbleStorage::GetRemoteDeviceSchema(const std::string &deviceId, RelationalSchemaObject &schemaObj)
849 {
850     if (schemaObj.IsSchemaValid()) {
851         return -E_INVALID_ARGS;
852     }
853 
854     std::string remoteSchema;
855     int errCode = remoteDeviceSchema_.Get(deviceId, remoteSchema);
856     if (errCode == -E_NOT_FOUND) {
857         LOGW("Get remote device schema miss cached.");
858         std::string keyStr = DBConstant::REMOTE_DEVICE_SCHEMA_KEY_PREFIX + DBCommon::TransferHashString(deviceId);
859         Key remoteSchemaKey(keyStr.begin(), keyStr.end());
860         Value remoteSchemaBuff;
861         errCode = GetMetaData(remoteSchemaKey, remoteSchemaBuff);
862         if (errCode != E_OK) {
863             LOGE("Get remote device schema from meta failed. err=%d", errCode);
864             return errCode;
865         }
866         remoteSchema = std::string(remoteSchemaBuff.begin(), remoteSchemaBuff.end());
867         errCode = remoteDeviceSchema_.Put(deviceId, remoteSchema);
868     }
869 
870     if (errCode != E_OK) {
871         LOGE("Get remote device schema failed. err=%d", errCode);
872         return errCode;
873     }
874 
875     errCode = schemaObj.ParseFromSchemaString(remoteSchema);
876     if (errCode != E_OK) {
877         LOGE("Parse remote schema failed. err=%d", errCode);
878     }
879     return errCode;
880 }
881 
ReleaseRemoteQueryContinueToken(ContinueToken & token) const882 void RelationalSyncAbleStorage::ReleaseRemoteQueryContinueToken(ContinueToken &token) const
883 {
884     auto remoteToken = static_cast<RelationalRemoteQueryContinueToken *>(token);
885     delete remoteToken;
886     remoteToken = nullptr;
887     token = nullptr;
888 }
889 
StartTransaction(TransactType type)890 int RelationalSyncAbleStorage::StartTransaction(TransactType type)
891 {
892     CHECK_STORAGE_ENGINE;
893     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
894     if (transactionHandle_ != nullptr) {
895         LOGD("Transaction started already.");
896         return -E_TRANSACT_STATE;
897     }
898     int errCode = E_OK;
899     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
900         storageEngine_->FindExecutor(type == TransactType::IMMEDIATE, OperatePerm::NORMAL_PERM, errCode));
901     if (handle == nullptr) {
902         ReleaseHandle(handle);
903         return errCode;
904     }
905     errCode = handle->StartTransaction(type);
906     if (errCode != E_OK) {
907         ReleaseHandle(handle);
908         return errCode;
909     }
910     transactionHandle_ = handle;
911     return errCode;
912 }
913 
Commit()914 int RelationalSyncAbleStorage::Commit()
915 {
916     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
917     if (transactionHandle_ == nullptr) {
918         LOGE("relation database is null or the transaction has not been started");
919         return -E_INVALID_DB;
920     }
921     int errCode = transactionHandle_->Commit();
922     ReleaseHandle(transactionHandle_);
923     transactionHandle_ = nullptr;
924     LOGD("connection commit transaction!");
925     return errCode;
926 }
927 
Rollback()928 int RelationalSyncAbleStorage::Rollback()
929 {
930     std::unique_lock<std::shared_mutex> lock(transactionMutex_);
931     if (transactionHandle_ == nullptr) {
932         LOGE("Invalid handle for rollback or the transaction has not been started.");
933         return -E_INVALID_DB;
934     }
935 
936     int errCode = transactionHandle_->Rollback();
937     ReleaseHandle(transactionHandle_);
938     transactionHandle_ = nullptr;
939     LOGI("connection rollback transaction!");
940     return errCode;
941 }
942 
GetUploadCount(const std::string & tableName,const Timestamp & timestamp,const bool isCloudForcePush,int64_t & count)943 int RelationalSyncAbleStorage::GetUploadCount(const std::string &tableName, const Timestamp &timestamp,
944     const bool isCloudForcePush, int64_t &count)
945 {
946     int errCode = E_OK;
947     auto *handle = GetHandleExpectTransaction(false, errCode);
948     if (handle == nullptr) {
949         return errCode;
950     }
951     errCode = handle->GetUploadCount(tableName, timestamp, isCloudForcePush, count);
952     if (transactionHandle_ == nullptr) {
953         ReleaseHandle(handle);
954     }
955     return errCode;
956 }
957 
FillCloudGid(const CloudSyncData & data)958 int RelationalSyncAbleStorage::FillCloudGid(const CloudSyncData &data)
959 {
960     if (storageEngine_ == nullptr) {
961         return -E_INVALID_DB;
962     }
963     int errCode = E_OK;
964     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
965         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
966     if (writeHandle == nullptr) {
967         return errCode;
968     }
969     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
970     if (errCode != E_OK) {
971         ReleaseHandle(writeHandle);
972         return errCode;
973     }
974     errCode = writeHandle->UpdateCloudLogGid(data);
975     if (errCode != E_OK) {
976         writeHandle->Rollback();
977         ReleaseHandle(writeHandle);
978         return errCode;
979     }
980     errCode = writeHandle->Commit();
981     ReleaseHandle(writeHandle);
982     return errCode;
983 }
984 
GetCloudData(const TableSchema & tableSchema,const Timestamp & beginTime,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)985 int RelationalSyncAbleStorage::GetCloudData(const TableSchema &tableSchema, const Timestamp &beginTime,
986     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
987 {
988     if (transactionHandle_ == nullptr) {
989         LOGE(" the transaction has not been started");
990         return -E_INVALID_DB;
991     }
992     SyncTimeRange syncTimeRange = { .beginTime = beginTime };
993     QueryObject queryObject;
994     queryObject.SetTableName(tableSchema.name);
995     auto token = new (std::nothrow) SQLiteSingleVerRelationalContinueToken(syncTimeRange, queryObject);
996     if (token == nullptr) {
997         LOGE("[SingleVerNStore] Allocate continue token failed.");
998         return -E_OUT_OF_MEMORY;
999     }
1000     token->SetCloudTableSchema(tableSchema);
1001     continueStmtToken = static_cast<ContinueToken>(token);
1002     return GetCloudDataNext(continueStmtToken, cloudDataResult);
1003 }
1004 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)1005 int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken,
1006     CloudSyncData &cloudDataResult)
1007 {
1008     if (continueStmtToken == nullptr) {
1009         return -E_INVALID_ARGS;
1010     }
1011     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1012     if (!token->CheckValid()) {
1013         return -E_INVALID_ARGS;
1014     }
1015     if (transactionHandle_ == nullptr) {
1016         LOGE("the transaction has not been started, release the token");
1017         ReleaseCloudDataToken(continueStmtToken);
1018         return -E_INVALID_DB;
1019     }
1020     int errCode = transactionHandle_->GetSyncCloudData(cloudDataResult, CloudDbConstant::MAX_UPLOAD_SIZE, *token);
1021     if (errCode != -E_UNFINISHED) {
1022         delete token;
1023         token = nullptr;
1024     }
1025     continueStmtToken = static_cast<ContinueToken>(token);
1026     return errCode;
1027 }
1028 
ReleaseCloudDataToken(ContinueToken & continueStmtToken)1029 int RelationalSyncAbleStorage::ReleaseCloudDataToken(ContinueToken &continueStmtToken)
1030 {
1031     if (continueStmtToken == nullptr) {
1032         return E_OK;
1033     }
1034     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1035     if (!token->CheckValid()) {
1036         return E_OK;
1037     }
1038     int errCode = token->ReleaseCloudStatement();
1039     delete token;
1040     token = nullptr;
1041     return errCode;
1042 }
1043 
ChkSchema(const TableName & tableName)1044 int RelationalSyncAbleStorage::ChkSchema(const TableName &tableName)
1045 {
1046     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1047     RelationalSchemaObject localSchema = GetSchemaInfo();
1048     return schemaMgr_.ChkSchema(tableName, localSchema);
1049 }
1050 
SetCloudDbSchema(const DataBaseSchema & schema)1051 int RelationalSyncAbleStorage::SetCloudDbSchema(const DataBaseSchema &schema)
1052 {
1053     std::unique_lock<std::shared_mutex> writeLock(schemaMgrMutex_);
1054     schemaMgr_.SetCloudDbSchema(schema);
1055     return E_OK;
1056 }
1057 
GetCloudDbSchema(DataBaseSchema & cloudSchema)1058 int RelationalSyncAbleStorage::GetCloudDbSchema(DataBaseSchema &cloudSchema)
1059 {
1060     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1061     cloudSchema = *(schemaMgr_.GetCloudDbSchema());
1062     return E_OK;
1063 }
1064 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1065 int RelationalSyncAbleStorage::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
1066     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1067 {
1068     if (transactionHandle_ == nullptr) {
1069         LOGE(" the transaction has not been started");
1070         return -E_INVALID_DB;
1071     }
1072 
1073     TableSchema tableSchema;
1074     int errCode = GetCloudTableSchema(tableName, tableSchema);
1075     if (errCode != E_OK) {
1076         LOGE("Get cloud schema failed when query log for cloud sync, %d", errCode);
1077         return errCode;
1078     }
1079     return transactionHandle_->GetInfoByPrimaryKeyOrGid(tableSchema, vBucket, dataInfoWithLog, assetInfo);
1080 }
1081 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)1082 int RelationalSyncAbleStorage::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
1083 {
1084     if (transactionHandle_ == nullptr) {
1085         LOGE(" the transaction has not been started");
1086         return -E_INVALID_DB;
1087     }
1088 
1089     TableSchema tableSchema;
1090     int errCode = GetCloudTableSchema(tableName, tableSchema);
1091     if (errCode != E_OK) {
1092         LOGE("Get cloud schema failed when save cloud data, %d", errCode);
1093         return errCode;
1094     }
1095     return transactionHandle_->PutCloudSyncData(tableName, tableSchema, downloadData);
1096 }
1097 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1098 int RelationalSyncAbleStorage::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1099     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
1100 {
1101     if (transactionHandle_ == nullptr) {
1102         LOGE("the transaction has not been started");
1103         return -E_INVALID_DB;
1104     }
1105 
1106     return transactionHandle_->DoCleanInner(mode, tableNameList, localSchema, assets);
1107 }
1108 
GetCloudTableSchema(const TableName & tableName,TableSchema & tableSchema)1109 int RelationalSyncAbleStorage::GetCloudTableSchema(const TableName &tableName, TableSchema &tableSchema)
1110 {
1111     std::shared_lock<std::shared_mutex> readLock(schemaMgrMutex_);
1112     return schemaMgr_.GetCloudTableSchema(tableName, tableSchema);
1113 }
1114 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)1115 int RelationalSyncAbleStorage::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset,
1116     bool isDownloadSuccess)
1117 {
1118     if (storageEngine_ == nullptr) {
1119         return -E_INVALID_DB;
1120     }
1121     if (transactionHandle_ == nullptr) {
1122         LOGE("the transaction has not been started when fill asset for download.");
1123         return -E_INVALID_DB;
1124     }
1125     TableSchema tableSchema;
1126     int errCode = GetCloudTableSchema(tableName, tableSchema);
1127     if (errCode != E_OK) {
1128         LOGE("Get cloud schema failed when fill cloud asset, %d", errCode);
1129         return errCode;
1130     }
1131     errCode = transactionHandle_->FillCloudAssetForDownload(tableSchema, asset, isDownloadSuccess);
1132     if (errCode != E_OK) {
1133         LOGE("fill cloud asset for download failed.%d", errCode);
1134     }
1135     return errCode;
1136 }
1137 
SetLogTriggerStatus(bool status)1138 int RelationalSyncAbleStorage::SetLogTriggerStatus(bool status)
1139 {
1140     int errCode = E_OK;
1141     auto *handle = GetHandleExpectTransaction(false, errCode);
1142     if (handle == nullptr) {
1143         return errCode;
1144     }
1145     errCode = handle->SetLogTriggerStatus(status);
1146     if (transactionHandle_ == nullptr) {
1147         ReleaseHandle(handle);
1148     }
1149     return errCode;
1150 }
1151 
FillCloudGidAndAsset(const OpType opType,const CloudSyncData & data)1152 int RelationalSyncAbleStorage::FillCloudGidAndAsset(const OpType opType, const CloudSyncData &data)
1153 {
1154     CHECK_STORAGE_ENGINE;
1155     if (opType == OpType::UPDATE && data.updData.assets.empty()) {
1156         return E_OK;
1157     }
1158     int errCode = E_OK;
1159     auto writeHandle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
1160         storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
1161     if (writeHandle == nullptr) {
1162         return errCode;
1163     }
1164     errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE);
1165     if (errCode != E_OK) {
1166         ReleaseHandle(writeHandle);
1167         return errCode;
1168     }
1169     if (opType == OpType::INSERT) {
1170         errCode = writeHandle->UpdateCloudLogGid(data);
1171         if (errCode != E_OK) {
1172             LOGE("Failed to fill cloud log gid, %d.", errCode);
1173             writeHandle->Rollback();
1174             ReleaseHandle(writeHandle);
1175             return errCode;
1176         }
1177         if (!data.insData.assets.empty()) {
1178             errCode = writeHandle->FillCloudAssetForUpload(data.tableName, data.insData);
1179         }
1180     } else {
1181         errCode = writeHandle->FillCloudAssetForUpload(data.tableName, data.updData);
1182     }
1183     if (errCode != E_OK) {
1184         LOGE("Failed to fill cloud asset, %d.", errCode);
1185         writeHandle->Rollback();
1186         ReleaseHandle(writeHandle);
1187         return errCode;
1188     }
1189     errCode = writeHandle->Commit();
1190     ReleaseHandle(writeHandle);
1191     return errCode;
1192 }
1193 
SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)1194 void RelationalSyncAbleStorage::SetSyncAbleEngine(std::shared_ptr<SyncAbleEngine> syncAbleEngine)
1195 {
1196     syncAbleEngine_ = syncAbleEngine;
1197 }
1198 
GetIdentify() const1199 std::string RelationalSyncAbleStorage::GetIdentify() const
1200 {
1201     if (storageEngine_ == nullptr) {
1202         LOGW("[RelationalSyncAbleStorage] engine is nullptr return default");
1203         return "";
1204     }
1205     return storageEngine_->GetIdentifier();
1206 }
1207 
EraseDataChangeCallback(uint64_t connectionId)1208 void RelationalSyncAbleStorage::EraseDataChangeCallback(uint64_t connectionId)
1209 {
1210     std::lock_guard<std::mutex> lock(dataChangeDeviceMutex_);
1211     auto it = dataChangeCallbackMap_.find(connectionId);
1212     if (it != dataChangeCallbackMap_.end()) {
1213         dataChangeCallbackMap_.erase(it);
1214     }
1215 }
1216 
ReleaseContinueToken(ContinueToken & continueStmtToken) const1217 void RelationalSyncAbleStorage::ReleaseContinueToken(ContinueToken &continueStmtToken) const
1218 {
1219     auto token = static_cast<SQLiteSingleVerRelationalContinueToken *>(continueStmtToken);
1220     if (token == nullptr || !(token->CheckValid())) {
1221         LOGW("[RelationalSyncAbleStorage][ReleaseContinueToken] Input is not a continue token.");
1222         return;
1223     }
1224     delete token;
1225     continueStmtToken = nullptr;
1226 }
1227 }
1228 #endif