• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "res_finalizer.h"
26 #include "sqlite_log_table_manager.h"
27 #include "sqlite_relational_utils.h"
28 #include "sqlite_relational_store_connection.h"
29 #include "storage_engine_manager.h"
30 #include "cloud_sync_utils.h"
31 
32 namespace DistributedDB {
namespace {
// Key used with the storage engine's GetMetaData/PutMetaData to read and store the distributed table mode.
// Kept as a C string because callers build the Key from the [begin, begin + strlen) pointer range.
constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
} // namespace
36 
~SQLiteRelationalStore()37 SQLiteRelationalStore::~SQLiteRelationalStore()
38 {
39     sqliteStorageEngine_ = nullptr;
40 }
41 
42 // Called when a new connection created.
IncreaseConnectionCounter()43 void SQLiteRelationalStore::IncreaseConnectionCounter()
44 {
45     if (sqliteStorageEngine_ == nullptr) {
46         LOGE("[SQLiteRelationalStore] [IncreaseConnectionCounter] sqliteStorageEngine_ is nullptr.");
47         return;
48     }
49     connectionCount_.fetch_add(1, std::memory_order_seq_cst);
50     if (connectionCount_.load() > 0) {
51         sqliteStorageEngine_->SetConnectionFlag(true);
52     }
53 }
54 
GetDBConnection(int & errCode)55 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
56 {
57     std::lock_guard<std::mutex> lock(connectMutex_);
58     RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
59     if (connection == nullptr) {
60         errCode = -E_OUT_OF_MEMORY;
61         return nullptr;
62     }
63     IncObjRef(this);
64     IncreaseConnectionCounter();
65     return connection;
66 }
67 
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)68 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
69 {
70     option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
71     option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
72     if (properties.IsEncrypted()) {
73         option.cipherType = properties.GetCipherType();
74         option.passwd = properties.GetPasswd();
75         option.iterTimes = properties.GetIterTimes();
76     }
77 }
78 
InitStorageEngine(const RelationalDBProperties & properties)79 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
80 {
81     OpenDbProperties option;
82     InitDataBaseOption(properties, option);
83     std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
84 
85     StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
86     int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
87     if (errCode != E_OK) {
88         LOGE("Init the sqlite storage engine failed:%d", errCode);
89     }
90     return errCode;
91 }
92 
ReleaseResources()93 void SQLiteRelationalStore::ReleaseResources()
94 {
95     if (sqliteStorageEngine_ != nullptr) {
96         sqliteStorageEngine_->ClearEnginePasswd();
97         sqliteStorageEngine_ = nullptr;
98     }
99 #ifdef USE_DISTRIBUTEDDB_CLOUD
100     if (cloudSyncer_ != nullptr) {
101         cloudSyncer_->Close();
102         RefObject::KillAndDecObjRef(cloudSyncer_);
103         cloudSyncer_ = nullptr;
104     }
105 #endif
106     RefObject::DecObjRef(storageEngine_);
107 }
108 
CheckDBMode()109 int SQLiteRelationalStore::CheckDBMode()
110 {
111     int errCode = E_OK;
112     auto *handle = GetHandle(true, errCode);
113     if (handle == nullptr) {
114         return errCode;
115     }
116     errCode = handle->CheckDBModeForRelational();
117     if (errCode != E_OK) {
118         LOGE("check relational DB mode failed. %d", errCode);
119     }
120 
121     ReleaseHandle(handle);
122     return errCode;
123 }
124 
GetSchemaFromMeta(RelationalSchemaObject & schema)125 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
126 {
127     Key schemaKey;
128     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
129     Value schemaVal;
130     int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
131     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
132         LOGE("Get relational schema from meta table failed. %d", errCode);
133         return errCode;
134     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
135         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
136         return -E_NOT_FOUND;
137     }
138 
139     std::string schemaStr;
140     DBCommon::VectorToString(schemaVal, schemaStr);
141     errCode = schema.ParseFromSchemaString(schemaStr);
142     if (errCode != E_OK) {
143         LOGE("Parse schema string from meta table failed.");
144         return errCode;
145     }
146 
147     sqliteStorageEngine_->SetSchema(schema);
148     return E_OK;
149 }
150 
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)151 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
152 {
153     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
154     Value modeVal;
155     int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
156     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
157         LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
158         return errCode;
159     }
160 
161     DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
162     if (!modeVal.empty()) {
163         std::string value(modeVal.begin(), modeVal.end());
164         orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
165     } else if (isUnSet) {
166         return E_OK; // First set table mode.
167     }
168 
169     if (orgMode == DistributedTableMode::COLLABORATION && orgMode != mode) {
170         LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
171         return -E_INVALID_ARGS;
172     }
173     return E_OK;
174 }
175 
CheckProperties(RelationalDBProperties properties)176 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
177 {
178     RelationalSchemaObject schema;
179     int errCode = GetSchemaFromMeta(schema);
180     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
181         LOGE("Get relational schema from meta failed. errcode=%d", errCode);
182         return errCode;
183     }
184     int ret = InitTrackerSchemaFromMeta();
185     if (ret != E_OK) {
186         LOGE("Init tracker schema from meta failed. errcode=%d", ret);
187         return ret;
188     }
189     properties.SetSchema(schema);
190 
191     // Empty schema means no distributed table has been used, we may set DB to any table mode
192     // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
193     bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
194     auto mode = properties.GetDistributedTableMode();
195     errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
196     if (errCode != E_OK) {
197         LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
198         return errCode;
199     }
200     if (!isSchemaEmpty) {
201         return errCode;
202     }
203 
204     errCode = SaveTableModeToMeta(mode);
205     if (errCode != E_OK) {
206         LOGE("Save table mode to meta failed. errCode=%d", errCode);
207         return errCode;
208     }
209 
210     return E_OK;
211 }
212 
SaveTableModeToMeta(DistributedTableMode mode)213 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
214 {
215     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
216     Value modeVal;
217     DBCommon::StringToVector(std::to_string(static_cast<int>(mode)), modeVal);
218     int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
219     if (errCode != E_OK) {
220         LOGE("Save relational schema to meta table failed. %d", errCode);
221     }
222     return errCode;
223 }
224 
SaveLogTableVersionToMeta()225 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
226 {
227     LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
228     const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY,
229         DBConstant::LOG_TABLE_VERSION_KEY + strlen(DBConstant::LOG_TABLE_VERSION_KEY));
230     Value logVersion;
231     int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
232     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
233         LOGE("Get log version from meta table failed. %d", errCode);
234         return errCode;
235     }
236     std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
237     Value logVersionVal(versionStr.begin(), versionStr.end());
238     // log version is same, no need to update
239     if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
240         return errCode;
241     }
242     // If the log version does not exist or is different, update the log version
243     errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
244     if (errCode != E_OK) {
245         LOGE("save log table version to meta table failed. %d", errCode);
246     }
247     return errCode;
248 }
249 
CleanDistributedDeviceTable()250 int SQLiteRelationalStore::CleanDistributedDeviceTable()
251 {
252     std::vector<std::string> missingTables;
253     int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
254     if (errCode != E_OK) {
255         LOGE("Clean distributed device table failed. %d", errCode);
256     }
257     for (const auto &deviceTableName : missingTables) {
258         std::string deviceHash;
259         std::string tableName;
260         DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
261         errCode = syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
262         if (errCode != E_OK) {
263             LOGE("Erase water mark failed:%d", errCode);
264             return errCode;
265         }
266     }
267     return errCode;
268 }
269 
Open(const RelationalDBProperties & properties)270 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
271 {
272     std::lock_guard<std::mutex> lock(initalMutex_);
273     if (isInitialized_) {
274         LOGD("[RelationalStore][Open] relational db was already initialized.");
275         return E_OK;
276     }
277     int errCode = InitSQLiteStorageEngine(properties);
278     if (errCode != E_OK) {
279         return errCode;
280     }
281 
282     do {
283         errCode = InitStorageEngine(properties);
284         if (errCode != E_OK) {
285             LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
286             break;
287         }
288 
289         storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
290         if (storageEngine_ == nullptr) {
291             LOGE("[RelationalStore][Open] Create syncable storage failed");
292             errCode = -E_OUT_OF_MEMORY;
293             break;
294         }
295 
296         syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
297         // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
298         // be destructed when close store
299         storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
300 #ifdef USE_DISTRIBUTEDDB_CLOUD
301         cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
302 #endif
303         errCode = CheckDBMode();
304         if (errCode != E_OK) {
305             break;
306         }
307 
308         errCode = CheckProperties(properties);
309         if (errCode != E_OK) {
310             break;
311         }
312 
313         errCode = SaveLogTableVersionToMeta();
314         if (errCode != E_OK) {
315             break;
316         }
317 
318         errCode = CleanDistributedDeviceTable();
319         if (errCode != E_OK) {
320             break;
321         }
322 
323         isInitialized_ = true;
324         return E_OK;
325     } while (false);
326 
327     ReleaseResources();
328     return errCode;
329 }
330 
OnClose(const std::function<void (void)> & notifier)331 void SQLiteRelationalStore::OnClose(const std::function<void(void)> &notifier)
332 {
333     AutoLock lockGuard(this);
334     if (notifier) {
335         closeNotifiers_.push_back(notifier);
336     } else {
337         LOGW("Register 'Close()' notifier failed, notifier is null.");
338     }
339 }
340 
GetHandle(bool isWrite,int & errCode) const341 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
342 {
343     if (sqliteStorageEngine_ == nullptr) {
344         errCode = -E_INVALID_DB;
345         return nullptr;
346     }
347 
348     return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
349         sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
350 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const351 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
352 {
353     if (handle == nullptr) {
354         return;
355     }
356 
357     if (sqliteStorageEngine_ != nullptr) {
358         StorageExecutor *databaseHandle = handle;
359         sqliteStorageEngine_->Recycle(databaseHandle);
360         handle = nullptr;
361     }
362 }
363 
Sync(const ISyncer::SyncParam & syncParam,uint64_t connectionId)364 int SQLiteRelationalStore::Sync(const ISyncer::SyncParam &syncParam, uint64_t connectionId)
365 {
366     return syncAbleEngine_->Sync(syncParam, connectionId);
367 }
368 
369 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)370 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
371 {
372     int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
373     if (count <= 0) {
374         LOGF("Decrease db connection counter failed, count <= 0.");
375         return;
376     }
377     if (storageEngine_ != nullptr) {
378         storageEngine_->EraseDataChangeCallback(connectionId);
379     }
380     if (count != 1) {
381         return;
382     }
383 
384     LockObj();
385     auto notifiers = std::move(closeNotifiers_);
386     UnlockObj();
387     for (const auto &notifier : notifiers) {
388         if (notifier) {
389             notifier();
390         }
391     }
392 
393     // Sync Close
394     syncAbleEngine_->Close();
395 
396 #ifdef USE_DISTRIBUTEDDB_CLOUD
397     if (cloudSyncer_ != nullptr) {
398         cloudSyncer_->Close();
399         RefObject::KillAndDecObjRef(cloudSyncer_);
400         cloudSyncer_ = nullptr;
401     }
402 #endif
403 
404     if (sqliteStorageEngine_ != nullptr) {
405         sqliteStorageEngine_ = nullptr;
406     }
407     {
408         if (storageEngine_ != nullptr) {
409             storageEngine_->RegisterHeartBeatListener(nullptr);
410         }
411         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
412         StopLifeCycleTimer();
413         lifeCycleNotifier_ = nullptr;
414     }
415     // close will dec sync ref of storageEngine_
416     DecObjRef(storageEngine_);
417 }
418 
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)419 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
420 {
421     if (connectionCount_.load() == 1) {
422         sqliteStorageEngine_->SetConnectionFlag(false);
423     }
424 
425     connectMutex_.lock();
426     if (connection != nullptr) {
427         KillAndDecObjRef(connection);
428         DecreaseConnectionCounter(connectionId);
429         connectMutex_.unlock();
430         KillAndDecObjRef(this);
431     } else {
432         connectMutex_.unlock();
433     }
434 }
435 
WakeUpSyncer()436 void SQLiteRelationalStore::WakeUpSyncer()
437 {
438     syncAbleEngine_->WakeUpSyncer();
439 }
440 
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)441 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
442     bool trackerSchemaChanged)
443 {
444     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
445     TableInfo tableInfo = localSchema.GetTable(tableName);
446     if (!tableInfo.Empty()) {
447         bool isSharedTable = tableInfo.GetSharedTableMark();
448         if (isSharedTable && !trackerSchemaChanged) {
449             return E_OK; // shared table will create distributed table when use SetCloudDbSchema
450         }
451     }
452 
453     auto mode = sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode();
454     std::string localIdentity; // collaboration mode need local identify
455     if (mode == DistributedTableMode::COLLABORATION) {
456         int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
457         if (errCode != E_OK || localIdentity.empty()) {
458             LOGW("Get local identity failed: %d", errCode);
459         }
460     }
461     bool schemaChanged = false;
462     int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(localIdentity),
463         schemaChanged, syncType, trackerSchemaChanged);
464     if (errCode != E_OK) {
465         LOGE("Create distributed table failed. %d", errCode);
466     } else {
467         CleanDirtyLogIfNeed(tableName);
468     }
469     if (schemaChanged) {
470         LOGD("Notify schema changed.");
471         storageEngine_->NotifySchemaChanged();
472     }
473     return errCode;
474 }
475 
476 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetCloudSyncTaskCount()477 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
478 {
479     if (cloudSyncer_ == nullptr) {
480         LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
481         return -1;
482     }
483     return cloudSyncer_->GetCloudSyncTaskCount();
484 }
485 
CleanCloudData(ClearMode mode)486 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
487 {
488     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
489     TableInfoMap tables = localSchema.GetTables();
490     std::vector<std::string> cloudTableNameList;
491     for (const auto &tableInfo : tables) {
492         bool isSharedTable = tableInfo.second.GetSharedTableMark();
493         if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
494             continue;
495         }
496         if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
497             cloudTableNameList.push_back(tableInfo.first);
498         }
499     }
500     if (cloudTableNameList.empty()) {
501         LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
502         return E_OK;
503     }
504     if (cloudSyncer_ == nullptr) {
505         LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
506         return -E_INVALID_DB;
507     }
508     int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
509     if (errCode != E_OK) {
510         LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
511     }
512 
513     return errCode;
514 }
515 
ClearCloudWatermark(const std::set<std::string> & tableNames)516 int SQLiteRelationalStore::ClearCloudWatermark(const std::set<std::string> &tableNames)
517 {
518     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
519     TableInfoMap tables = localSchema.GetTables();
520     std::vector<std::string> cloudTableNameList;
521     bool isClearAllTables = tableNames.empty();
522     for (const auto &tableInfo : tables) {
523         if (tableInfo.second.GetSharedTableMark()) {
524             continue;
525         }
526         std::string tableName = tableInfo.first;
527         std::string maskedName = DBCommon::StringMiddleMasking(tableName);
528         bool isTargetTable = tableNames.count(tableName) > 0;
529         if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION && (isClearAllTables || isTargetTable)) {
530             cloudTableNameList.push_back(tableName);
531             LOGI("[RelationalStore] cloud watermark of table %s will be cleared, original name length: %zu",
532                 maskedName.c_str(), tableName.length());
533         } else if (!isClearAllTables && isTargetTable) {
534             LOGW("[RelationalStore] table %s ignored when clear cloud watermark, original name length: %zu",
535                 maskedName.c_str(), tableName.length());
536         }
537     }
538     if (cloudTableNameList.empty()) {
539         LOGI("[RelationalStore] no target tables found for clear cloud watermark");
540         return E_OK;
541     }
542     if (cloudSyncer_ == nullptr) {
543         LOGE("[RelationalStore] cloudSyncer was not initialized when clear cloud watermark");
544         return -E_INVALID_DB;
545     }
546     int errCode = cloudSyncer_->ClearCloudWatermark(cloudTableNameList);
547     if (errCode != E_OK) {
548         LOGE("[RelationalStore] failed to clear cloud watermark, %d.", errCode);
549     }
550     return errCode;
551 }
552 #endif
553 
RemoveDeviceData()554 int SQLiteRelationalStore::RemoveDeviceData()
555 {
556     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetRelationalProperties().GetIntProp(
557         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
558     if (mode == DistributedTableMode::COLLABORATION) {
559         LOGE("Not support remove all device data in collaboration mode.");
560         return -E_NOT_SUPPORT;
561     }
562 
563     std::vector<std::string> tableNameList = GetAllDistributedTableName();
564     if (tableNameList.empty()) {
565         return E_OK;
566     }
567     // erase watermark first
568     int errCode = EraseAllDeviceWatermark(tableNameList);
569     if (errCode != E_OK) {
570         LOGE("remove watermark failed %d", errCode);
571         return errCode;
572     }
573     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
574     errCode = GetHandleAndStartTransaction(handle);
575     if (handle == nullptr) {
576         return errCode;
577     }
578 
579     for (const auto &table : tableNameList) {
580         errCode = handle->DeleteDistributedDeviceTable("", table);
581         if (errCode != E_OK) {
582             LOGE("delete device data failed. %d", errCode);
583             break;
584         }
585 
586         errCode = handle->DeleteDistributedAllDeviceTableLog(table);
587         if (errCode != E_OK) {
588             LOGE("delete device data failed. %d", errCode);
589             break;
590         }
591     }
592 
593     if (errCode != E_OK) {
594         (void)handle->Rollback();
595         ReleaseHandle(handle);
596         return errCode;
597     }
598 
599     errCode = handle->Commit();
600     ReleaseHandle(handle);
601     storageEngine_->NotifySchemaChanged();
602     return errCode;
603 }
604 
RemoveDeviceData(const std::string & device,const std::string & tableName)605 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
606 {
607     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetRelationalProperties().GetIntProp(
608         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
609     if (mode == DistributedTableMode::COLLABORATION) {
610         LOGE("Not support remove device data in collaboration mode.");
611         return -E_NOT_SUPPORT;
612     }
613 
614     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
615     auto iter = tables.find(tableName);
616     if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
617         LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
618         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
619     }
620     // cloud mode is not permit
621     if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
622         LOGE("Remove device data with cloud sync table name.");
623         return -E_NOT_SUPPORT;
624     }
625     bool isNeedHash = false;
626     std::string hashDeviceId;
627     int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
628     if (errCode == -E_NOT_SUPPORT) {
629         isNeedHash = true;
630         hashDeviceId = device;
631         errCode = E_OK;
632     }
633     if (errCode != E_OK) {
634         return errCode;
635     }
636     if (isNeedHash) {
637         // check device is uuid in meta
638         std::set<std::string> hashDevices;
639         errCode = GetExistDevices(hashDevices);
640         if (errCode != E_OK) {
641             return errCode;
642         }
643         if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
644             LOGD("[SQLiteRelationalStore] not match device, just return");
645             return E_OK;
646         }
647     }
648     return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
649 }
650 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)651 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
652     const RelationalObserverAction &action)
653 {
654     return storageEngine_->RegisterObserverAction(connectionId, observer, action);
655 }
656 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)657 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
658 {
659     return storageEngine_->UnRegisterObserverAction(connectionId, observer);
660 }
661 
StopLifeCycleTimer()662 int SQLiteRelationalStore::StopLifeCycleTimer()
663 {
664     auto runtimeCxt = RuntimeContext::GetInstance();
665     if (runtimeCxt == nullptr) {
666         return -E_INVALID_ARGS;
667     }
668     if (lifeTimerId_ != 0) {
669         TimerId timerId = lifeTimerId_;
670         lifeTimerId_ = 0;
671         runtimeCxt->RemoveTimer(timerId, false);
672     }
673     return E_OK;
674 }
675 
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)676 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier)
677 {
678     auto runtimeCxt = RuntimeContext::GetInstance();
679     if (runtimeCxt == nullptr) {
680         return -E_INVALID_ARGS;
681     }
682     RefObject::IncObjRef(this);
683     TimerId timerId = 0;
684     int errCode = runtimeCxt->SetTimer(
685         DBConstant::DEF_LIFE_CYCLE_TIME,
686         [this](TimerId id) -> int {
687             std::lock_guard<std::mutex> lock(lifeCycleMutex_);
688             if (lifeCycleNotifier_) {
689                 // normal identifier mode
690                 std::string identifier;
691                 if (sqliteStorageEngine_->GetRelationalProperties().GetBoolProp(
692                     DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
693                     identifier = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(
694                         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
695                 } else {
696                     identifier = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(
697                         DBProperties::IDENTIFIER_DATA, "");
698                 }
699                 auto userId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, "");
700                 lifeCycleNotifier_(identifier, userId);
701             }
702             return 0;
703         },
704         [this]() {
705             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
706             if (ret != E_OK) {
707                 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
708             }
709         },
710         timerId);
711     if (errCode != E_OK) {
712         lifeTimerId_ = 0;
713         LOGE("SetTimer failed:%d", errCode);
714         RefObject::DecObjRef(this);
715         return errCode;
716     }
717 
718     lifeCycleNotifier_ = notifier;
719     lifeTimerId_ = timerId;
720     return E_OK;
721 }
722 
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)723 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier)
724 {
725     int errCode;
726     {
727         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
728         if (lifeTimerId_ != 0) {
729             errCode = StopLifeCycleTimer();
730             if (errCode != E_OK) {
731                 LOGE("Stop the life cycle timer failed:%d", errCode);
732                 return errCode;
733             }
734         }
735 
736         if (!notifier) {
737             return E_OK;
738         }
739         errCode = StartLifeCycleTimer(notifier);
740         if (errCode != E_OK) {
741             LOGE("Register life cycle timer failed:%d", errCode);
742             return errCode;
743         }
744     }
745     auto listener = [this] { HeartBeat(); };
746     storageEngine_->RegisterHeartBeatListener(listener);
747     return errCode;
748 }
749 
HeartBeat()750 void SQLiteRelationalStore::HeartBeat()
751 {
752     std::lock_guard<std::mutex> lock(lifeCycleMutex_);
753     int errCode = ResetLifeCycleTimer();
754     if (errCode != E_OK) {
755         LOGE("Heart beat for life cycle failed:%d", errCode);
756     }
757 }
758 
ResetLifeCycleTimer()759 int SQLiteRelationalStore::ResetLifeCycleTimer()
760 {
761     if (lifeTimerId_ == 0) {
762         return E_OK;
763     }
764     auto lifeNotifier = lifeCycleNotifier_;
765     lifeCycleNotifier_ = nullptr;
766     int errCode = StopLifeCycleTimer();
767     if (errCode != E_OK) {
768         LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
769     }
770     return StartLifeCycleTimer(lifeNotifier);
771 }
772 
GetStorePath() const773 std::string SQLiteRelationalStore::GetStorePath() const
774 {
775     return sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::DATA_DIR, "");
776 }
777 
GetProperties() const778 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
779 {
780     return sqliteStorageEngine_->GetRelationalProperties();
781 }
782 
StopSync(uint64_t connectionId)783 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
784 {
785     return syncAbleEngine_->StopSync(connectionId);
786 }
787 
Dump(int fd)788 void SQLiteRelationalStore::Dump(int fd)
789 {
790     std::string userId = "";
791     std::string appId = "";
792     std::string storeId = "";
793     std::string label = "";
794     if (sqliteStorageEngine_ != nullptr) {
795         userId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, "");
796         appId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, "");
797         storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
798         label = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
799     }
800     label = DBCommon::TransferStringToHex(label);
801     DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
802         storeId.c_str(), label.c_str());
803     if (syncAbleEngine_ != nullptr) {
804         syncAbleEngine_->Dump(fd);
805     }
806 }
807 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)808 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
809     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
810 {
811     if (sqliteStorageEngine_ == nullptr) {
812         return -E_INVALID_DB;
813     }
814     if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
815         LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
816         return -E_MAX_LIMITS;
817     }
818 
819     if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
820         LOGW("not a distributed relational store.");
821         return -E_NOT_SUPPORT;
822     }
823 
824     // Check whether to be able to operate the db.
825     int errCode = E_OK;
826     auto *handle = GetHandle(false, errCode);
827     if (handle == nullptr) {
828         return errCode;
829     }
830     errCode = handle->CheckEncryptedOrCorrupted();
831     ReleaseHandle(handle);
832     if (errCode != E_OK) {
833         return errCode;
834     }
835 
836     return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
837 }
838 
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)839 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
840 {
841     std::set<std::string> devices;
842     int errCode = GetExistDevices(devices);
843     if (errCode != E_OK) {
844         return errCode;
845     }
846     for (const auto &tableName : tableNameList) {
847         for (const auto &device : devices) {
848             errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
849             if (errCode != E_OK) {
850                 return errCode;
851             }
852         }
853     }
854     return E_OK;
855 }
856 
GetDevTableName(const std::string & device,const std::string & hashDev) const857 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
858 {
859     std::string devTableName;
860     StoreInfo info = { sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, ""),
861         sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, ""),
862         sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "") };
863     if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
864         devTableName = hashDev;
865     }
866     return devTableName;
867 }
868 
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const869 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
870 {
871     int errCode = E_OK;
872     handle = GetHandle(true, errCode);
873     if (handle == nullptr) {
874         LOGE("get handle failed %d", errCode);
875         return errCode;
876     }
877 
878     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
879     if (errCode != E_OK) {
880         LOGE("start transaction failed %d", errCode);
881         ReleaseHandle(handle);
882     }
883     return errCode;
884 }
885 
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)886 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
887     const std::string &tableName, bool isNeedHash)
888 {
889     std::string hashHexDev;
890     std::string hashDev;
891     std::string devTableName;
892     if (!isNeedHash) {
893         // if is not need hash mappingDev mean hash(uuid) device is param device
894         hashHexDev = DBCommon::TransferStringToHex(mappingDev);
895         hashDev = mappingDev;
896         devTableName = device;
897     } else {
898         // if is need hash mappingDev mean uuid
899         hashDev = DBCommon::TransferHashString(mappingDev);
900         hashHexDev = DBCommon::TransferStringToHex(hashDev);
901         devTableName = GetDevTableName(mappingDev, hashHexDev);
902     }
903     // erase watermark first
904     int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
905     if (errCode != E_OK) {
906         LOGE("erase watermark failed %d", errCode);
907         return errCode;
908     }
909     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
910     errCode = GetHandleAndStartTransaction(handle);
911     if (handle == nullptr) {
912         return errCode;
913     }
914 
915     errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
916     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
917     if (errCode != E_OK) {
918         LOGE("delete device data failed. %d", errCode);
919         tables.clear();
920     }
921 
922     for (const auto &it : tables) {
923         if (tableName.empty() || it.second.GetTableName() == tableName) {
924             errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
925             if (errCode != E_OK) {
926                 LOGE("delete device data failed. %d", errCode);
927                 break;
928             }
929         }
930     }
931 
932     if (errCode != E_OK) {
933         (void)handle->Rollback();
934         ReleaseHandle(handle);
935         return errCode;
936     }
937     errCode = handle->Commit();
938     ReleaseHandle(handle);
939     storageEngine_->NotifySchemaChanged();
940     return errCode;
941 }
942 
GetExistDevices(std::set<std::string> & hashDevices) const943 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
944 {
945     int errCode = E_OK;
946     auto *handle = GetHandle(true, errCode);
947     if (handle == nullptr) {
948         LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
949         return errCode;
950     }
951     errCode = handle->GetExistsDeviceList(hashDevices);
952     if (errCode != E_OK) {
953         LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
954     }
955     ReleaseHandle(handle);
956     return errCode;
957 }
958 
GetAllDistributedTableName(TableSyncType tableSyncType)959 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName(TableSyncType tableSyncType)
960 {
961     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
962     std::vector<std::string> tableNames;
963     for (const auto &table : tables) {
964         if (table.second.GetTableSyncType() != tableSyncType) {
965             continue;
966         }
967         tableNames.push_back(table.second.GetTableName());
968     }
969     return tableNames;
970 }
971 
972 #ifdef USE_DISTRIBUTEDDB_CLOUD
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)973 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
974 {
975     if (cloudSyncer_ == nullptr) {
976         LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
977         return -E_INVALID_DB;
978     }
979     cloudSyncer_->SetCloudDB(cloudDb);
980     return E_OK;
981 }
982 #endif
983 
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)984 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
985     std::vector<Field> &addFields)
986 {
987     for (const auto &newField : newFields) {
988         if (equalFields.find(newField.colName) == equalFields.end()) {
989             addFields.push_back(newField);
990         }
991     }
992 }
993 
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)994 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
995     std::vector<Field> &addFields)
996 {
997     std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
998     if (newFields.size() < oldFields.size()) {
999         return false;
1000     }
1001     std::set<std::string> equalFields;
1002     for (const auto &oldField : oldFields) {
1003         bool isFieldExist = false;
1004         for (const auto &newField : newFields) {
1005             if (newField.colName != oldField.GetFieldName()) {
1006                 continue;
1007             }
1008             isFieldExist = true;
1009             int32_t type = newField.type;
1010             // Field type need to match storage type
1011             // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
1012             // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
1013             if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
1014                 type++; // storage type - field type = 1
1015             } else if (type == TYPE_INDEX<bool>) {
1016                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
1017             } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
1018                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
1019             }
1020             auto primaryKeyMap = tableInfo.GetPrimaryKey();
1021             auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
1022                 [&newField](const std::map<int, std::string>::value_type &pair) {
1023                     return pair.second == newField.colName;
1024                 });
1025             if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
1026                 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
1027                 return false;
1028             }
1029             equalFields.insert(newField.colName);
1030         }
1031         if (!isFieldExist) {
1032             return false;
1033         }
1034     }
1035     AddFields(newFields, equalFields, addFields);
1036     return true;
1037 }
1038 
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)1039 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
1040     std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
1041 {
1042     std::set<std::string> tableNames;
1043     std::map<std::string, std::string> sharedTableNamesMap;
1044     std::map<std::string, std::vector<Field>> fieldsMap;
1045     for (const auto &table : schema.tables) {
1046         tableNames.insert(table.name);
1047         sharedTableNamesMap[table.name] = table.sharedTableName;
1048         std::vector<Field> fields = table.fields;
1049         bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
1050         Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
1051         Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
1052         fields.insert(fields.begin(), privilegeField);
1053         fields.insert(fields.begin(), ownerField);
1054         fieldsMap[table.name] = fields;
1055     }
1056 
1057     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1058     TableInfoMap tableList = localSchema.GetTables();
1059     for (const auto &tableInfo : tableList) {
1060         if (!tableInfo.second.GetSharedTableMark()) {
1061             continue;
1062         }
1063         std::string oldSharedTableName = tableInfo.second.GetTableName();
1064         std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1065         std::vector<Field> addFields;
1066         if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1067             deleteTableNames.push_back(oldSharedTableName);
1068         } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1069             deleteTableNames.push_back(oldSharedTableName);
1070         } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1071             if (!addFields.empty()) {
1072                 updateTableNames[oldSharedTableName] = addFields;
1073             }
1074             if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1075                 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1076             }
1077         } else {
1078             return false;
1079         }
1080     }
1081     return true;
1082 }
1083 
1084 #ifdef USE_DISTRIBUTEDDB_CLOUD
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1085 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1086 {
1087     if (storageEngine_ == nullptr) {
1088         LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1089         return -E_INVALID_DB;
1090     }
1091     int errCode = CheckCloudSchema(schema);
1092     if (errCode != E_OK) {
1093         return errCode;
1094     }
1095     // delete, update and create shared table and its distributed table
1096     errCode = ExecuteCreateSharedTable(schema);
1097     if (errCode != E_OK) {
1098         LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1099         return errCode;
1100     }
1101     return storageEngine_->SetCloudDbSchema(schema);
1102 }
1103 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1104 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1105 {
1106     if (cloudSyncer_ == nullptr) {
1107         LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1108         return -E_INVALID_DB;
1109     }
1110     cloudSyncer_->SetIAssetLoader(loader);
1111     return E_OK;
1112 }
1113 #endif
1114 
ChkSchema(const TableName & tableName)1115 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1116 {
1117     // check schema first then compare columns to avoid change the origin return error code
1118     if (storageEngine_ == nullptr) {
1119         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1120         return -E_INVALID_DB;
1121     }
1122     int errCode = storageEngine_->ChkSchema(tableName);
1123     if (errCode != E_OK) {
1124         LOGE("[SQLiteRelationalStore][ChkSchema] ChkSchema failed %d.", errCode);
1125         return errCode;
1126     }
1127     auto *handle = GetHandle(false, errCode);
1128     if (handle == nullptr) {
1129         LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1130         return errCode;
1131     }
1132     RelationalSchemaObject localSchema = storageEngine_->GetSchemaInfo();
1133     handle->SetLocalSchema(localSchema);
1134     errCode = handle->CompareSchemaTableColumns(tableName);
1135     if (errCode != E_OK) {
1136         LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1137     }
1138     ReleaseHandle(handle);
1139     return errCode;
1140 }
1141 
1142 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1143 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1144 {
1145     if (storageEngine_ == nullptr) {
1146         LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1147         return -E_INVALID_DB;
1148     }
1149     int errCode = CheckBeforeSync(option);
1150     if (errCode != E_OK) {
1151         return errCode;
1152     }
1153     LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1154     if (option.compensatedSyncOnly) {
1155         CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1156         info.storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
1157         cloudSyncer_->GenerateCompensatedSync(info);
1158         return E_OK;
1159     }
1160     CloudSyncer::CloudTaskInfo info;
1161     FillSyncInfo(option, onProcess, info);
1162     auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1163     if (ret != E_OK) {
1164         return ret;
1165     }
1166     ret = ReFillSyncInfoTable(table, info);
1167     if (ret != E_OK) {
1168         return ret;
1169     }
1170     info.taskId = taskId;
1171     errCode = cloudSyncer_->Sync(info);
1172     return errCode;
1173 }
1174 
CheckBeforeSync(const CloudSyncOption & option)1175 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1176 {
1177     if (cloudSyncer_ == nullptr) {
1178         LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1179         return -E_INVALID_DB;
1180     }
1181     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1182         return -E_INVALID_ARGS;
1183     }
1184     if (option.priorityLevel < CloudDbConstant::PRIORITY_TASK_DEFALUT_LEVEL ||
1185         option.priorityLevel > CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1186         LOGE("[RelationalStore] priority level is invalid value:%d", option.priorityLevel);
1187         return -E_INVALID_ARGS;
1188     }
1189     if (option.compensatedSyncOnly && option.asyncDownloadAssets) {
1190         return -E_NOT_SUPPORT;
1191     }
1192     int errCode = CheckQueryValid(option);
1193     if (errCode != E_OK) {
1194         return errCode;
1195     }
1196     SecurityOption securityOption;
1197     errCode = storageEngine_->GetSecurityOption(securityOption);
1198     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1199         return -E_SECURITY_OPTION_CHECK_ERROR;
1200     }
1201     CloudSyncConfig config = storageEngine_->GetCloudSyncConfig();
1202     if ((errCode == E_OK) && (securityOption.securityLabel == S4) && (!config.isSupportEncrypt)) {
1203         LOGE("[RelationalStore] The current data does not support synchronization.");
1204         return -E_SECURITY_OPTION_CHECK_ERROR;
1205     }
1206     return E_OK;
1207 }
1208 
CheckAssetsOnlyValid(const QuerySyncObject & querySyncObject,const CloudSyncOption & option)1209 int SQLiteRelationalStore::CheckAssetsOnlyValid(const QuerySyncObject &querySyncObject, const CloudSyncOption &option)
1210 {
1211     if (!querySyncObject.IsAssetsOnly()) {
1212         return E_OK;
1213     }
1214     if (option.mode != SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1215         LOGE("[RelationalStore] not support mode %d when sync with assets only", option.mode);
1216         return -E_NOT_SUPPORT;
1217     }
1218     if (option.priorityLevel != CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1219         LOGE("[RelationalStore] priorityLevel must be 2 when sync with assets only, now is %d",
1220             option.priorityLevel);
1221         return -E_INVALID_ARGS;
1222     }
1223     if (querySyncObject.AssetsOnlyErrFlag() == -E_INVALID_ARGS) {
1224         LOGE("[RelationalStore] the query statement of assets only is incorrect.");
1225         return -E_INVALID_ARGS;
1226     }
1227     return E_OK;
1228 }
1229 
CheckQueryValid(const CloudSyncOption & option)1230 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1231 {
1232     if (option.compensatedSyncOnly) {
1233         return E_OK;
1234     }
1235     QuerySyncObject syncObject(option.query);
1236     int errCode = syncObject.GetValidStatus();
1237     if (errCode != E_OK) {
1238         LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1239         return errCode;
1240     }
1241     std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1242     bool isFromTable = object.empty();
1243     if (!option.priorityTask && !isFromTable) {
1244         LOGE("[RelationalStore] not support normal sync with query");
1245         return -E_NOT_SUPPORT;
1246     }
1247     const auto tableNames = syncObject.GetRelationTableNames();
1248     for (const auto &tableName : tableNames) {
1249         QuerySyncObject querySyncObject;
1250         querySyncObject.SetTableName(tableName);
1251         object.push_back(querySyncObject);
1252     }
1253     std::vector<std::string> syncTableNames;
1254     for (const auto &item : object) {
1255         std::string tableName = item.GetRelationTableName();
1256         syncTableNames.emplace_back(tableName);
1257         if (item.IsContainQueryNodes() && option.asyncDownloadAssets) {
1258             LOGE("[RelationalStore] not support async download assets with query table %s length %zu",
1259                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length());
1260             return -E_NOT_SUPPORT;
1261         }
1262         errCode = CheckAssetsOnlyValid(item, option);
1263         if (errCode != E_OK) {
1264             return errCode;
1265         }
1266     }
1267     errCode = CheckTableName(syncTableNames);
1268     if (errCode != E_OK) {
1269         return errCode;
1270     }
1271     return CheckObjectValid(option.priorityTask, object, isFromTable);
1272 }
1273 
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1274 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1275     bool isFromTable)
1276 {
1277     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1278     for (const auto &item : object) {
1279         if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1280             LOGE("[RelationalStore] not support priority sync with full table");
1281             return -E_INVALID_ARGS;
1282         }
1283         int errCode = storageEngine_->CheckQueryValid(item);
1284         if (errCode != E_OK) {
1285             return errCode;
1286         }
1287         if (!priorityTask || isFromTable) {
1288             continue;
1289         }
1290         if (!item.IsInValueOutOfLimit()) {
1291             LOGE("[RelationalStore] not support priority sync in count out of limit");
1292             return -E_MAX_LIMITS;
1293         }
1294         std::string tableName = item.GetRelationTableName();
1295         TableInfo tableInfo = localSchema.GetTable(tableName);
1296         if (!tableInfo.Empty()) {
1297             const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1298             errCode = item.CheckPrimaryKey(primaryKeyMap);
1299             if (errCode != E_OK) {
1300                 return errCode;
1301             }
1302         }
1303     }
1304     return E_OK;
1305 }
1306 
CheckTableName(const std::vector<std::string> & tableNames)1307 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1308 {
1309     if (tableNames.empty()) {
1310         LOGE("[RelationalStore] sync with empty table");
1311         return -E_INVALID_ARGS;
1312     }
1313     for (const auto &table : tableNames) {
1314         int errCode = ChkSchema(table);
1315         if (errCode != E_OK) {
1316             LOGE("[RelationalStore] schema check failed when sync");
1317             return errCode;
1318         }
1319     }
1320     return E_OK;
1321 }
1322 
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1323 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1324     CloudSyncer::CloudTaskInfo &info)
1325 {
1326     auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1327     if (syncObject.empty()) {
1328         QuerySyncObject querySyncObject(option.query);
1329         info.table = querySyncObject.GetRelationTableNames();
1330         for (const auto &item : info.table) {
1331             QuerySyncObject object(Query::Select());
1332             object.SetTableName(item);
1333             info.queryList.push_back(object);
1334         }
1335     } else {
1336         for (auto &item : syncObject) {
1337             info.table.push_back(item.GetRelationTableName());
1338             info.queryList.push_back(std::move(item));
1339         }
1340     }
1341     info.devices = option.devices;
1342     info.mode = option.mode;
1343     info.callback = onProcess;
1344     info.timeout = option.waitTime;
1345     info.priorityTask = option.priorityTask;
1346     info.compensatedTask = option.compensatedSyncOnly;
1347     info.priorityLevel = option.priorityLevel;
1348     info.users.emplace_back("");
1349     info.lockAction = option.lockAction;
1350     info.merge = option.merge;
1351     info.storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
1352     info.prepareTraceId = option.prepareTraceId;
1353     info.asyncDownloadAssets = option.asyncDownloadAssets;
1354 }
1355 #endif
1356 
SetTrackerTable(const TrackerSchema & trackerSchema)1357 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1358 {
1359     TableInfo tableInfo;
1360     bool isFirstCreate = false;
1361     bool isNoTableInSchema = false;
1362     int errCode = CheckTrackerTable(trackerSchema, tableInfo, isNoTableInSchema, isFirstCreate);
1363     if (errCode != E_OK) {
1364         if (errCode != -E_IGNORE_DATA) {
1365             return errCode;
1366         }
1367         auto *handle = GetHandle(true, errCode);
1368         if (handle != nullptr) {
1369             handle->CheckAndCreateTrigger(tableInfo);
1370             // Try clear historical mismatched log, which usually do not occur and apply to tracker table only.
1371             if (isNoTableInSchema) {
1372                 handle->ClearLogOfMismatchedData(trackerSchema.tableName);
1373             }
1374             handle->RecoverNullExtendLog(trackerSchema, tableInfo.GetTrackerTable());
1375             ReleaseHandle(handle);
1376         }
1377         CleanDirtyLogIfNeed(trackerSchema.tableName);
1378         return E_OK;
1379     }
1380     errCode = sqliteStorageEngine_->UpdateExtendField(trackerSchema);
1381     if (errCode != E_OK) {
1382         LOGE("[RelationalStore] update [%s [%zu]] extend_field failed: %d",
1383             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1384         return errCode;
1385     }
1386     if (isNoTableInSchema) {
1387         errCode = sqliteStorageEngine_->SetTrackerTable(trackerSchema, tableInfo, isFirstCreate);
1388         if (errCode == E_OK) {
1389             CleanDirtyLogIfNeed(trackerSchema.tableName);
1390         }
1391         return errCode;
1392     }
1393     sqliteStorageEngine_->CacheTrackerSchema(trackerSchema);
1394     errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1395     if (errCode != E_OK) {
1396         LOGE("[RelationalStore] create distributed table of [%s [%zu]] failed: %d",
1397             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1398         return errCode;
1399     }
1400     return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1401 }
1402 
CheckTrackerTable(const TrackerSchema & trackerSchema,TableInfo & table,bool & isNoTableInSchema,bool & isFirstCreate)1403 int SQLiteRelationalStore::CheckTrackerTable(const TrackerSchema &trackerSchema, TableInfo &table,
1404     bool &isNoTableInSchema, bool &isFirstCreate)
1405 {
1406     const RelationalSchemaObject tracker = sqliteStorageEngine_->GetTrackerSchema();
1407     isFirstCreate = tracker.GetTrackerTable(trackerSchema.tableName).GetTableName().empty();
1408     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1409     table = localSchema.GetTable(trackerSchema.tableName);
1410     TrackerTable trackerTable;
1411     trackerTable.Init(trackerSchema);
1412     int errCode = E_OK;
1413     auto *handle = GetHandle(true, errCode);
1414     if (handle == nullptr) {
1415         return errCode;
1416     }
1417     ResFinalizer finalizer([this, handle]() {
1418         SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle;
1419         ReleaseHandle(releaseHandle);
1420     });
1421     bool isOnceDrop = false;
1422     (void)handle->IsTableOnceDropped(trackerSchema.tableName, isOnceDrop);
1423     if (table.Empty()) {
1424         isNoTableInSchema = true;
1425         table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
1426         errCode = handle->AnalysisTrackerTable(trackerTable, table);
1427         if (errCode != E_OK) {
1428             LOGE("[CheckTrackerTable] analysis table schema failed %d.", errCode);
1429             return errCode;
1430         }
1431     } else {
1432         table.SetTrackerTable(trackerTable);
1433         errCode = table.CheckTrackerTable();
1434         if (errCode != E_OK) {
1435             LOGE("[CheckTrackerTable] check tracker table schema failed. %d", errCode);
1436             return errCode;
1437         }
1438     }
1439     if (!trackerSchema.isForceUpgrade && !tracker.GetTrackerTable(trackerSchema.tableName).IsChanging(trackerSchema)
1440         && !isOnceDrop) {
1441         LOGW("[CheckTrackerTable] tracker schema is no change, table[%s [%zu]]",
1442             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size());
1443         return -E_IGNORE_DATA;
1444     }
1445     return E_OK;
1446 }
1447 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1448 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1449 {
1450     if (condition.sql.empty()) {
1451         LOGE("[RelationalStore] execute sql is empty.");
1452         return -E_INVALID_ARGS;
1453     }
1454     return sqliteStorageEngine_->ExecuteSql(condition, records);
1455 }
1456 
CleanWaterMarkInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::set<std::string> & clearWaterMarkTable)1457 int SQLiteRelationalStore::CleanWaterMarkInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
1458     const std::set<std::string> &clearWaterMarkTable)
1459 {
1460     int errCode = E_OK;
1461     for (const auto &tableName : clearWaterMarkTable) {
1462         std::string cloudWaterMark;
1463         Value blobMetaVal;
1464         errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1465         if (errCode != E_OK) {
1466             LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1467             return errCode;
1468         }
1469         errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1470         if (errCode != E_OK) {
1471             LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1472             return errCode;
1473         }
1474         errCode = handle->CleanUploadFinishedFlag(tableName);
1475         if (errCode != E_OK) {
1476             LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1477             return errCode;
1478         }
1479     }
1480 #ifdef USE_DISTRIBUTEDDB_CLOUD
1481     errCode = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTable);
1482     if (errCode != E_OK) {
1483         LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1484     }
1485 #endif
1486     return errCode;
1487 }
1488 
CleanWaterMark(const std::set<std::string> clearWaterMarkTables)1489 std::function<int(void)> SQLiteRelationalStore::CleanWaterMark(const std::set<std::string> clearWaterMarkTables)
1490 {
1491     return [this, clearWaterMarkTables]()->int {
1492         int errCode = E_OK;
1493         auto *handle = GetHandle(true, errCode);
1494         if (handle == nullptr) {
1495             LOGE("[SQLiteRelationalStore] get handle failed:%d", errCode);
1496             return errCode;
1497         }
1498         storageEngine_->SetReusedHandle(handle);
1499         errCode = CleanWaterMarkInner(handle, clearWaterMarkTables);
1500         storageEngine_->SetReusedHandle(nullptr);
1501         ReleaseHandle(handle);
1502         if (errCode != E_OK) {
1503             LOGE("[SQLiteRelationalStore] SetReference clear water mark failed, errCode = %d", errCode);
1504         }
1505         LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1506         return errCode;
1507     };
1508 }
1509 
SetReferenceInner(const std::vector<TableReferenceProperty> & tableReferenceProperty,std::set<std::string> & clearWaterMarkTables)1510 int SQLiteRelationalStore::SetReferenceInner(const std::vector<TableReferenceProperty> &tableReferenceProperty,
1511     std::set<std::string> &clearWaterMarkTables)
1512 {
1513     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1514     int errCode = GetHandleAndStartTransaction(handle);
1515     if (errCode != E_OK) {
1516         LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1517         return errCode;
1518     }
1519     RelationalSchemaObject schema;
1520     errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1521     if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1522         LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1523         (void)handle->Rollback();
1524         ReleaseHandle(handle);
1525         return errCode;
1526     }
1527 
1528     int ret = handle->Commit();
1529     ReleaseHandle(handle);
1530     if (ret == E_OK) {
1531         sqliteStorageEngine_->SetSchema(schema);
1532         return errCode;
1533     }
1534     LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1535     return ret;
1536 }
1537 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1538 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1539 {
1540     std::set<std::string> clearWaterMarkTables;
1541     int errCode = SetReferenceInner(tableReferenceProperty, clearWaterMarkTables);
1542     if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1543         LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1544         return errCode;
1545     }
1546     std::function<int(void)> clearWaterMarkFunc = [] { return E_OK; };
1547     if (!clearWaterMarkTables.empty()) {
1548         clearWaterMarkFunc = CleanWaterMark(clearWaterMarkTables);
1549     }
1550     if (errCode == -E_TABLE_REFERENCE_CHANGED) {
1551         int ret = E_OK;
1552 #ifdef USE_DISTRIBUTEDDB_CLOUD
1553         ret = cloudSyncer_->StopSyncTask(clearWaterMarkFunc);
1554 #endif
1555         return ret != E_OK ? ret : errCode;
1556     }
1557     return E_OK;
1558 }
1559 
InitTrackerSchemaFromMeta()1560 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1561 {
1562     int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1563     return errCode == -E_NOT_FOUND ? E_OK : errCode;
1564 }
1565 
CleanTrackerData(const std::string & tableName,int64_t cursor)1566 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1567 {
1568     if (tableName.empty()) {
1569         return -E_INVALID_ARGS;
1570     }
1571     return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1572 }
1573 
ExecuteCreateSharedTable(const DataBaseSchema & schema)1574 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1575 {
1576     if (sqliteStorageEngine_ == nullptr) {
1577         LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1578         return -E_INVALID_DB;
1579     }
1580     std::vector<std::string> deleteTableNames;
1581     std::map<std::string, std::vector<Field>> updateTableNames;
1582     std::map<std::string, std::string> alterTableNames;
1583     if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1584         LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1585         return -E_INVALID_ARGS;
1586     }
1587     LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1588     // upgrade contains delete, alter, update and create
1589     int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1590     if (errCode != E_OK) {
1591         LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1592     } else {
1593         LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1594     }
1595     return errCode;
1596 }
1597 
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1598 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1599     CloudSyncer::CloudTaskInfo &info)
1600 {
1601     if (info.priorityTask && actualTable.size() != info.table.size()) {
1602         LOGE("[RelationalStore] Not support regenerate table with priority task");
1603         return -E_NOT_SUPPORT;
1604     }
1605     if (actualTable.size() == info.table.size()) {
1606         return E_OK;
1607     }
1608     LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1609     info.table = actualTable;
1610     info.queryList.clear();
1611     for (const auto &item : info.table) {
1612         QuerySyncObject object(Query::Select());
1613         object.SetTableName(item);
1614         info.queryList.push_back(object);
1615     }
1616     return E_OK;
1617 }
1618 
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1619 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1620 {
1621     if (cmd != LOGIC_DELETE_SYNC_DATA) {
1622         return -E_NOT_SUPPORT;
1623     }
1624     if (pragmaData == nullptr) {
1625         return -E_INVALID_ARGS;
1626     }
1627     auto logicDelete = *(static_cast<bool *>(pragmaData));
1628     if (storageEngine_ == nullptr) {
1629         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1630         return -E_INVALID_DB;
1631     }
1632     storageEngine_->SetLogicDelete(logicDelete);
1633     return E_OK;
1634 }
1635 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1636 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1637     const std::vector<VBucket> &records)
1638 {
1639     if (storageEngine_ == nullptr) {
1640         LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1641         return -E_INVALID_DB;
1642     }
1643     int errCode = CheckParamForUpsertData(status, tableName, records);
1644     if (errCode != E_OK) {
1645         return errCode;
1646     }
1647     return storageEngine_->UpsertData(status, tableName, records);
1648 }
1649 
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1650 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1651     const std::vector<VBucket> &records)
1652 {
1653     if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1654         LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1655         return -E_INVALID_ARGS;
1656     }
1657     if (records.empty()) {
1658         LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1659         return -E_INVALID_ARGS;
1660     }
1661     size_t recordSize = records.size();
1662     if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1663         LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1664         return -E_MAX_LIMITS;
1665     }
1666     return CheckSchemaForUpsertData(tableName, records);
1667 }
1668 
ChkTable(const TableInfo & table)1669 static int ChkTable(const TableInfo &table)
1670 {
1671     if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1672         LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1673         return -E_NOT_SUPPORT;
1674     }
1675     if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1676         return -E_NOT_FOUND;
1677     }
1678     return E_OK;
1679 }
1680 
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1681 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1682 {
1683     if (tableName.empty()) {
1684         return -E_INVALID_ARGS;
1685     }
1686     auto schema = storageEngine_->GetSchemaInfo();
1687     auto table = schema.GetTable(tableName);
1688     int errCode = ChkTable(table);
1689     if (errCode != E_OK) {
1690         return errCode;
1691     }
1692     TableSchema cloudTableSchema;
1693     errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1694     if (errCode != E_OK) {
1695         LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1696         return errCode;
1697     }
1698     errCode = ChkSchema(tableName);
1699     if (errCode != E_OK) {
1700         return errCode;
1701     }
1702     std::set<std::string> dbPkFields;
1703     for (auto &field : table.GetIdentifyKey()) {
1704         dbPkFields.insert(field);
1705     }
1706     std::set<std::string> schemaFields;
1707     for (auto &fieldInfo : table.GetFieldInfos()) {
1708         schemaFields.insert(fieldInfo.GetFieldName());
1709     }
1710     for (const auto &record : records) {
1711         std::set<std::string> recordPkFields;
1712         for (const auto &item : record) {
1713             if (schemaFields.find(item.first) == schemaFields.end()) {
1714                 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1715                 return -E_INVALID_ARGS;
1716             }
1717             if (dbPkFields.find(item.first) == dbPkFields.end()) {
1718                 continue;
1719             }
1720             recordPkFields.insert(item.first);
1721         }
1722         if (recordPkFields.size() != dbPkFields.size()) {
1723             LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1724                 recordPkFields.size(), dbPkFields.size());
1725             return -E_INVALID_ARGS;
1726         }
1727     }
1728     return errCode;
1729 }
1730 
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1731 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1732 {
1733     auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1734     if (engine == nullptr) {
1735         LOGE("[RelationalStore][Open] Create storage engine failed");
1736         return -E_OUT_OF_MEMORY;
1737     }
1738     sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1739         [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1740         RefObject::KillAndDecObjRef(releaseEngine);
1741     });
1742     return E_OK;
1743 }
1744 
1745 #ifdef USE_DISTRIBUTEDDB_CLOUD
CheckCloudSchema(const DataBaseSchema & schema)1746 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1747 {
1748     if (storageEngine_ == nullptr) {
1749         LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1750         return -E_INVALID_DB;
1751     }
1752     std::shared_ptr<DataBaseSchema> cloudSchema;
1753     (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1754     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1755     for (const auto &tableSchema : schema.tables) {
1756         TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1757         if (tableInfo.Empty()) {
1758             continue;
1759         }
1760         if (tableInfo.GetSharedTableMark()) {
1761             LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1762             return -E_INVALID_ARGS;
1763         }
1764     }
1765     for (const auto &tableSchema : schema.tables) {
1766         if (cloudSchema == nullptr) {
1767             continue;
1768         }
1769         for (const auto &oldSchema : cloudSchema->tables) {
1770             if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1771                 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1772                 return -E_INVALID_ARGS;
1773             }
1774         }
1775     }
1776     return E_OK;
1777 }
1778 
SetCloudSyncConfig(const CloudSyncConfig & config)1779 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1780 {
1781     if (storageEngine_ == nullptr) {
1782         LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1783         return -E_INVALID_DB;
1784     }
1785     storageEngine_->SetCloudSyncConfig(config);
1786     LOGI("[RelationalStore][SetCloudSyncConfig] value:[%" PRId32 ", %" PRId32 ", %" PRId32 ", %d]",
1787         config.maxUploadCount, config.maxUploadSize, config.maxRetryConflictTimes, config.isSupportEncrypt);
1788     return E_OK;
1789 }
1790 
GetCloudTaskStatus(uint64_t taskId)1791 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1792 {
1793     return cloudSyncer_->GetCloudTaskStatus(taskId);
1794 }
1795 #endif
1796 
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1797 int SQLiteRelationalStore::SetDistributedSchema(const DistributedSchema &schema, bool isForceUpgrade)
1798 {
1799     if (sqliteStorageEngine_ == nullptr || storageEngine_ == nullptr) {
1800         LOGE("[RelationalStore] engine was not initialized");
1801         return -E_INVALID_DB;
1802     }
1803     auto mode = sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode();
1804     std::string localIdentity; // collaboration mode need local identify
1805     if (mode == DistributedTableMode::COLLABORATION) {
1806         int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
1807         if (errCode != E_OK || localIdentity.empty()) {
1808             LOGW("Get local identity failed: %d", errCode);
1809         }
1810     }
1811     auto [errCode, isSchemaChange] = sqliteStorageEngine_->SetDistributedSchema(schema, localIdentity, isForceUpgrade);
1812     if (errCode != E_OK) {
1813         return errCode;
1814     }
1815     if (isSchemaChange) {
1816         LOGI("[RelationalStore] schema was changed by setting distributed schema");
1817         storageEngine_->NotifySchemaChanged();
1818     }
1819     return E_OK;
1820 }
1821 
GetDownloadingAssetsCount(int32_t & count)1822 int SQLiteRelationalStore::GetDownloadingAssetsCount(int32_t &count)
1823 {
1824     std::vector<std::string> tableNameList = GetAllDistributedTableName(TableSyncType::CLOUD_COOPERATION);
1825     if (tableNameList.empty()) {
1826         return E_OK;
1827     }
1828 
1829     int errCode = E_OK;
1830     SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(false, errCode);
1831     if (handle == nullptr) {
1832         return errCode;
1833     }
1834     for (const auto &tableName : tableNameList) {
1835         TableSchema tableSchema;
1836         int errCode = storageEngine_->GetCloudTableSchema(tableName, tableSchema);
1837         if (errCode != E_OK) {
1838             LOGE("[RelationalStore] Get schema failed when get download assets count, %d, tableName: %s, length: %zu",
1839                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1840             break;
1841         }
1842         errCode = handle->GetDownloadingAssetsCount(tableSchema, count);
1843         if (errCode != E_OK) {
1844             LOGE("[RelationalStore] Get download assets count failed: %d, tableName: %s, length: %zu",
1845                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1846             break;
1847         }
1848     }
1849     ReleaseHandle(handle);
1850     return errCode;
1851 }
1852 
SetTableMode(DistributedTableMode tableMode)1853 int SQLiteRelationalStore::SetTableMode(DistributedTableMode tableMode)
1854 {
1855     if (sqliteStorageEngine_ == nullptr) {
1856         LOGE("[RelationalStore][SetTableMode] sqliteStorageEngine was not initialized");
1857         return -E_INVALID_DB;
1858     }
1859     if (sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode() != tableMode) {
1860         auto schema = sqliteStorageEngine_->GetSchema();
1861         for (const auto &tableMap : schema.GetTables()) {
1862             if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1863                 LOGW("[RelationalStore][SetTableMode] Can not set table mode for table %s[%zu]",
1864                     DBCommon::StringMiddleMasking(tableMap.first).c_str(), tableMap.first.size());
1865                 return -E_NOT_SUPPORT;
1866             }
1867         }
1868     }
1869     RelationalDBProperties properties = sqliteStorageEngine_->GetRelationalProperties();
1870     properties.SetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(tableMode));
1871     sqliteStorageEngine_->SetProperties(properties);
1872     LOGI("[RelationalStore][SetTableMode] Set table mode to %d successful", static_cast<int>(tableMode));
1873     return E_OK;
1874 }
1875 
OperateDataStatus(uint32_t dataOperator)1876 int SQLiteRelationalStore::OperateDataStatus(uint32_t dataOperator)
1877 {
1878     LOGI("[RelationalStore] OperateDataStatus %" PRIu32, dataOperator);
1879     if ((dataOperator & static_cast<uint32_t>(DataOperator::UPDATE_TIME)) == 0) {
1880         return E_OK;
1881     }
1882     if (sqliteStorageEngine_ == nullptr) {
1883         LOGE("[RelationalStore] sqliteStorageEngine was not initialized when operate data status");
1884         return -E_INVALID_DB;
1885     }
1886     if (syncAbleEngine_ == nullptr) {
1887         LOGE("[RelationalStore] syncAbleEngine was not initialized when operate data status");
1888         return -E_INVALID_DB;
1889     }
1890     auto virtualTime = syncAbleEngine_->GetTimestamp();
1891     auto schema = sqliteStorageEngine_->GetSchema();
1892     std::vector<std::string> operateTables;
1893     for (const auto &tableMap : schema.GetTables()) {
1894         if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1895             operateTables.push_back(tableMap.second.GetTableName());
1896         }
1897     }
1898     return OperateDataStatusInner(operateTables, virtualTime);
1899 }
1900 
OperateDataStatusInner(const std::vector<std::string> & tables,uint64_t virtualTime) const1901 int SQLiteRelationalStore::OperateDataStatusInner(const std::vector<std::string> &tables, uint64_t virtualTime) const
1902 {
1903     int errCode = E_OK;
1904     SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(true, errCode);
1905     if (handle == nullptr) {
1906         LOGE("[RelationalStore][OperateDataStatus] Get handle failed %d when operate data status", errCode);
1907         return errCode;
1908     }
1909     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1910     if (errCode != E_OK) {
1911         LOGE("[RelationalStore][OperateDataStatus] Start transaction failed %d when operate data status", errCode);
1912         ReleaseHandle(handle);
1913         return errCode;
1914     }
1915     sqlite3 *db = nullptr;
1916     errCode = handle->GetDbHandle(db);
1917     if (errCode != E_OK) {
1918         LOGE("[RelationalStore][OperateDataStatus] Get db failed %d when operate data status", errCode);
1919         (void)handle->Rollback();
1920         ReleaseHandle(handle);
1921         return errCode;
1922     }
1923     errCode = SQLiteRelationalUtils::OperateDataStatus(db, tables, virtualTime);
1924     if (errCode == E_OK) {
1925         errCode = handle->Commit();
1926         if (errCode != E_OK) {
1927             LOGE("[RelationalStore][OperateDataStatus] Commit failed %d when operate data status", errCode);
1928         }
1929     } else {
1930         int ret = handle->Rollback();
1931         if (ret != E_OK) {
1932             LOGE("[RelationalStore][OperateDataStatus] Rollback failed %d when operate data status", ret);
1933         }
1934     }
1935     ReleaseHandle(handle);
1936     return errCode;
1937 }
1938 
GetDeviceSyncTaskCount() const1939 int32_t SQLiteRelationalStore::GetDeviceSyncTaskCount() const
1940 {
1941     if (syncAbleEngine_ == nullptr) {
1942         LOGW("[RelationalStore] syncAbleEngine was not initialized when get device sync task count");
1943         return 0;
1944     }
1945     return syncAbleEngine_->GetDeviceSyncTaskCount();
1946 }
1947 
CleanDirtyLogIfNeed(const std::string & tableName) const1948 void SQLiteRelationalStore::CleanDirtyLogIfNeed(const std::string &tableName) const
1949 {
1950     int errCode = E_OK;
1951     SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(true, errCode);
1952     if (handle == nullptr) {
1953         LOGW("[RDBStore][ClearDirtyLog] Get handle failed %d", errCode);
1954         return;
1955     }
1956     ResFinalizer finalizer([this, handle]() {
1957         SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle;
1958         ReleaseHandle(releaseHandle);
1959     });
1960     sqlite3 *db = nullptr;
1961     errCode = handle->GetDbHandle(db);
1962     if (errCode != E_OK) {
1963         LOGW("[RDBStore][ClearDirtyLog] Get db handle failed %d", errCode);
1964         return;
1965     }
1966     bool isExistDirtyLog = false;
1967     std::tie(errCode, isExistDirtyLog) = SQLiteRelationalUtils::CheckExistDirtyLog(db, tableName);
1968     if (errCode != E_OK) {
1969         LOGW("[RDBStore][ClearDirtyLog] Check dirty log failed %d", errCode);
1970         return;
1971     }
1972     if (!isExistDirtyLog) {
1973         return;
1974     }
1975     auto obj = GetSchemaObj();
1976     if (!obj.IsSchemaValid()) {
1977         return;
1978     }
1979     errCode = SQLiteRelationalUtils::CleanDirtyLog(db, tableName, obj);
1980     if (errCode != E_OK) {
1981         LOGW("[RDBStore][ClearDirtyLog] Clean dirty log failed %d", errCode);
1982     }
1983 }
1984 
GetSchemaObj() const1985 RelationalSchemaObject SQLiteRelationalStore::GetSchemaObj() const
1986 {
1987     if (storageEngine_ == nullptr) {
1988         return {};
1989     }
1990     return storageEngine_->GetSchemaInfo();
1991 }
1992 } // namespace DistributedDB
1993 #endif
1994