• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "sqlite_log_table_manager.h"
26 #include "sqlite_relational_store_connection.h"
27 #include "storage_engine_manager.h"
28 #include "cloud_sync_utils.h"
29 
30 namespace DistributedDB {
31 namespace {
32 constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
33 }
34 
~SQLiteRelationalStore()35 SQLiteRelationalStore::~SQLiteRelationalStore()
36 {
37     sqliteStorageEngine_ = nullptr;
38 }
39 
40 // Called when a new connection created.
IncreaseConnectionCounter()41 void SQLiteRelationalStore::IncreaseConnectionCounter()
42 {
43     connectionCount_.fetch_add(1, std::memory_order_seq_cst);
44     if (connectionCount_.load() > 0) {
45         sqliteStorageEngine_->SetConnectionFlag(true);
46     }
47 }
48 
GetDBConnection(int & errCode)49 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
50 {
51     std::lock_guard<std::mutex> lock(connectMutex_);
52     RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
53     if (connection == nullptr) {
54         errCode = -E_OUT_OF_MEMORY;
55         return nullptr;
56     }
57     IncObjRef(this);
58     IncreaseConnectionCounter();
59     return connection;
60 }
61 
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)62 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
63 {
64     option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
65     option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
66     if (properties.IsEncrypted()) {
67         option.cipherType = properties.GetCipherType();
68         option.passwd = properties.GetPasswd();
69         option.iterTimes = properties.GetIterTimes();
70     }
71 }
72 
InitStorageEngine(const RelationalDBProperties & properties)73 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
74 {
75     OpenDbProperties option;
76     InitDataBaseOption(properties, option);
77     std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
78 
79     StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
80     int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
81     if (errCode != E_OK) {
82         LOGE("Init the sqlite storage engine failed:%d", errCode);
83     }
84     return errCode;
85 }
86 
ReleaseResources()87 void SQLiteRelationalStore::ReleaseResources()
88 {
89     if (sqliteStorageEngine_ != nullptr) {
90         sqliteStorageEngine_->ClearEnginePasswd();
91         sqliteStorageEngine_ = nullptr;
92     }
93 #ifdef USE_DISTRIBUTEDDB_CLOUD
94     if (cloudSyncer_ != nullptr) {
95         cloudSyncer_->Close();
96         RefObject::KillAndDecObjRef(cloudSyncer_);
97         cloudSyncer_ = nullptr;
98     }
99 #endif
100     RefObject::DecObjRef(storageEngine_);
101 }
102 
CheckDBMode()103 int SQLiteRelationalStore::CheckDBMode()
104 {
105     int errCode = E_OK;
106     auto *handle = GetHandle(true, errCode);
107     if (handle == nullptr) {
108         return errCode;
109     }
110     errCode = handle->CheckDBModeForRelational();
111     if (errCode != E_OK) {
112         LOGE("check relational DB mode failed. %d", errCode);
113     }
114 
115     ReleaseHandle(handle);
116     return errCode;
117 }
118 
GetSchemaFromMeta(RelationalSchemaObject & schema)119 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
120 {
121     Key schemaKey;
122     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
123     Value schemaVal;
124     int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
125     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
126         LOGE("Get relational schema from meta table failed. %d", errCode);
127         return errCode;
128     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
129         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
130         return -E_NOT_FOUND;
131     }
132 
133     std::string schemaStr;
134     DBCommon::VectorToString(schemaVal, schemaStr);
135     errCode = schema.ParseFromSchemaString(schemaStr);
136     if (errCode != E_OK) {
137         LOGE("Parse schema string from meta table failed.");
138         return errCode;
139     }
140 
141     sqliteStorageEngine_->SetSchema(schema);
142     return E_OK;
143 }
144 
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)145 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
146 {
147     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
148     Value modeVal;
149     int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
150     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
151         LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
152         return errCode;
153     }
154 
155     DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
156     if (!modeVal.empty()) {
157         std::string value(modeVal.begin(), modeVal.end());
158         orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
159     } else if (isUnSet) {
160         return E_OK; // First set table mode.
161     }
162 
163     if (orgMode == DistributedTableMode::COLLABORATION && orgMode != mode) {
164         LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
165         return -E_INVALID_ARGS;
166     }
167     return E_OK;
168 }
169 
CheckProperties(RelationalDBProperties properties)170 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
171 {
172     RelationalSchemaObject schema;
173     int errCode = GetSchemaFromMeta(schema);
174     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
175         LOGE("Get relational schema from meta failed. errcode=%d", errCode);
176         return errCode;
177     }
178     int ret = InitTrackerSchemaFromMeta();
179     if (ret != E_OK) {
180         LOGE("Init tracker schema from meta failed. errcode=%d", ret);
181         return ret;
182     }
183     properties.SetSchema(schema);
184 
185     // Empty schema means no distributed table has been used, we may set DB to any table mode
186     // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
187     bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
188     auto mode = properties.GetDistributedTableMode();
189     errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
190     if (errCode != E_OK) {
191         LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
192         return errCode;
193     }
194     if (!isSchemaEmpty) {
195         return errCode;
196     }
197 
198     errCode = SaveTableModeToMeta(mode);
199     if (errCode != E_OK) {
200         LOGE("Save table mode to meta failed. errCode=%d", errCode);
201         return errCode;
202     }
203 
204     return E_OK;
205 }
206 
SaveTableModeToMeta(DistributedTableMode mode)207 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
208 {
209     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
210     Value modeVal;
211     DBCommon::StringToVector(std::to_string(static_cast<int>(mode)), modeVal);
212     int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
213     if (errCode != E_OK) {
214         LOGE("Save relational schema to meta table failed. %d", errCode);
215     }
216     return errCode;
217 }
218 
SaveLogTableVersionToMeta()219 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
220 {
221     LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
222     const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY.begin(), DBConstant::LOG_TABLE_VERSION_KEY.end());
223     Value logVersion;
224     int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
225     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
226         LOGE("Get log version from meta table failed. %d", errCode);
227         return errCode;
228     }
229     std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
230     Value logVersionVal(versionStr.begin(), versionStr.end());
231     // log version is same, no need to update
232     if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
233         return errCode;
234     }
235     // If the log version does not exist or is different, update the log version
236     errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
237     if (errCode != E_OK) {
238         LOGE("save log table version to meta table failed. %d", errCode);
239     }
240     return errCode;
241 }
242 
CleanDistributedDeviceTable()243 int SQLiteRelationalStore::CleanDistributedDeviceTable()
244 {
245     std::vector<std::string> missingTables;
246     int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
247     if (errCode != E_OK) {
248         LOGE("Clean distributed device table failed. %d", errCode);
249     }
250     for (const auto &deviceTableName : missingTables) {
251         std::string deviceHash;
252         std::string tableName;
253         DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
254         syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
255         if (errCode != E_OK) {
256             LOGE("Erase water mark failed:%d", errCode);
257             return errCode;
258         }
259     }
260     return errCode;
261 }
262 
Open(const RelationalDBProperties & properties)263 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
264 {
265     std::lock_guard<std::mutex> lock(initalMutex_);
266     if (isInitialized_) {
267         LOGD("[RelationalStore][Open] relational db was already initialized.");
268         return E_OK;
269     }
270     int errCode = InitSQLiteStorageEngine(properties);
271     if (errCode != E_OK) {
272         return errCode;
273     }
274 
275     do {
276         errCode = InitStorageEngine(properties);
277         if (errCode != E_OK) {
278             LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
279             break;
280         }
281 
282         storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
283         if (storageEngine_ == nullptr) {
284             LOGE("[RelationalStore][Open] Create syncable storage failed");
285             errCode = -E_OUT_OF_MEMORY;
286             break;
287         }
288 
289         syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
290         // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
291         // be destructed when close store
292         storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
293 #ifdef USE_DISTRIBUTEDDB_CLOUD
294         cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
295 #endif
296         errCode = CheckDBMode();
297         if (errCode != E_OK) {
298             break;
299         }
300 
301         errCode = CheckProperties(properties);
302         if (errCode != E_OK) {
303             break;
304         }
305 
306         errCode = SaveLogTableVersionToMeta();
307         if (errCode != E_OK) {
308             break;
309         }
310 
311         errCode = CleanDistributedDeviceTable();
312         if (errCode != E_OK) {
313             break;
314         }
315 
316         isInitialized_ = true;
317         return E_OK;
318     } while (false);
319 
320     ReleaseResources();
321     return errCode;
322 }
323 
OnClose(const std::function<void (void)> & notifier)324 void SQLiteRelationalStore::OnClose(const std::function<void(void)> &notifier)
325 {
326     AutoLock lockGuard(this);
327     if (notifier) {
328         closeNotifiers_.push_back(notifier);
329     } else {
330         LOGW("Register 'Close()' notifier failed, notifier is null.");
331     }
332 }
333 
GetHandle(bool isWrite,int & errCode) const334 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
335 {
336     if (sqliteStorageEngine_ == nullptr) {
337         errCode = -E_INVALID_DB;
338         return nullptr;
339     }
340 
341     return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
342         sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
343 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const344 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
345 {
346     if (handle == nullptr) {
347         return;
348     }
349 
350     if (sqliteStorageEngine_ != nullptr) {
351         StorageExecutor *databaseHandle = handle;
352         sqliteStorageEngine_->Recycle(databaseHandle);
353         handle = nullptr;
354     }
355 }
356 
Sync(const ISyncer::SyncParma & syncParam,uint64_t connectionId)357 int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId)
358 {
359     return syncAbleEngine_->Sync(syncParam, connectionId);
360 }
361 
362 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)363 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
364 {
365     int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
366     if (count <= 0) {
367         LOGF("Decrease db connection counter failed, count <= 0.");
368         return;
369     }
370     if (storageEngine_ != nullptr) {
371         storageEngine_->EraseDataChangeCallback(connectionId);
372     }
373     if (count != 1) {
374         return;
375     }
376 
377     LockObj();
378     auto notifiers = std::move(closeNotifiers_);
379     UnlockObj();
380     for (const auto &notifier : notifiers) {
381         if (notifier) {
382             notifier();
383         }
384     }
385 
386     // Sync Close
387     syncAbleEngine_->Close();
388 
389 #ifdef USE_DISTRIBUTEDDB_CLOUD
390     if (cloudSyncer_ != nullptr) {
391         cloudSyncer_->Close();
392         RefObject::KillAndDecObjRef(cloudSyncer_);
393         cloudSyncer_ = nullptr;
394     }
395 #endif
396 
397     if (sqliteStorageEngine_ != nullptr) {
398         sqliteStorageEngine_ = nullptr;
399     }
400     {
401         if (storageEngine_ != nullptr) {
402             storageEngine_->RegisterHeartBeatListener(nullptr);
403         }
404         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
405         StopLifeCycleTimer();
406         lifeCycleNotifier_ = nullptr;
407     }
408     // close will dec sync ref of storageEngine_
409     DecObjRef(storageEngine_);
410 }
411 
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)412 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
413 {
414     if (connectionCount_.load() == 1) {
415         sqliteStorageEngine_->SetConnectionFlag(false);
416     }
417 
418     connectMutex_.lock();
419     if (connection != nullptr) {
420         KillAndDecObjRef(connection);
421         DecreaseConnectionCounter(connectionId);
422         connectMutex_.unlock();
423         KillAndDecObjRef(this);
424     } else {
425         connectMutex_.unlock();
426     }
427 }
428 
WakeUpSyncer()429 void SQLiteRelationalStore::WakeUpSyncer()
430 {
431     syncAbleEngine_->WakeUpSyncer();
432 }
433 
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)434 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
435     bool trackerSchemaChanged)
436 {
437     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
438     TableInfo tableInfo = localSchema.GetTable(tableName);
439     if (!tableInfo.Empty()) {
440         bool isSharedTable = tableInfo.GetSharedTableMark();
441         if (isSharedTable && !trackerSchemaChanged) {
442             return E_OK; // shared table will create distributed table when use SetCloudDbSchema
443         }
444     }
445 
446     bool schemaChanged = false;
447     int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(""),
448         schemaChanged, syncType, trackerSchemaChanged);
449     if (errCode != E_OK) {
450         LOGE("Create distributed table failed. %d", errCode);
451     }
452     if (schemaChanged) {
453         LOGD("Notify schema changed.");
454         storageEngine_->NotifySchemaChanged();
455     }
456     return errCode;
457 }
458 
459 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetCloudSyncTaskCount()460 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
461 {
462     if (cloudSyncer_ == nullptr) {
463         LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
464         return -1;
465     }
466     return cloudSyncer_->GetCloudSyncTaskCount();
467 }
468 
CleanCloudData(ClearMode mode)469 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
470 {
471     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
472     TableInfoMap tables = localSchema.GetTables();
473     std::vector<std::string> cloudTableNameList;
474     for (const auto &tableInfo : tables) {
475         bool isSharedTable = tableInfo.second.GetSharedTableMark();
476         if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
477             continue;
478         }
479         if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
480             cloudTableNameList.push_back(tableInfo.first);
481         }
482     }
483     if (cloudTableNameList.empty()) {
484         LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
485         return E_OK;
486     }
487     if (cloudSyncer_ == nullptr) {
488         LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
489         return -E_INVALID_DB;
490     }
491     int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
492     if (errCode != E_OK) {
493         LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
494     }
495 
496     return errCode;
497 }
498 #endif
499 
RemoveDeviceData()500 int SQLiteRelationalStore::RemoveDeviceData()
501 {
502     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
503         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
504     if (mode == DistributedTableMode::COLLABORATION) {
505         LOGE("Not support remove all device data in collaboration mode.");
506         return -E_NOT_SUPPORT;
507     }
508 
509     std::vector<std::string> tableNameList = GetAllDistributedTableName();
510     if (tableNameList.empty()) {
511         return E_OK;
512     }
513     // erase watermark first
514     int errCode = EraseAllDeviceWatermark(tableNameList);
515     if (errCode != E_OK) {
516         LOGE("remove watermark failed %d", errCode);
517         return errCode;
518     }
519     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
520     errCode = GetHandleAndStartTransaction(handle);
521     if (handle == nullptr) {
522         return errCode;
523     }
524 
525     for (const auto &table : tableNameList) {
526         errCode = handle->DeleteDistributedDeviceTable("", table);
527         if (errCode != E_OK) {
528             LOGE("delete device data failed. %d", errCode);
529             break;
530         }
531 
532         errCode = handle->DeleteDistributedAllDeviceTableLog(table);
533         if (errCode != E_OK) {
534             LOGE("delete device data failed. %d", errCode);
535             break;
536         }
537     }
538 
539     if (errCode != E_OK) {
540         (void)handle->Rollback();
541         ReleaseHandle(handle);
542         return errCode;
543     }
544 
545     errCode = handle->Commit();
546     ReleaseHandle(handle);
547     storageEngine_->NotifySchemaChanged();
548     return errCode;
549 }
550 
RemoveDeviceData(const std::string & device,const std::string & tableName)551 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
552 {
553     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
554         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
555     if (mode == DistributedTableMode::COLLABORATION) {
556         LOGE("Not support remove device data in collaboration mode.");
557         return -E_NOT_SUPPORT;
558     }
559 
560     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
561     auto iter = tables.find(tableName);
562     if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
563         LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
564         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
565     }
566     // cloud mode is not permit
567     if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
568         LOGE("Remove device data with cloud sync table name.");
569         return -E_NOT_SUPPORT;
570     }
571     bool isNeedHash = false;
572     std::string hashDeviceId;
573     int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
574     if (errCode == -E_NOT_SUPPORT) {
575         isNeedHash = true;
576         hashDeviceId = device;
577         errCode = E_OK;
578     }
579     if (errCode != E_OK) {
580         return errCode;
581     }
582     if (isNeedHash) {
583         // check device is uuid in meta
584         std::set<std::string> hashDevices;
585         errCode = GetExistDevices(hashDevices);
586         if (errCode != E_OK) {
587             return errCode;
588         }
589         if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
590             LOGD("[SQLiteRelationalStore] not match device, just return");
591             return E_OK;
592         }
593     }
594     return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
595 }
596 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)597 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
598     const RelationalObserverAction &action)
599 {
600     return storageEngine_->RegisterObserverAction(connectionId, observer, action);
601 }
602 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)603 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
604 {
605     return storageEngine_->UnRegisterObserverAction(connectionId, observer);
606 }
607 
StopLifeCycleTimer()608 int SQLiteRelationalStore::StopLifeCycleTimer()
609 {
610     auto runtimeCxt = RuntimeContext::GetInstance();
611     if (runtimeCxt == nullptr) {
612         return -E_INVALID_ARGS;
613     }
614     if (lifeTimerId_ != 0) {
615         TimerId timerId = lifeTimerId_;
616         lifeTimerId_ = 0;
617         runtimeCxt->RemoveTimer(timerId, false);
618     }
619     return E_OK;
620 }
621 
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)622 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier)
623 {
624     auto runtimeCxt = RuntimeContext::GetInstance();
625     if (runtimeCxt == nullptr) {
626         return -E_INVALID_ARGS;
627     }
628     RefObject::IncObjRef(this);
629     TimerId timerId = 0;
630     int errCode = runtimeCxt->SetTimer(
631         DBConstant::DEF_LIFE_CYCLE_TIME,
632         [this](TimerId id) -> int {
633             std::lock_guard<std::mutex> lock(lifeCycleMutex_);
634             if (lifeCycleNotifier_) {
635                 // normal identifier mode
636                 std::string identifier;
637                 if (sqliteStorageEngine_->GetProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
638                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(
639                         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
640                 } else {
641                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
642                 }
643                 auto userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
644                 lifeCycleNotifier_(identifier, userId);
645             }
646             return 0;
647         },
648         [this]() {
649             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
650             if (ret != E_OK) {
651                 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
652             }
653         },
654         timerId);
655     if (errCode != E_OK) {
656         lifeTimerId_ = 0;
657         LOGE("SetTimer failed:%d", errCode);
658         RefObject::DecObjRef(this);
659         return errCode;
660     }
661 
662     lifeCycleNotifier_ = notifier;
663     lifeTimerId_ = timerId;
664     return E_OK;
665 }
666 
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)667 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier)
668 {
669     int errCode;
670     {
671         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
672         if (lifeTimerId_ != 0) {
673             errCode = StopLifeCycleTimer();
674             if (errCode != E_OK) {
675                 LOGE("Stop the life cycle timer failed:%d", errCode);
676                 return errCode;
677             }
678         }
679 
680         if (!notifier) {
681             return E_OK;
682         }
683         errCode = StartLifeCycleTimer(notifier);
684         if (errCode != E_OK) {
685             LOGE("Register life cycle timer failed:%d", errCode);
686             return errCode;
687         }
688     }
689     auto listener = [this] { HeartBeat(); };
690     storageEngine_->RegisterHeartBeatListener(listener);
691     return errCode;
692 }
693 
HeartBeat()694 void SQLiteRelationalStore::HeartBeat()
695 {
696     std::lock_guard<std::mutex> lock(lifeCycleMutex_);
697     int errCode = ResetLifeCycleTimer();
698     if (errCode != E_OK) {
699         LOGE("Heart beat for life cycle failed:%d", errCode);
700     }
701 }
702 
ResetLifeCycleTimer()703 int SQLiteRelationalStore::ResetLifeCycleTimer()
704 {
705     if (lifeTimerId_ == 0) {
706         return E_OK;
707     }
708     auto lifeNotifier = lifeCycleNotifier_;
709     lifeCycleNotifier_ = nullptr;
710     int errCode = StopLifeCycleTimer();
711     if (errCode != E_OK) {
712         LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
713     }
714     return StartLifeCycleTimer(lifeNotifier);
715 }
716 
GetStorePath() const717 std::string SQLiteRelationalStore::GetStorePath() const
718 {
719     return sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
720 }
721 
GetProperties() const722 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
723 {
724     return sqliteStorageEngine_->GetProperties();
725 }
726 
StopSync(uint64_t connectionId)727 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
728 {
729     return syncAbleEngine_->StopSync(connectionId);
730 }
731 
Dump(int fd)732 void SQLiteRelationalStore::Dump(int fd)
733 {
734     std::string userId = "";
735     std::string appId = "";
736     std::string storeId = "";
737     std::string label = "";
738     if (sqliteStorageEngine_ != nullptr) {
739         userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
740         appId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, "");
741         storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
742         label = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
743     }
744     label = DBCommon::TransferStringToHex(label);
745     DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
746         storeId.c_str(), label.c_str());
747     if (syncAbleEngine_ != nullptr) {
748         syncAbleEngine_->Dump(fd);
749     }
750 }
751 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)752 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
753     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
754 {
755     if (sqliteStorageEngine_ == nullptr) {
756         return -E_INVALID_DB;
757     }
758     if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
759         LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
760         return -E_MAX_LIMITS;
761     }
762 
763     if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
764         LOGW("not a distributed relational store.");
765         return -E_NOT_SUPPORT;
766     }
767 
768     // Check whether to be able to operate the db.
769     int errCode = E_OK;
770     auto *handle = GetHandle(false, errCode);
771     if (handle == nullptr) {
772         return errCode;
773     }
774     errCode = handle->CheckEncryptedOrCorrupted();
775     ReleaseHandle(handle);
776     if (errCode != E_OK) {
777         return errCode;
778     }
779 
780     return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
781 }
782 
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)783 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
784 {
785     std::set<std::string> devices;
786     int errCode = GetExistDevices(devices);
787     if (errCode != E_OK) {
788         return errCode;
789     }
790     for (const auto &tableName : tableNameList) {
791         for (const auto &device : devices) {
792             errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
793             if (errCode != E_OK) {
794                 return errCode;
795             }
796         }
797     }
798     return E_OK;
799 }
800 
GetDevTableName(const std::string & device,const std::string & hashDev) const801 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
802 {
803     std::string devTableName;
804     StoreInfo info = { sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
805         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
806         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "") };
807     if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
808         devTableName = hashDev;
809     }
810     return devTableName;
811 }
812 
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const813 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
814 {
815     int errCode = E_OK;
816     handle = GetHandle(true, errCode);
817     if (handle == nullptr) {
818         LOGE("get handle failed %d", errCode);
819         return errCode;
820     }
821 
822     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
823     if (errCode != E_OK) {
824         LOGE("start transaction failed %d", errCode);
825         ReleaseHandle(handle);
826     }
827     return errCode;
828 }
829 
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)830 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
831     const std::string &tableName, bool isNeedHash)
832 {
833     std::string hashHexDev;
834     std::string hashDev;
835     std::string devTableName;
836     if (!isNeedHash) {
837         // if is not need hash mappingDev mean hash(uuid) device is param device
838         hashHexDev = DBCommon::TransferStringToHex(mappingDev);
839         hashDev = mappingDev;
840         devTableName = device;
841     } else {
842         // if is need hash mappingDev mean uuid
843         hashDev = DBCommon::TransferHashString(mappingDev);
844         hashHexDev = DBCommon::TransferStringToHex(hashDev);
845         devTableName = GetDevTableName(mappingDev, hashHexDev);
846     }
847     // erase watermark first
848     int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
849     if (errCode != E_OK) {
850         LOGE("erase watermark failed %d", errCode);
851         return errCode;
852     }
853     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
854     errCode = GetHandleAndStartTransaction(handle);
855     if (handle == nullptr) {
856         return errCode;
857     }
858 
859     errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
860     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
861     if (errCode != E_OK) {
862         LOGE("delete device data failed. %d", errCode);
863         tables.clear();
864     }
865 
866     for (const auto &it : tables) {
867         if (tableName.empty() || it.second.GetTableName() == tableName) {
868             errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
869             if (errCode != E_OK) {
870                 LOGE("delete device data failed. %d", errCode);
871                 break;
872             }
873         }
874     }
875 
876     if (errCode != E_OK) {
877         (void)handle->Rollback();
878         ReleaseHandle(handle);
879         return errCode;
880     }
881     errCode = handle->Commit();
882     ReleaseHandle(handle);
883     storageEngine_->NotifySchemaChanged();
884     return errCode;
885 }
886 
GetExistDevices(std::set<std::string> & hashDevices) const887 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
888 {
889     int errCode = E_OK;
890     auto *handle = GetHandle(true, errCode);
891     if (handle == nullptr) {
892         LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
893         return errCode;
894     }
895     errCode = handle->GetExistsDeviceList(hashDevices);
896     if (errCode != E_OK) {
897         LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
898     }
899     ReleaseHandle(handle);
900     return errCode;
901 }
902 
GetAllDistributedTableName(TableSyncType tableSyncType)903 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName(TableSyncType tableSyncType)
904 {
905     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
906     std::vector<std::string> tableNames;
907     for (const auto &table : tables) {
908         if (table.second.GetTableSyncType() != tableSyncType) {
909             continue;
910         }
911         tableNames.push_back(table.second.GetTableName());
912     }
913     return tableNames;
914 }
915 
916 #ifdef USE_DISTRIBUTEDDB_CLOUD
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)917 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
918 {
919     if (cloudSyncer_ == nullptr) {
920         LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
921         return -E_INVALID_DB;
922     }
923     cloudSyncer_->SetCloudDB(cloudDb);
924     return E_OK;
925 }
926 #endif
927 
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)928 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
929     std::vector<Field> &addFields)
930 {
931     for (const auto &newField : newFields) {
932         if (equalFields.find(newField.colName) == equalFields.end()) {
933             addFields.push_back(newField);
934         }
935     }
936 }
937 
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)938 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
939     std::vector<Field> &addFields)
940 {
941     std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
942     if (newFields.size() < oldFields.size()) {
943         return false;
944     }
945     std::set<std::string> equalFields;
946     for (const auto &oldField : oldFields) {
947         bool isFieldExist = false;
948         for (const auto &newField : newFields) {
949             if (newField.colName != oldField.GetFieldName()) {
950                 continue;
951             }
952             isFieldExist = true;
953             int32_t type = newField.type;
954             // Field type need to match storage type
955             // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
956             // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
957             if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
958                 type++; // storage type - field type = 1
959             } else if (type == TYPE_INDEX<bool>) {
960                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
961             } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
962                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
963             }
964             auto primaryKeyMap = tableInfo.GetPrimaryKey();
965             auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
966                 [&newField](const std::map<int, std::string>::value_type &pair) {
967                     return pair.second == newField.colName;
968                 });
969             if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
970                 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
971                 return false;
972             }
973             equalFields.insert(newField.colName);
974         }
975         if (!isFieldExist) {
976             return false;
977         }
978     }
979     AddFields(newFields, equalFields, addFields);
980     return true;
981 }
982 
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)983 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
984     std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
985 {
986     std::set<std::string> tableNames;
987     std::map<std::string, std::string> sharedTableNamesMap;
988     std::map<std::string, std::vector<Field>> fieldsMap;
989     for (const auto &table : schema.tables) {
990         tableNames.insert(table.name);
991         sharedTableNamesMap[table.name] = table.sharedTableName;
992         std::vector<Field> fields = table.fields;
993         bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
994         Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
995         Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
996         fields.insert(fields.begin(), privilegeField);
997         fields.insert(fields.begin(), ownerField);
998         fieldsMap[table.name] = fields;
999     }
1000 
1001     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1002     TableInfoMap tableList = localSchema.GetTables();
1003     for (const auto &tableInfo : tableList) {
1004         if (!tableInfo.second.GetSharedTableMark()) {
1005             continue;
1006         }
1007         std::string oldSharedTableName = tableInfo.second.GetTableName();
1008         std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1009         std::vector<Field> addFields;
1010         if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1011             deleteTableNames.push_back(oldSharedTableName);
1012         } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1013             deleteTableNames.push_back(oldSharedTableName);
1014         } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1015             if (!addFields.empty()) {
1016                 updateTableNames[oldSharedTableName] = addFields;
1017             }
1018             if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1019                 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1020             }
1021         } else {
1022             return false;
1023         }
1024     }
1025     return true;
1026 }
1027 
1028 #ifdef USE_DISTRIBUTEDDB_CLOUD
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1029 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1030 {
1031     if (storageEngine_ == nullptr) {
1032         LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1033         return -E_INVALID_DB;
1034     }
1035     int errCode = CheckCloudSchema(schema);
1036     if (errCode != E_OK) {
1037         return errCode;
1038     }
1039     // delete, update and create shared table and its distributed table
1040     errCode = ExecuteCreateSharedTable(schema);
1041     if (errCode != E_OK) {
1042         LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1043         return errCode;
1044     }
1045     return storageEngine_->SetCloudDbSchema(schema);
1046 }
1047 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1048 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1049 {
1050     if (cloudSyncer_ == nullptr) {
1051         LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1052         return -E_INVALID_DB;
1053     }
1054     cloudSyncer_->SetIAssetLoader(loader);
1055     return E_OK;
1056 }
1057 #endif
1058 
ChkSchema(const TableName & tableName)1059 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1060 {
1061     // check schema first then compare columns to avoid change the origin return error code
1062     if (storageEngine_ == nullptr) {
1063         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1064         return -E_INVALID_DB;
1065     }
1066     int errCode = storageEngine_->ChkSchema(tableName);
1067     if (errCode != E_OK) {
1068         LOGE("[SQLiteRelationalStore][ChkSchema] ChkSchema failed %d.", errCode);
1069         return errCode;
1070     }
1071     auto *handle = GetHandle(false, errCode);
1072     if (handle == nullptr) {
1073         LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1074         return errCode;
1075     }
1076     RelationalSchemaObject localSchema = storageEngine_->GetSchemaInfo();
1077     handle->SetLocalSchema(localSchema);
1078     errCode = handle->CompareSchemaTableColumns(tableName);
1079     if (errCode != E_OK) {
1080         LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1081     }
1082     ReleaseHandle(handle);
1083     return errCode;
1084 }
1085 
1086 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1087 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1088 {
1089     if (storageEngine_ == nullptr) {
1090         LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1091         return -E_INVALID_DB;
1092     }
1093     int errCode = CheckBeforeSync(option);
1094     if (errCode != E_OK) {
1095         return errCode;
1096     }
1097     LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1098     if (option.compensatedSyncOnly) {
1099         CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1100         info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1101         cloudSyncer_->GenerateCompensatedSync(info);
1102         return E_OK;
1103     }
1104     CloudSyncer::CloudTaskInfo info;
1105     FillSyncInfo(option, onProcess, info);
1106     auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1107     if (ret != E_OK) {
1108         return ret;
1109     }
1110     ret = ReFillSyncInfoTable(table, info);
1111     if (ret != E_OK) {
1112         return ret;
1113     }
1114     info.taskId = taskId;
1115     errCode = cloudSyncer_->Sync(info);
1116     return errCode;
1117 }
1118 
CheckBeforeSync(const CloudSyncOption & option)1119 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1120 {
1121     if (cloudSyncer_ == nullptr) {
1122         LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1123         return -E_INVALID_DB;
1124     }
1125     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1126         return -E_INVALID_ARGS;
1127     }
1128     if (option.priorityLevel < CloudDbConstant::PRIORITY_TASK_DEFALUT_LEVEL ||
1129         option.priorityLevel > CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1130         LOGE("[RelationalStore] priority level is invalid value:%d", option.priorityLevel);
1131         return -E_INVALID_ARGS;
1132     }
1133     if (option.compensatedSyncOnly && option.asyncDownloadAssets) {
1134         return -E_NOT_SUPPORT;
1135     }
1136     int errCode = CheckQueryValid(option);
1137     if (errCode != E_OK) {
1138         return errCode;
1139     }
1140     SecurityOption securityOption;
1141     errCode = storageEngine_->GetSecurityOption(securityOption);
1142     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1143         return -E_SECURITY_OPTION_CHECK_ERROR;
1144     }
1145     if (errCode == E_OK && securityOption.securityLabel == S4) {
1146         return -E_SECURITY_OPTION_CHECK_ERROR;
1147     }
1148     return E_OK;
1149 }
1150 
CheckAssetsOnlyValid(const QuerySyncObject & querySyncObject,const CloudSyncOption & option)1151 int SQLiteRelationalStore::CheckAssetsOnlyValid(const QuerySyncObject &querySyncObject, const CloudSyncOption &option)
1152 {
1153     if (!querySyncObject.IsAssetsOnly()) {
1154         return E_OK;
1155     }
1156     if (option.mode != SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1157         LOGE("[RelationalStore] not support mode %d when sync with assets only", option.mode);
1158         return -E_NOT_SUPPORT;
1159     }
1160     if (option.priorityLevel != CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1161         LOGE("[RelationalStore] priorityLevel must be 2 when sync with assets only, now is %d",
1162             option.priorityLevel);
1163         return -E_INVALID_ARGS;
1164     }
1165     if (querySyncObject.AssetsOnlyErrFlag() == -E_INVALID_ARGS) {
1166         LOGE("[RelationalStore] the query statement of assets only is incorrect.");
1167         return -E_INVALID_ARGS;
1168     }
1169     return E_OK;
1170 }
1171 
CheckQueryValid(const CloudSyncOption & option)1172 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1173 {
1174     if (option.compensatedSyncOnly) {
1175         return E_OK;
1176     }
1177     QuerySyncObject syncObject(option.query);
1178     int errCode = syncObject.GetValidStatus();
1179     if (errCode != E_OK) {
1180         LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1181         return errCode;
1182     }
1183     std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1184     bool isFromTable = object.empty();
1185     if (!option.priorityTask && !isFromTable) {
1186         LOGE("[RelationalStore] not support normal sync with query");
1187         return -E_NOT_SUPPORT;
1188     }
1189     const auto tableNames = syncObject.GetRelationTableNames();
1190     for (const auto &tableName : tableNames) {
1191         QuerySyncObject querySyncObject;
1192         querySyncObject.SetTableName(tableName);
1193         object.push_back(querySyncObject);
1194     }
1195     std::vector<std::string> syncTableNames;
1196     for (const auto &item : object) {
1197         std::string tableName = item.GetRelationTableName();
1198         syncTableNames.emplace_back(tableName);
1199         if (item.IsContainQueryNodes() && option.asyncDownloadAssets) {
1200             LOGE("[RelationalStore] not support async download assets with query table %s length %zu",
1201                 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length());
1202             return -E_NOT_SUPPORT;
1203         }
1204         errCode = CheckAssetsOnlyValid(item, option);
1205         if (errCode != E_OK) {
1206             return errCode;
1207         }
1208     }
1209     errCode = CheckTableName(syncTableNames);
1210     if (errCode != E_OK) {
1211         return errCode;
1212     }
1213     return CheckObjectValid(option.priorityTask, object, isFromTable);
1214 }
1215 
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1216 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1217     bool isFromTable)
1218 {
1219     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1220     for (const auto &item : object) {
1221         if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1222             LOGE("[RelationalStore] not support priority sync with full table");
1223             return -E_INVALID_ARGS;
1224         }
1225         int errCode = storageEngine_->CheckQueryValid(item);
1226         if (errCode != E_OK) {
1227             return errCode;
1228         }
1229         if (!priorityTask || isFromTable) {
1230             continue;
1231         }
1232         if (!item.IsInValueOutOfLimit()) {
1233             LOGE("[RelationalStore] not support priority sync in count out of limit");
1234             return -E_MAX_LIMITS;
1235         }
1236         std::string tableName = item.GetRelationTableName();
1237         TableInfo tableInfo = localSchema.GetTable(tableName);
1238         if (!tableInfo.Empty()) {
1239             const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1240             errCode = item.CheckPrimaryKey(primaryKeyMap);
1241             if (errCode != E_OK) {
1242                 return errCode;
1243             }
1244         }
1245     }
1246     return E_OK;
1247 }
1248 
CheckTableName(const std::vector<std::string> & tableNames)1249 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1250 {
1251     if (tableNames.empty()) {
1252         LOGE("[RelationalStore] sync with empty table");
1253         return -E_INVALID_ARGS;
1254     }
1255     for (const auto &table : tableNames) {
1256         int errCode = ChkSchema(table);
1257         if (errCode != E_OK) {
1258             LOGE("[RelationalStore] schema check failed when sync");
1259             return errCode;
1260         }
1261     }
1262     return E_OK;
1263 }
1264 
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1265 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1266     CloudSyncer::CloudTaskInfo &info)
1267 {
1268     auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1269     if (syncObject.empty()) {
1270         QuerySyncObject querySyncObject(option.query);
1271         info.table = querySyncObject.GetRelationTableNames();
1272         for (const auto &item : info.table) {
1273             QuerySyncObject object(Query::Select());
1274             object.SetTableName(item);
1275             info.queryList.push_back(object);
1276         }
1277     } else {
1278         for (auto &item : syncObject) {
1279             info.table.push_back(item.GetRelationTableName());
1280             info.queryList.push_back(std::move(item));
1281         }
1282     }
1283     info.devices = option.devices;
1284     info.mode = option.mode;
1285     info.callback = onProcess;
1286     info.timeout = option.waitTime;
1287     info.priorityTask = option.priorityTask;
1288     info.compensatedTask = option.compensatedSyncOnly;
1289     info.priorityLevel = option.priorityLevel;
1290     info.users.emplace_back("");
1291     info.lockAction = option.lockAction;
1292     info.merge = option.merge;
1293     info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1294     info.prepareTraceId = option.prepareTraceId;
1295     info.asyncDownloadAssets = option.asyncDownloadAssets;
1296 }
1297 #endif
1298 
SetTrackerTable(const TrackerSchema & trackerSchema)1299 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1300 {
1301     TableInfo tableInfo;
1302     bool isFirstCreate = false;
1303     bool isNoTableInSchema = false;
1304     int errCode = CheckTrackerTable(trackerSchema, tableInfo, isNoTableInSchema, isFirstCreate);
1305     if (errCode != E_OK) {
1306         if (errCode != -E_IGNORE_DATA) {
1307             return errCode;
1308         }
1309         auto *handle = GetHandle(true, errCode);
1310         if (handle != nullptr) {
1311             handle->CheckAndCreateTrigger(tableInfo);
1312             ReleaseHandle(handle);
1313         }
1314         return E_OK;
1315     }
1316     errCode = sqliteStorageEngine_->UpdateExtendField(trackerSchema);
1317     if (errCode != E_OK) {
1318         LOGE("[RelationalStore] update [%s [%zu]] extend_field failed: %d",
1319             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1320         return errCode;
1321     }
1322     if (isNoTableInSchema) {
1323         return sqliteStorageEngine_->SetTrackerTable(trackerSchema, tableInfo, isFirstCreate);
1324     }
1325     sqliteStorageEngine_->CacheTrackerSchema(trackerSchema);
1326     errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1327     if (errCode != E_OK) {
1328         LOGE("[RelationalStore] create distributed table of [%s [%zu]] failed: %d",
1329             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1330         return errCode;
1331     }
1332     return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1333 }
1334 
CheckTrackerTable(const TrackerSchema & trackerSchema,TableInfo & table,bool & isNoTableInSchema,bool & isFirstCreate)1335 int SQLiteRelationalStore::CheckTrackerTable(const TrackerSchema &trackerSchema, TableInfo &table,
1336     bool &isNoTableInSchema, bool &isFirstCreate)
1337 {
1338     const RelationalSchemaObject &tracker = sqliteStorageEngine_->GetTrackerSchema();
1339     isFirstCreate = tracker.GetTrackerTable(trackerSchema.tableName).GetTableName().empty();
1340     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1341     table = localSchema.GetTable(trackerSchema.tableName);
1342     TrackerTable trackerTable;
1343     trackerTable.Init(trackerSchema);
1344     int errCode = E_OK;
1345     if (table.Empty()) {
1346         isNoTableInSchema = true;
1347         table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
1348         auto *handle = GetHandle(true, errCode);
1349         if (handle == nullptr) {
1350             return errCode;
1351         }
1352         errCode = handle->AnalysisTrackerTable(trackerTable, table);
1353         ReleaseHandle(handle);
1354         if (errCode != E_OK) {
1355             LOGE("[CheckTrackerTable] analysis table schema failed %d.", errCode);
1356             return errCode;
1357         }
1358     } else {
1359         table.SetTrackerTable(trackerTable);
1360         errCode = table.CheckTrackerTable();
1361         if (errCode != E_OK) {
1362             LOGE("[CheckTrackerTable] check tracker table schema failed. %d", errCode);
1363             return errCode;
1364         }
1365     }
1366     if (!trackerSchema.isForceUpgrade && !tracker.GetTrackerTable(trackerSchema.tableName).IsChanging(trackerSchema)) {
1367         LOGW("[CheckTrackerTable] tracker schema is no change, table[%s [%zu]]",
1368             DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size());
1369         return -E_IGNORE_DATA;
1370     }
1371     return E_OK;
1372 }
1373 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1374 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1375 {
1376     if (condition.sql.empty()) {
1377         LOGE("[RelationalStore] execute sql is empty.");
1378         return -E_INVALID_ARGS;
1379     }
1380     return sqliteStorageEngine_->ExecuteSql(condition, records);
1381 }
1382 
CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor * & handle,std::set<std::string> & clearWaterMarkTable)1383 int SQLiteRelationalStore::CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle,
1384     std::set<std::string> &clearWaterMarkTable)
1385 {
1386     int errCode = E_OK;
1387     for (const auto &tableName : clearWaterMarkTable) {
1388         std::string cloudWaterMark;
1389         Value blobMetaVal;
1390         errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1391         if (errCode != E_OK) {
1392             LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1393             return errCode;
1394         }
1395         errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1396         if (errCode != E_OK) {
1397             LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1398             return errCode;
1399         }
1400         errCode = handle->CleanUploadFinishedFlag(tableName);
1401         if (errCode != E_OK) {
1402             LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1403             return errCode;
1404         }
1405     }
1406     return errCode;
1407 }
1408 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1409 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1410 {
1411     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1412     int errCode = GetHandleAndStartTransaction(handle);
1413     if (errCode != E_OK) {
1414         LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1415         return errCode;
1416     }
1417     std::set<std::string> clearWaterMarkTables;
1418     RelationalSchemaObject schema;
1419     errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1420     if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1421         LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1422         (void)handle->Rollback();
1423         ReleaseHandle(handle);
1424         return errCode;
1425     }
1426 
1427     if (!clearWaterMarkTables.empty()) {
1428         storageEngine_->SetReusedHandle(handle);
1429         int ret = CleanWaterMark(handle, clearWaterMarkTables);
1430         if (ret != E_OK) {
1431             LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", ret);
1432             storageEngine_->SetReusedHandle(nullptr);
1433             (void)handle->Rollback();
1434             ReleaseHandle(handle);
1435             return ret;
1436         }
1437         storageEngine_->SetReusedHandle(nullptr);
1438         LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1439     }
1440 
1441     int ret = handle->Commit();
1442     ReleaseHandle(handle);
1443     if (ret != E_OK) {
1444         LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1445         return ret;
1446     }
1447     sqliteStorageEngine_->SetSchema(schema);
1448 #ifdef USE_DISTRIBUTEDDB_CLOUD
1449     if (!clearWaterMarkTables.empty()) {
1450         ret = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTables);
1451         if (ret != E_OK) {
1452             LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1453             return ret;
1454         }
1455     }
1456 #endif
1457     return errCode;
1458 }
1459 
InitTrackerSchemaFromMeta()1460 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1461 {
1462     int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1463     return errCode == -E_NOT_FOUND ? E_OK : errCode;
1464 }
1465 
CleanTrackerData(const std::string & tableName,int64_t cursor)1466 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1467 {
1468     if (tableName.empty()) {
1469         return -E_INVALID_ARGS;
1470     }
1471     return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1472 }
1473 
ExecuteCreateSharedTable(const DataBaseSchema & schema)1474 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1475 {
1476     if (sqliteStorageEngine_ == nullptr) {
1477         LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1478         return -E_INVALID_DB;
1479     }
1480     std::vector<std::string> deleteTableNames;
1481     std::map<std::string, std::vector<Field>> updateTableNames;
1482     std::map<std::string, std::string> alterTableNames;
1483     if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1484         LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1485         return -E_INVALID_ARGS;
1486     }
1487     LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1488     // upgrade contains delete, alter, update and create
1489     int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1490     if (errCode != E_OK) {
1491         LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1492     } else {
1493         LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1494     }
1495     return errCode;
1496 }
1497 
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1498 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1499     CloudSyncer::CloudTaskInfo &info)
1500 {
1501     if (info.priorityTask && actualTable.size() != info.table.size()) {
1502         LOGE("[RelationalStore] Not support regenerate table with priority task");
1503         return -E_NOT_SUPPORT;
1504     }
1505     if (actualTable.size() == info.table.size()) {
1506         return E_OK;
1507     }
1508     LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1509     info.table = actualTable;
1510     info.queryList.clear();
1511     for (const auto &item : info.table) {
1512         QuerySyncObject object(Query::Select());
1513         object.SetTableName(item);
1514         info.queryList.push_back(object);
1515     }
1516     return E_OK;
1517 }
1518 
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1519 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1520 {
1521     if (cmd != LOGIC_DELETE_SYNC_DATA) {
1522         return -E_NOT_SUPPORT;
1523     }
1524     if (pragmaData == nullptr) {
1525         return -E_INVALID_ARGS;
1526     }
1527     auto logicDelete = *(static_cast<bool *>(pragmaData));
1528     if (storageEngine_ == nullptr) {
1529         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1530         return -E_INVALID_DB;
1531     }
1532     storageEngine_->SetLogicDelete(logicDelete);
1533     return E_OK;
1534 }
1535 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1536 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1537     const std::vector<VBucket> &records)
1538 {
1539     if (storageEngine_ == nullptr) {
1540         LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1541         return -E_INVALID_DB;
1542     }
1543     int errCode = CheckParamForUpsertData(status, tableName, records);
1544     if (errCode != E_OK) {
1545         return errCode;
1546     }
1547     return storageEngine_->UpsertData(status, tableName, records);
1548 }
1549 
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1550 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1551     const std::vector<VBucket> &records)
1552 {
1553     if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1554         LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1555         return -E_INVALID_ARGS;
1556     }
1557     if (records.empty()) {
1558         LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1559         return -E_INVALID_ARGS;
1560     }
1561     size_t recordSize = records.size();
1562     if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1563         LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1564         return -E_MAX_LIMITS;
1565     }
1566     return CheckSchemaForUpsertData(tableName, records);
1567 }
1568 
ChkTable(const TableInfo & table)1569 static int ChkTable(const TableInfo &table)
1570 {
1571     if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1572         LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1573         return -E_NOT_SUPPORT;
1574     }
1575     if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1576         return -E_NOT_FOUND;
1577     }
1578     return E_OK;
1579 }
1580 
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1581 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1582 {
1583     if (tableName.empty()) {
1584         return -E_INVALID_ARGS;
1585     }
1586     auto schema = storageEngine_->GetSchemaInfo();
1587     auto table = schema.GetTable(tableName);
1588     int errCode = ChkTable(table);
1589     if (errCode != E_OK) {
1590         return errCode;
1591     }
1592     TableSchema cloudTableSchema;
1593     errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1594     if (errCode != E_OK) {
1595         LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1596         return errCode;
1597     }
1598     errCode = ChkSchema(tableName);
1599     if (errCode != E_OK) {
1600         return errCode;
1601     }
1602     std::set<std::string> dbPkFields;
1603     for (auto &field : table.GetIdentifyKey()) {
1604         dbPkFields.insert(field);
1605     }
1606     std::set<std::string> schemaFields;
1607     for (auto &fieldInfo : table.GetFieldInfos()) {
1608         schemaFields.insert(fieldInfo.GetFieldName());
1609     }
1610     for (const auto &record : records) {
1611         std::set<std::string> recordPkFields;
1612         for (const auto &item : record) {
1613             if (schemaFields.find(item.first) == schemaFields.end()) {
1614                 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1615                 return -E_INVALID_ARGS;
1616             }
1617             if (dbPkFields.find(item.first) == dbPkFields.end()) {
1618                 continue;
1619             }
1620             recordPkFields.insert(item.first);
1621         }
1622         if (recordPkFields.size() != dbPkFields.size()) {
1623             LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1624                 recordPkFields.size(), dbPkFields.size());
1625             return -E_INVALID_ARGS;
1626         }
1627     }
1628     return errCode;
1629 }
1630 
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1631 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1632 {
1633     auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1634     if (engine == nullptr) {
1635         LOGE("[RelationalStore][Open] Create storage engine failed");
1636         return -E_OUT_OF_MEMORY;
1637     }
1638     sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1639         [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1640         RefObject::KillAndDecObjRef(releaseEngine);
1641     });
1642     return E_OK;
1643 }
1644 
1645 #ifdef USE_DISTRIBUTEDDB_CLOUD
CheckCloudSchema(const DataBaseSchema & schema)1646 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1647 {
1648     if (storageEngine_ == nullptr) {
1649         LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1650         return -E_INVALID_DB;
1651     }
1652     std::shared_ptr<DataBaseSchema> cloudSchema;
1653     (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1654     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1655     for (const auto &tableSchema : schema.tables) {
1656         TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1657         if (tableInfo.Empty()) {
1658             continue;
1659         }
1660         if (tableInfo.GetSharedTableMark()) {
1661             LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1662             return -E_INVALID_ARGS;
1663         }
1664     }
1665     for (const auto &tableSchema : schema.tables) {
1666         if (cloudSchema == nullptr) {
1667             continue;
1668         }
1669         for (const auto &oldSchema : cloudSchema->tables) {
1670             if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1671                 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1672                 return -E_INVALID_ARGS;
1673             }
1674         }
1675     }
1676     return E_OK;
1677 }
1678 
SetCloudSyncConfig(const CloudSyncConfig & config)1679 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1680 {
1681     if (storageEngine_ == nullptr) {
1682         LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1683         return -E_INVALID_DB;
1684     }
1685     storageEngine_->SetCloudSyncConfig(config);
1686     return E_OK;
1687 }
1688 
GetCloudTaskStatus(uint64_t taskId)1689 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1690 {
1691     return cloudSyncer_->GetCloudTaskStatus(taskId);
1692 }
1693 #endif
1694 
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1695 int SQLiteRelationalStore::SetDistributedSchema(const DistributedSchema &schema, bool isForceUpgrade)
1696 {
1697     if (sqliteStorageEngine_ == nullptr || storageEngine_ == nullptr) {
1698         LOGE("[RelationalStore] engine was not initialized");
1699         return -E_INVALID_DB;
1700     }
1701     auto [errCode, isSchemaChange] = sqliteStorageEngine_->SetDistributedSchema(schema, isForceUpgrade);
1702     if (errCode != E_OK) {
1703         return errCode;
1704     }
1705     if (isSchemaChange) {
1706         LOGI("[RelationalStore] schema was changed by setting distributed schema");
1707         storageEngine_->NotifySchemaChanged();
1708     }
1709     return E_OK;
1710 }
1711 
GetDownloadingAssetsCount(int32_t & count)1712 int SQLiteRelationalStore::GetDownloadingAssetsCount(int32_t &count)
1713 {
1714     std::vector<std::string> tableNameList = GetAllDistributedTableName(TableSyncType::CLOUD_COOPERATION);
1715     if (tableNameList.empty()) {
1716         return E_OK;
1717     }
1718 
1719     int errCode = E_OK;
1720     SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(false, errCode);
1721     if (handle == nullptr) {
1722         return errCode;
1723     }
1724     for (const auto &tableName : tableNameList) {
1725         TableSchema tableSchema;
1726         int errCode = storageEngine_->GetCloudTableSchema(tableName, tableSchema);
1727         if (errCode != E_OK) {
1728             LOGE("[RelationalStore] Get schema failed when get download assets count, %d, tableName: %s, length: %zu",
1729                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1730             break;
1731         }
1732         errCode = handle->GetDownloadingAssetsCount(tableSchema, count);
1733         if (errCode != E_OK) {
1734             LOGE("[RelationalStore] Get download assets count failed: %d, tableName: %s, length: %zu",
1735                 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1736             break;
1737         }
1738     }
1739     ReleaseHandle(handle);
1740     return errCode;
1741 }
1742 
SetTableMode(DistributedTableMode tableMode)1743 int SQLiteRelationalStore::SetTableMode(DistributedTableMode tableMode)
1744 {
1745     if (sqliteStorageEngine_ == nullptr) {
1746         LOGE("[RelationalStore][SetTableMode] sqliteStorageEngine was not initialized");
1747         return -E_INVALID_DB;
1748     }
1749     if (sqliteStorageEngine_->GetProperties().GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE &&
1750         tableMode == DistributedTableMode::COLLABORATION) {
1751         auto schema = sqliteStorageEngine_->GetSchema();
1752         for (const auto &tableMap : schema.GetTables()) {
1753             if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1754                 LOGW("[RelationalStore][SetTableMode] Can not set table mode for table %s[%zu]",
1755                     DBCommon::StringMiddleMasking(tableMap.first).c_str(), tableMap.first.size());
1756                 return -E_NOT_SUPPORT;
1757             }
1758         }
1759     }
1760     RelationalDBProperties properties = sqliteStorageEngine_->GetProperties();
1761     properties.SetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(tableMode));
1762     sqliteStorageEngine_->SetProperties(properties);
1763     LOGI("[RelationalStore][SetTableMode] Set table mode to %d successful", static_cast<int>(tableMode));
1764     return E_OK;
1765 }
1766 } // namespace DistributedDB
1767 #endif
1768