• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_relational_storage_engine.h"
17 
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "res_finalizer.h"
21 #include "sqlite_relational_database_upgrader.h"
22 #include "sqlite_single_ver_relational_storage_executor.h"
23 #include "sqlite_relational_utils.h"
24 
25 
26 namespace DistributedDB {
SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)27 SQLiteSingleRelationalStorageEngine::SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)
28     : properties_(properties)
29 {}
30 
~SQLiteSingleRelationalStorageEngine()31 SQLiteSingleRelationalStorageEngine::~SQLiteSingleRelationalStorageEngine()
32 {}
33 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)34 StorageExecutor *SQLiteSingleRelationalStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite,
35     bool isMemDb)
36 {
37     auto mode = properties_.GetDistributedTableMode();
38     return new (std::nothrow) SQLiteSingleVerRelationalStorageExecutor(dbHandle, isWrite, mode);
39 }
40 
Upgrade(sqlite3 * db)41 int SQLiteSingleRelationalStorageEngine::Upgrade(sqlite3 *db)
42 {
43     int errCode = CreateRelationalMetaTable(db);
44     if (errCode != E_OK) {
45         LOGE("Create relational store meta table failed. err=%d", errCode);
46         return errCode;
47     }
48     LOGD("[RelationalEngine][Upgrade] upgrade relational store.");
49     auto upgrader = std::make_unique<SqliteRelationalDatabaseUpgrader>(db);
50     return upgrader->Upgrade();
51 }
52 
RegisterFunction(sqlite3 * db) const53 int SQLiteSingleRelationalStorageEngine::RegisterFunction(sqlite3 *db) const
54 {
55     int errCode = SQLiteUtils::RegisterCalcHash(db);
56     if (errCode != E_OK) {
57         LOGE("[engine] register calculate hash failed!");
58         return errCode;
59     }
60 
61     errCode = SQLiteUtils::RegisterGetLastTime(db);
62     if (errCode != E_OK) {
63         LOGE("[engine] register get last time failed!");
64         return errCode;
65     }
66 
67     errCode = SQLiteUtils::RegisterGetSysTime(db);
68     if (errCode != E_OK) {
69         LOGE("[engine] register get sys time failed!");
70         return errCode;
71     }
72 
73     errCode = SQLiteUtils::RegisterGetRawSysTime(db);
74     if (errCode != E_OK) {
75         LOGE("[engine] register get raw sys time failed!");
76         return errCode;
77     }
78 
79     errCode = SQLiteUtils::RegisterCloudDataChangeObserver(db);
80     if (errCode != E_OK) {
81         LOGE("[engine] register cloud observer failed!");
82     }
83 
84     errCode = SQLiteUtils::RegisterCloudDataChangeServerObserver(db);
85     if (errCode != E_OK) {
86         LOGE("[engine] register cloud server observer failed!");
87     }
88 
89     return errCode;
90 }
91 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)92 int SQLiteSingleRelationalStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
93 {
94     sqlite3 *db = nullptr;
95     int errCode = SQLiteUtils::OpenDatabase(option_, db, false);
96     if (errCode != E_OK) {
97         return errCode;
98     }
99     do {
100         errCode = Upgrade(db); // create meta_data table.
101         if (errCode != E_OK) {
102             break;
103         }
104 
105         errCode = RegisterFunction(db);
106         if (errCode != E_OK) {
107             break;
108         }
109 
110         handle = NewSQLiteStorageExecutor(db, isWrite, false);
111         if (handle == nullptr) {
112             LOGE("[Relational] New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
113             errCode = -E_OUT_OF_MEMORY;
114             break;
115         }
116         return E_OK;
117     } while (false);
118 
119     (void)sqlite3_close_v2(db);
120     db = nullptr;
121     return errCode;
122 }
123 
ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isExternal)124 void SQLiteSingleRelationalStorageEngine::ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor *&handle,
125     bool isExternal)
126 {
127     if (handle == nullptr) {
128         return;
129     }
130     StorageExecutor *databaseHandle = handle;
131     Recycle(databaseHandle, isExternal);
132     handle = nullptr;
133 }
134 
SetSchema(const RelationalSchemaObject & schema)135 void SQLiteSingleRelationalStorageEngine::SetSchema(const RelationalSchemaObject &schema)
136 {
137     std::lock_guard lock(schemaMutex_);
138     schema_ = schema;
139 }
140 
GetSchema() const141 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetSchema() const
142 {
143     std::lock_guard lock(schemaMutex_);
144     return schema_;
145 }
146 
147 namespace {
148 const std::string DEVICE_TYPE = "device";
149 const std::string CLOUD_TYPE = "cloud";
150 const std::string SYNC_TABLE_TYPE = "sync_table_type_";
151 
SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)152 int SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle, const RelationalSchemaObject &schema)
153 {
154     const Key schemaKey(DBConstant::RELATIONAL_SCHEMA_KEY.begin(), DBConstant::RELATIONAL_SCHEMA_KEY.end());
155     Value schemaVal;
156     DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
157     int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
158     if (errCode != E_OK) {
159         LOGE("Save schema to meta table failed. %d", errCode);
160     }
161     return errCode;
162 }
163 
SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)164 int SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle,
165     const RelationalSchemaObject &schema)
166 {
167     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
168         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
169     Value schemaVal;
170     DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
171     int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
172     if (errCode != E_OK) {
173         LOGE("Save schema to meta table failed. %d", errCode);
174     }
175     return errCode;
176 }
177 
SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,TableSyncType syncType)178 int SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
179     TableSyncType syncType)
180 {
181     Key key;
182     DBCommon::StringToVector(SYNC_TABLE_TYPE + tableName, key);
183     Value value;
184     DBCommon::StringToVector(syncType == DEVICE_COOPERATION ? DEVICE_TYPE : CLOUD_TYPE, value);
185     int errCode = handle->PutKvData(key, value);
186     if (errCode != E_OK) {
187         LOGE("Save sync table type to meta table failed. %d", errCode);
188         return errCode;
189     }
190     DBCommon::StringToVector(DBConstant::TABLE_IS_DROPPED + tableName, key);
191     errCode = handle->DeleteMetaData({ key });
192     if (errCode != E_OK) {
193         LOGE("Save table drop flag to meta table failed. %d", errCode);
194     }
195     return errCode;
196 }
197 }
198 
CreateDistributedTable(const std::string & tableName,const std::string & identity,bool & schemaChanged,TableSyncType syncType,bool trackerSchemaChanged)199 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName,
200     const std::string &identity, bool &schemaChanged, TableSyncType syncType, bool trackerSchemaChanged)
201 {
202     std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
203     RelationalSchemaObject schema = GetSchema();
204     bool isUpgraded = false;
205     if (DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
206         LOGI("distributed table bas been created.");
207         if (schema.GetTable(tableName).GetTableSyncType() != syncType) {
208             LOGE("table sync type mismatch.");
209             return -E_TYPE_MISMATCH;
210         }
211         isUpgraded = true;
212         int errCode = UpgradeDistributedTable(tableName, schemaChanged, syncType);
213         if (errCode != E_OK) {
214             LOGE("Upgrade distributed table failed. %d", errCode);
215             return errCode;
216         }
217         // Triggers may need to be rebuilt, no return directly
218     } else if (schema.GetTables().size() >= DBConstant::MAX_DISTRIBUTED_TABLE_COUNT) {
219         LOGE("The number of distributed tables is exceeds limit.");
220         return -E_MAX_LIMITS;
221     } else {
222         schemaChanged = true;
223     }
224 
225     int errCode = CreateDistributedTable(tableName, isUpgraded, identity, schema, syncType);
226     if (errCode != E_OK) {
227         LOGE("CreateDistributedTable failed. %d", errCode);
228         return errCode;
229     }
230     if (isUpgraded && (schemaChanged || trackerSchemaChanged)) {
231         // Used for upgrading the stock data of the trackerTable
232         errCode = GenLogInfoForUpgrade(tableName, schema, schemaChanged);
233     }
234     return errCode;
235 }
236 
CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::string & tableName,const std::string & sharedTableName,TableSyncType tableSyncType,RelationalSchemaObject & schema)237 int SQLiteSingleRelationalStorageEngine::CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
238     const std::string &tableName, const std::string &sharedTableName, TableSyncType tableSyncType,
239     RelationalSchemaObject &schema)
240 {
241     TableInfo table;
242     table.SetTableName(sharedTableName);
243     table.SetOriginTableName(tableName);
244     table.SetSharedTableMark(true);
245     table.SetTableSyncType(tableSyncType);
246     table.SetTrackerTable(trackerSchema_.GetTrackerTable(sharedTableName));
247     if (!table.GetTrackerTable().IsEmpty() && tableSyncType == TableSyncType::DEVICE_COOPERATION) { // LCOV_EXCL_BR_LINE
248         LOGE("current is trackerTable, not support creating device distributed table.");
249         return -E_NOT_SUPPORT;
250     }
251     bool isUpgraded = schema.GetTable(sharedTableName).GetTableName() == sharedTableName;
252     int errCode = CreateDistributedTable(handle, isUpgraded, "", table, schema);
253     if (errCode != E_OK) {
254         LOGE("create distributed table failed. %d", errCode);
255         return errCode;
256     }
257     std::lock_guard lock(schemaMutex_);
258     schema_ = schema;
259     return errCode;
260 }
261 
CreateDistributedTable(const std::string & tableName,bool isUpgraded,const std::string & identity,RelationalSchemaObject & schema,TableSyncType tableSyncType)262 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName, bool isUpgraded,
263     const std::string &identity, RelationalSchemaObject &schema, TableSyncType tableSyncType)
264 {
265     LOGD("Create distributed table.");
266     int errCode = E_OK;
267     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
268         errCode));
269     if (handle == nullptr) {
270         return errCode;
271     }
272     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
273 
274     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
275     if (errCode != E_OK) {
276         return errCode;
277     }
278 
279     TableInfo table;
280     table.SetTableName(tableName);
281     table.SetTableSyncType(tableSyncType);
282     table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(tableName));
283     table.SetDistributedTable(schema.GetDistributedTable(tableName));
284     if (isUpgraded) {
285         table.SetSourceTableReference(schema.GetTable(tableName).GetTableReference());
286     }
287     errCode = CreateDistributedTable(handle, isUpgraded, identity, table, schema);
288     if (errCode != E_OK) {
289         LOGE("create distributed table failed. %d", errCode);
290         (void)handle->Rollback();
291         return errCode;
292     }
293     errCode = handle->Commit();
294     if (errCode == E_OK) {
295         SetSchema(schema);
296     }
297     return errCode;
298 }
299 
CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isUpgraded,const std::string & identity,TableInfo & table,RelationalSchemaObject & schema)300 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
301     bool isUpgraded, const std::string &identity, TableInfo &table, RelationalSchemaObject &schema)
302 {
303     auto mode = properties_.GetDistributedTableMode();
304     TableSyncType tableSyncType = table.GetTableSyncType();
305     std::string tableName = table.GetTableName();
306     int errCode = handle->InitCursorToMeta(tableName);
307     if (errCode != E_OK) {
308         LOGE("init cursor to meta failed. %d", errCode);
309         return errCode;
310     }
311     errCode = handle->CreateDistributedTable(mode, isUpgraded, identity, table);
312     if (errCode != E_OK) {
313         LOGE("create distributed table failed. %d", errCode);
314         return errCode;
315     }
316 
317     schema.SetTableMode(mode);
318     // update table if tableName changed
319     schema.RemoveRelationalTable(tableName);
320     schema.AddRelationalTable(table);
321     errCode = SaveSchemaToMetaTable(handle, schema);
322     if (errCode != E_OK) {
323         LOGE("Save schema to meta table for create distributed table failed. %d", errCode);
324         return errCode;
325     }
326 
327     errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, tableName, tableSyncType);
328     if (errCode != E_OK) {
329         LOGE("Save sync table type or drop flag to meta table failed. %d", errCode);
330     }
331     return errCode;
332 }
333 
UpgradeDistributedTable(const std::string & tableName,bool & schemaChanged,TableSyncType syncType)334 int SQLiteSingleRelationalStorageEngine::UpgradeDistributedTable(const std::string &tableName, bool &schemaChanged,
335     TableSyncType syncType)
336 {
337     LOGD("Upgrade distributed table.");
338     RelationalSchemaObject schema = GetSchema();
339     int errCode = E_OK;
340     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
341         errCode));
342     if (handle == nullptr) {
343         return errCode;
344     }
345 
346     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
347     if (errCode != E_OK) {
348         ReleaseExecutor(handle);
349         return errCode;
350     }
351 
352     auto mode = properties_.GetDistributedTableMode();
353     errCode = handle->UpgradeDistributedTable(tableName, mode, schemaChanged, schema, syncType);
354     if (errCode != E_OK) {
355         LOGE("Upgrade distributed table failed. %d", errCode);
356         (void)handle->Rollback();
357         ReleaseExecutor(handle);
358         return errCode;
359     }
360 
361     errCode = SaveSchemaToMetaTable(handle, schema);
362         if (errCode != E_OK) {
363         LOGE("Save schema to meta table for upgrade distributed table failed. %d", errCode);
364         (void)handle->Rollback();
365         ReleaseExecutor(handle);
366         return errCode;
367     }
368 
369     errCode = handle->Commit();
370     if (errCode == E_OK) {
371         SetSchema(schema);
372     }
373     ReleaseExecutor(handle);
374     return errCode;
375 }
376 
CleanDistributedDeviceTable(std::vector<std::string> & missingTables)377 int SQLiteSingleRelationalStorageEngine::CleanDistributedDeviceTable(std::vector<std::string> &missingTables)
378 {
379     int errCode = E_OK;
380     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
381         errCode));
382     if (handle == nullptr) {
383         return errCode;
384     }
385 
386     // go fast to check missing tables without transaction
387     errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
388     if (errCode == E_OK) {
389         if (missingTables.empty()) {
390             LOGI("Check missing distributed table is empty.");
391             ReleaseExecutor(handle);
392             return errCode;
393         }
394     } else {
395         LOGE("Get missing distributed table failed. %d", errCode);
396         ReleaseExecutor(handle);
397         return errCode;
398     }
399     missingTables.clear();
400 
401     std::lock_guard lock(schemaMutex_);
402     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
403     if (errCode != E_OK) {
404         ReleaseExecutor(handle);
405         return errCode;
406     }
407 
408     errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
409     if (errCode == E_OK) {
410         // Remove non-existent tables from the schema
411         for (const auto &tableName : missingTables) {
412             schema_.RemoveRelationalTable(tableName);
413         }
414         errCode = SaveSchemaToMetaTable(handle, schema_); // save schema to meta_data
415         if (errCode != E_OK) {
416             LOGE("Save schema to metaTable failed. %d", errCode);
417             (void)handle->Rollback();
418         } else {
419             errCode = handle->Commit();
420         }
421     } else {
422         LOGE("Check distributed table failed. %d", errCode);
423         (void)handle->Rollback();
424     }
425     ReleaseExecutor(handle);
426     return errCode;
427 }
428 
GetProperties() const429 const RelationalDBProperties &SQLiteSingleRelationalStorageEngine::GetProperties() const
430 {
431     return properties_;
432 }
433 
SetProperties(const RelationalDBProperties & properties)434 void SQLiteSingleRelationalStorageEngine::SetProperties(const RelationalDBProperties &properties)
435 {
436     properties_ = properties;
437 }
438 
CreateRelationalMetaTable(sqlite3 * db)439 int SQLiteSingleRelationalStorageEngine::CreateRelationalMetaTable(sqlite3 *db)
440 {
441     std::string sql =
442         "CREATE TABLE IF NOT EXISTS " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata(" \
443         "key    BLOB PRIMARY KEY NOT NULL," \
444         "value  BLOB);";
445     int errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
446     if (errCode != E_OK) {
447         LOGE("[SQLite] execute create table sql failed, err=%d", errCode);
448     }
449     return errCode;
450 }
451 
SetTrackerTable(const TrackerSchema & schema,const TableInfo & tableInfo,bool isFirstCreate)452 int SQLiteSingleRelationalStorageEngine::SetTrackerTable(const TrackerSchema &schema, const TableInfo &tableInfo,
453     bool isFirstCreate)
454 {
455     int errCode = E_OK;
456     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
457         errCode));
458     if (handle == nullptr) {
459         return errCode;
460     }
461 
462     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
463     if (errCode != E_OK) {
464         ReleaseExecutor(handle);
465         return errCode;
466     }
467     RelationalSchemaObject tracker = trackerSchema_;
468     tracker.InsertTrackerSchema(schema);
469     int ret = handle->CreateTrackerTable(tracker.GetTrackerTable(schema.tableName), tableInfo, isFirstCreate);
470     if (ret != E_OK && ret != -E_WITH_INVENTORY_DATA) {
471         (void)handle->Rollback();
472         ReleaseExecutor(handle);
473         return ret;
474     }
475 
476     if (schema.trackerColNames.empty() && !schema.isTrackAction) {
477         tracker.RemoveTrackerSchema(schema);
478     }
479     errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
480     if (errCode != E_OK) {
481         (void)handle->Rollback();
482         ReleaseExecutor(handle);
483         return errCode;
484     }
485 
486     errCode = handle->Commit();
487     if (errCode != E_OK) {
488         ReleaseExecutor(handle);
489         return errCode;
490     }
491 
492     trackerSchema_ = tracker;
493     ReleaseExecutor(handle);
494     return ret;
495 }
496 
CacheTrackerSchema(const TrackerSchema & schema)497 void SQLiteSingleRelationalStorageEngine::CacheTrackerSchema(const TrackerSchema &schema)
498 {
499     trackerSchema_.InsertTrackerSchema(schema);
500     if (!schema.isTrackAction && schema.trackerColNames.empty()) {
501         // if isTrackAction be false and trackerColNames is empty, will remove the tracker schema.
502         trackerSchema_.RemoveTrackerSchema(schema);
503     }
504 }
505 
GetOrInitTrackerSchemaFromMeta()506 int SQLiteSingleRelationalStorageEngine::GetOrInitTrackerSchemaFromMeta()
507 {
508     RelationalSchemaObject trackerSchema;
509     int errCode = E_OK;
510     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
511         errCode));
512     if (handle == nullptr) {
513         return errCode;
514     }
515     errCode = handle->GetOrInitTrackerSchemaFromMeta(trackerSchema);
516     if (errCode != E_OK) {
517         ReleaseExecutor(handle);
518         return errCode;
519     }
520     const TableInfoMap tableInfoMap = trackerSchema.GetTrackerTables();
521     for (const auto &iter: tableInfoMap) {
522         TableInfo tableInfo;
523         errCode = handle->AnalysisTrackerTable(iter.second.GetTrackerTable(), tableInfo);
524         if (errCode == -E_NOT_FOUND) { // LCOV_EXCL_BR_LINE
525             const std::string trackerTableName = iter.second.GetTrackerTable().GetTableName();
526             errCode = CleanTrackerDeviceTable({ trackerTableName }, trackerSchema, handle);
527             if (errCode != E_OK) {
528                 LOGE("cancel tracker table failed during db opening. %d", errCode);
529                 ReleaseExecutor(handle);
530                 return errCode;
531             }
532         } else if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
533             LOGE("the tracker schema does not match the tracker schema. %d", errCode);
534             ReleaseExecutor(handle);
535             return errCode;
536         }
537     }
538     trackerSchema_ = trackerSchema;
539     ReleaseExecutor(handle);
540     return E_OK;
541 }
542 
SaveTrackerSchema(const std::string & tableName,bool isFirstCreate)543 int SQLiteSingleRelationalStorageEngine::SaveTrackerSchema(const std::string &tableName, bool isFirstCreate)
544 {
545     int errCode = E_OK;
546     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
547         errCode));
548     if (handle == nullptr) {
549         return errCode;
550     }
551     RelationalSchemaObject tracker = trackerSchema_;
552     errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
553     if (errCode != E_OK || !isFirstCreate) {
554         ReleaseExecutor(handle);
555         return errCode;
556     }
557     errCode = handle->CheckInventoryData(DBCommon::GetLogTableName(tableName));
558     ReleaseExecutor(handle);
559     return errCode;
560 }
561 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)562 int SQLiteSingleRelationalStorageEngine::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
563 {
564     int errCode = E_OK;
565     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(!condition.readOnly,
566         OperatePerm::NORMAL_PERM, errCode, true));
567     if (handle == nullptr) {
568         return errCode;
569     }
570     errCode = handle->ExecuteSql(condition, records);
571     ReleaseExecutor(handle, true);
572     return errCode;
573 }
574 
CheckReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema)575 static int CheckReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
576     const RelationalSchemaObject &schema)
577 {
578     for (const auto &reference : tableReferenceProperty) {
579         TableInfo sourceTableInfo = schema.GetTable(reference.sourceTableName);
580         TableInfo targetTableInfo = schema.GetTable(reference.targetTableName);
581         if (strcasecmp(sourceTableInfo.GetTableName().c_str(), reference.sourceTableName.c_str()) != 0 ||
582             strcasecmp(targetTableInfo.GetTableName().c_str(), reference.targetTableName.c_str()) != 0) {
583             LOGE("can't set reference for table which doesn't create distributed table.");
584             return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
585         }
586         if (sourceTableInfo.GetTableSyncType() != CLOUD_COOPERATION ||
587             targetTableInfo.GetTableSyncType() != CLOUD_COOPERATION) {
588             LOGE("can't set reference for table which doesn't create distributed table with cloud mode.");
589             return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
590         }
591         if (sourceTableInfo.GetSharedTableMark() || targetTableInfo.GetSharedTableMark()) {
592             LOGE("can't set reference for shared table.");
593             return -E_NOT_SUPPORT;
594         }
595 
596         FieldInfoMap sourceFieldMap = sourceTableInfo.GetFields();
597         FieldInfoMap targetFieldMap = targetTableInfo.GetFields();
598         for (const auto &[sourceFieldName, targetFieldName] : reference.columns) {
599             if (sourceFieldMap.find(sourceFieldName) == sourceFieldMap.end() ||
600                 targetFieldMap.find(targetFieldName) == targetFieldMap.end()) {
601                 LOGE("reference field doesn't exists in table.");
602                 return -E_INVALID_ARGS;
603             }
604         }
605     }
606     return E_OK;
607 }
608 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,SQLiteSingleVerRelationalStorageExecutor * handle,std::set<std::string> & clearWaterMarkTables,RelationalSchemaObject & schema)609 int SQLiteSingleRelationalStorageEngine::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
610     SQLiteSingleVerRelationalStorageExecutor *handle, std::set<std::string> &clearWaterMarkTables,
611     RelationalSchemaObject &schema)
612 {
613     std::lock_guard lock(schemaMutex_);
614     schema = schema_;
615     int errCode = CheckReference(tableReferenceProperty, schema);
616     if (errCode != E_OK) {
617         LOGE("check reference failed, errCode = %d.", errCode);
618         return errCode;
619     }
620 
621     errCode = handle->GetClearWaterMarkTables(tableReferenceProperty, schema, clearWaterMarkTables);
622     if (errCode != E_OK) {
623         return errCode;
624     }
625     schema.SetReferenceProperty(tableReferenceProperty);
626     errCode = SaveSchemaToMetaTable(handle, schema);
627     if (errCode != E_OK) {
628         LOGE("Save schema to meta table for reference failed. %d", errCode);
629         return errCode;
630     }
631     auto mode = properties_.GetDistributedTableMode();
632     for (auto &table : schema.GetTables()) {
633         if (table.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
634             continue;
635         }
636         TableInfo tableInfo = table.second;
637         errCode = handle->RenewTableTrigger(mode, tableInfo, TableSyncType::CLOUD_COOPERATION);
638         if (errCode != E_OK) {
639             LOGE("renew table trigger for reference failed. %d", errCode);
640             return errCode;
641         }
642     }
643     return clearWaterMarkTables.empty() ? E_OK : -E_TABLE_REFERENCE_CHANGED;
644 }
645 
GetTrackerSchema() const646 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetTrackerSchema() const
647 {
648     return trackerSchema_;
649 }
650 
CleanTrackerData(const std::string & tableName,int64_t cursor)651 int SQLiteSingleRelationalStorageEngine::CleanTrackerData(const std::string &tableName, int64_t cursor)
652 {
653     int errCode = E_OK;
654     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
655         errCode));
656     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
657         return errCode;
658     }
659     TrackerTable trackerTable = GetTrackerSchema().GetTrackerTable(tableName);
660     bool isOnlyTrackTable = false;
661     RelationalSchemaObject schema = GetSchema();
662     if (!trackerTable.IsTableNameEmpty() &&
663         !DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
664         isOnlyTrackTable = true;
665     }
666     errCode = handle->CleanTrackerData(tableName, cursor, isOnlyTrackTable);
667     ReleaseExecutor(handle);
668     return errCode;
669 }
670 
UpgradeSharedTable(const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)671 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTable(const DataBaseSchema &cloudSchema,
672     const std::vector<std::string> &deleteTableNames, const std::map<std::string, std::vector<Field>> &updateTableNames,
673     const std::map<std::string, std::string> &alterTableNames)
674 {
675     int errCode = E_OK;
676     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
677         errCode));
678     if (handle == nullptr) {
679         return errCode;
680     }
681     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
682     if (errCode != E_OK) {
683         ReleaseExecutor(handle);
684         return errCode;
685     }
686     RelationalSchemaObject schema = GetSchema();
687     errCode = UpgradeSharedTableInner(handle, cloudSchema, deleteTableNames, updateTableNames, alterTableNames);
688     if (errCode != E_OK) {
689         handle->Rollback();
690         ReleaseExecutor(handle);
691         return errCode;
692     }
693     errCode = handle->Commit();
694     if (errCode != E_OK) {
695         std::lock_guard lock(schemaMutex_);
696         schema_ = schema; // revert schema to the initial state
697     }
698     ReleaseExecutor(handle);
699     return errCode;
700 }
701 
UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)702 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
703     const DataBaseSchema &cloudSchema, const std::vector<std::string> &deleteTableNames,
704     const std::map<std::string, std::vector<Field>> &updateTableNames,
705     const std::map<std::string, std::string> &alterTableNames)
706 {
707     RelationalSchemaObject schema = GetSchema();
708     int errCode = DoDeleteSharedTable(handle, deleteTableNames, schema);
709     if (errCode != E_OK) {
710         LOGE("[RelationalStorageEngine] delete shared table or distributed table failed. %d", errCode);
711         return errCode;
712     }
713     errCode = DoUpdateSharedTable(handle, updateTableNames, cloudSchema, schema);
714     if (errCode != E_OK) {
715         LOGE("[RelationalStorageEngine] update shared table or distributed table failed. %d", errCode);
716         return errCode;
717     }
718     errCode = CheckIfExistUserTable(handle, cloudSchema, alterTableNames, schema);
719     if (errCode != E_OK) {
720         LOGE("[RelationalStorageEngine] check local user table failed. %d", errCode);
721         return errCode;
722     }
723     errCode = DoAlterSharedTableName(handle, alterTableNames, schema);
724     if (errCode != E_OK) {
725         LOGE("[RelationalStorageEngine] alter shared table or distributed table failed. %d", errCode);
726         return errCode;
727     }
728     errCode = DoCreateSharedTable(handle, cloudSchema, updateTableNames, alterTableNames, schema);
729     if (errCode != E_OK) {
730         LOGE("[RelationalStorageEngine] create shared table or distributed table failed. %d", errCode);
731         return errCode;
732     }
733     std::lock_guard lock(schemaMutex_);
734     schema_ = schema;
735     return E_OK;
736 }
737 
DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::vector<std::string> & deleteTableNames,RelationalSchemaObject & schema)738 int SQLiteSingleRelationalStorageEngine::DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
739     const std::vector<std::string> &deleteTableNames, RelationalSchemaObject &schema)
740 {
741     if (deleteTableNames.empty()) {
742         return E_OK;
743     }
744     int errCode = handle->DeleteTable(deleteTableNames);
745     if (errCode != E_OK) {
746         LOGE("[RelationalStorageEngine] delete shared table failed. %d", errCode);
747         return errCode;
748     }
749     std::vector<Key> keys;
750     for (const auto &tableName : deleteTableNames) {
751         errCode = handle->CleanResourceForDroppedTable(tableName);
752         if (errCode != E_OK) {
753             LOGE("[RelationalStorageEngine] delete shared distributed table failed. %d", errCode);
754             return errCode;
755         }
756         Key sharedTableKey = DBCommon::GetPrefixTableName(tableName);
757         if (sharedTableKey.empty() || sharedTableKey.size() > DBConstant::MAX_KEY_SIZE) {
758             LOGE("[RelationalStorageEngine] shared table key is invalid.");
759             return -E_INVALID_ARGS;
760         }
761         keys.push_back(sharedTableKey);
762         schema.RemoveRelationalTable(tableName);
763     }
764     errCode = handle->DeleteMetaData(keys);
765     if (errCode != E_OK) {
766         LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
767     }
768     return errCode;
769 }
770 
DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::vector<Field>> & updateTableNames,const DataBaseSchema & cloudSchema,RelationalSchemaObject & localSchema)771 int SQLiteSingleRelationalStorageEngine::DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
772     const std::map<std::string, std::vector<Field>> &updateTableNames, const DataBaseSchema &cloudSchema,
773     RelationalSchemaObject &localSchema)
774 {
775     if (updateTableNames.empty()) {
776         return E_OK;
777     }
778     int errCode = handle->UpdateSharedTable(updateTableNames);
779     if (errCode != E_OK) {
780         LOGE("[RelationalStorageEngine] update shared table failed. %d", errCode);
781         return errCode;
782     }
783     for (const auto &tableSchema : cloudSchema.tables) {
784         if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
785             errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
786                 TableSyncType::CLOUD_COOPERATION, localSchema);
787             if (errCode != E_OK) {
788                 LOGE("[RelationalStorageEngine] update shared distributed table failed. %d", errCode);
789                 return errCode;
790             }
791         }
792     }
793     return E_OK;
794 }
795 
DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)796 int SQLiteSingleRelationalStorageEngine::DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor *&handle,
797     const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
798 {
799     if (alterTableNames.empty()) {
800         return E_OK;
801     }
802     int errCode = handle->AlterTableName(alterTableNames);
803     if (errCode != E_OK) {
804         LOGE("[RelationalStorageEngine] alter shared table failed. %d", errCode);
805         return errCode;
806     }
807     std::map<std::string, std::string> distributedSharedTableNames;
808     for (const auto &tableName : alterTableNames) {
809         errCode = handle->DeleteTableTrigger(tableName.first);
810         if (errCode != E_OK) {
811             LOGE("[RelationalStorageEngine] delete shared table trigger failed. %d", errCode);
812             return errCode;
813         }
814         std::string oldDistributedName = DBCommon::GetLogTableName(tableName.first);
815         std::string newDistributedName = DBCommon::GetLogTableName(tableName.second);
816         distributedSharedTableNames[oldDistributedName] = newDistributedName;
817     }
818     errCode = handle->AlterTableName(distributedSharedTableNames);
819     if (errCode != E_OK) {
820         LOGE("[RelationalStorageEngine] alter distributed shared table failed. %d", errCode);
821         return errCode;
822     }
823     for (const auto &[oldTableName, newTableName] : alterTableNames) {
824         TableInfo tableInfo = schema.GetTable(oldTableName);
825         tableInfo.SetTableName(newTableName);
826         schema.AddRelationalTable(tableInfo);
827         schema.RemoveRelationalTable(oldTableName);
828     }
829     errCode = UpdateKvData(handle, alterTableNames);
830     if (errCode != E_OK) {
831         LOGE("[RelationalStorageEngine] update kv data failed. %d", errCode);
832     }
833     return errCode;
834 }
835 
DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)836 int SQLiteSingleRelationalStorageEngine::DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
837     const DataBaseSchema &cloudSchema, const std::map<std::string, std::vector<Field>> &updateTableNames,
838     const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
839 {
840     for (auto const &tableSchema : cloudSchema.tables) {
841         if (tableSchema.sharedTableName.empty()) {
842             continue;
843         }
844         if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
845             continue;
846         }
847         bool isUpdated = false;
848         for (const auto &alterTableName : alterTableNames) {
849             if (alterTableName.second == tableSchema.sharedTableName) {
850                 isUpdated = true;
851                 break;
852             }
853         }
854         if (isUpdated) {
855             continue;
856         }
857         int errCode = handle->CreateSharedTable(tableSchema);
858         if (errCode != E_OK) {
859             return errCode;
860         }
861         errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
862             TableSyncType::CLOUD_COOPERATION, schema);
863         if (errCode != E_OK) {
864             return errCode;
865         }
866     }
867     return E_OK;
868 }
869 
UpdateKvData(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames)870 int SQLiteSingleRelationalStorageEngine::UpdateKvData(SQLiteSingleVerRelationalStorageExecutor *&handle,
871     const std::map<std::string, std::string> &alterTableNames)
872 {
873     std::vector<Key> keys;
874     for (const auto &tableName : alterTableNames) {
875         Key oldKey = DBCommon::GetPrefixTableName(tableName.first);
876         Value value;
877         int ret = handle->GetKvData(oldKey, value);
878         if (ret == -E_NOT_FOUND) {
879             continue;
880         }
881         if (ret != E_OK) {
882             LOGE("[RelationalStorageEngine] get meta data failed. %d", ret);
883             return ret;
884         }
885         keys.push_back(oldKey);
886         Key newKey = DBCommon::GetPrefixTableName(tableName.second);
887         ret = handle->PutKvData(newKey, value);
888         if (ret != E_OK) {
889             LOGE("[RelationalStorageEngine] put meta data failed. %d", ret);
890             return ret;
891         }
892     }
893     int errCode = handle->DeleteMetaData(keys);
894     if (errCode != E_OK) {
895         LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
896     }
897     return errCode;
898 }
899 
CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::string> & alterTableNames,const RelationalSchemaObject & schema)900 int SQLiteSingleRelationalStorageEngine::CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
901     const DataBaseSchema &cloudSchema, const std::map<std::string, std::string> &alterTableNames,
902     const RelationalSchemaObject &schema)
903 {
904     for (const auto &tableSchema : cloudSchema.tables) {
905         if (alterTableNames.find(tableSchema.sharedTableName) != alterTableNames.end()) {
906             continue;
907         }
908         TableInfo tableInfo = schema.GetTable(tableSchema.sharedTableName);
909         if (tableInfo.GetSharedTableMark()) {
910             continue;
911         }
912         int errCode = handle->CheckIfExistUserTable(tableSchema.sharedTableName);
913         if (errCode != E_OK) {
914             LOGE("[RelationalStorageEngine] local exists table. %d", errCode);
915             return errCode;
916         }
917     }
918     return E_OK;
919 }
920 
CalTableRef(const std::vector<std::string> & tableNames,const std::map<std::string,std::string> & sharedTableOriginNames)921 std::pair<std::vector<std::string>, int> SQLiteSingleRelationalStorageEngine::CalTableRef(
922     const std::vector<std::string> &tableNames, const std::map<std::string, std::string> &sharedTableOriginNames)
923 {
924     std::pair<std::vector<std::string>, int> res = { tableNames, E_OK };
925     std::map<std::string, std::map<std::string, bool>> reachableReference;
926     std::map<std::string, int> tableWeight;
927     {
928         std::lock_guard lock(schemaMutex_);
929         reachableReference = schema_.GetReachableRef();
930         tableWeight = schema_.GetTableWeight();
931     }
932     if (reachableReference.empty()) {
933         return res;
934     }
935     auto reachableWithShared = GetReachableWithShared(reachableReference, sharedTableOriginNames);
936     // check dependency conflict
937     for (size_t i = 0; i < tableNames.size(); ++i) {
938         for (size_t j = i + 1; j < tableNames.size(); ++j) {
939             // such as table A B, if dependency is A->B
940             // sync should not be A->B, it should be B->A
941             // so if A can reach B, it's wrong
942             if (reachableWithShared[tableNames[i]][tableNames[j]]) {
943                 LOGE("[RDBStorageEngine] table %zu reach table %zu", i, j);
944                 res.second = -E_INVALID_ARGS;
945                 return res;
946             }
947         }
948     }
949     tableWeight = GetTableWeightWithShared(tableWeight, sharedTableOriginNames);
950     auto actualTable = DBCommon::GenerateNodesByNodeWeight(tableNames, reachableWithShared, tableWeight);
951     res.first.assign(actualTable.begin(), actualTable.end());
952     return res;
953 }
954 
CleanTrackerDeviceTable(const std::vector<std::string> & tableNames,RelationalSchemaObject & trackerSchemaObj,SQLiteSingleVerRelationalStorageExecutor * & handle)955 int SQLiteSingleRelationalStorageEngine::CleanTrackerDeviceTable(const std::vector<std::string> &tableNames,
956     RelationalSchemaObject &trackerSchemaObj, SQLiteSingleVerRelationalStorageExecutor *&handle)
957 {
958     std::vector<std::string> missingTrackerTables;
959     int errCode = handle->CheckAndCleanDistributedTable(tableNames, missingTrackerTables);
960     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
961         LOGE("Check tracker table failed. %d", errCode);
962         return errCode;
963     }
964     if (missingTrackerTables.empty()) { // LCOV_EXCL_BR_LINE
965         return E_OK;
966     }
967     for (const auto &tableName : missingTrackerTables) {
968         TrackerSchema schema;
969         schema.tableName = tableName;
970         trackerSchemaObj.RemoveTrackerSchema(schema);
971     }
972     errCode = SaveTrackerSchemaToMetaTable(handle, trackerSchemaObj); // save schema to meta_data
973     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
974         LOGE("Save tracker schema to metaTable failed. %d", errCode);
975     }
976     return errCode;
977 }
978 
GenLogInfoForUpgrade(const std::string & tableName,RelationalSchemaObject & schema,bool schemaChanged)979 int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string &tableName,
980     RelationalSchemaObject &schema, bool schemaChanged)
981 {
982     int errCode = E_OK;
983     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
984         OperatePerm::NORMAL_PERM, errCode));
985     if (handle == nullptr) {
986         return errCode;
987     }
988     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
989 
990     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
991     if (errCode != E_OK) {
992         return errCode;
993     }
994 
995     TableInfo table = GetSchema().GetTable(tableName);
996     errCode = handle->UpgradedLogForExistedData(table, schemaChanged);
997     if (errCode != E_OK) {
998         LOGE("Upgrade tracker table log failed. %d", errCode);
999         (void)handle->Rollback();
1000         return errCode;
1001     }
1002     return handle->Commit();
1003 }
1004 
GetReachableWithShared(const std::map<std::string,std::map<std::string,bool>> & reachableReference,const std::map<std::string,std::string> & tableToShared)1005 std::map<std::string, std::map<std::string, bool>> SQLiteSingleRelationalStorageEngine::GetReachableWithShared(
1006     const std::map<std::string, std::map<std::string, bool>> &reachableReference,
1007     const std::map<std::string, std::string> &tableToShared)
1008 {
1009     // we translate all origin table to shared table
1010     std::map<std::string, std::map<std::string, bool>> reachableWithShared;
1011     for (const auto &[source, reach] : reachableReference) {
1012         bool sourceHasNoShared = tableToShared.find(source) == tableToShared.end();
1013         for (const auto &[target, isReach] : reach) {
1014             // merge two reachable reference
1015             reachableWithShared[source][target] = isReach;
1016             if (sourceHasNoShared || tableToShared.find(target) == tableToShared.end()) {
1017                 continue;
1018             }
1019             // record shared reachable reference
1020             reachableWithShared[tableToShared.at(source)][tableToShared.at(target)] = isReach;
1021         }
1022     }
1023     return reachableWithShared;
1024 }
1025 
GetTableWeightWithShared(const std::map<std::string,int> & tableWeight,const std::map<std::string,std::string> & tableToShared)1026 std::map<std::string, int> SQLiteSingleRelationalStorageEngine::GetTableWeightWithShared(
1027     const std::map<std::string, int> &tableWeight, const std::map<std::string, std::string> &tableToShared)
1028 {
1029     std::map<std::string, int> res;
1030     for (const auto &[table, weight] : tableWeight) {
1031         res[table] = weight;
1032         if (tableToShared.find(table) == tableToShared.end()) {
1033             continue;
1034         }
1035         res[tableToShared.at(table)] = weight;
1036     }
1037     return res;
1038 }
1039 
UpdateExtendField(const DistributedDB::TrackerSchema & schema)1040 int SQLiteSingleRelationalStorageEngine::UpdateExtendField(const DistributedDB::TrackerSchema &schema)
1041 {
1042     if (schema.extendColNames.empty()) {
1043         return E_OK;
1044     }
1045     int errCode = E_OK;
1046     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
1047         OperatePerm::NORMAL_PERM, errCode));
1048     if (handle == nullptr) {
1049         return errCode;
1050     }
1051     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
1052 
1053     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1054     if (errCode != E_OK) {
1055         return errCode;
1056     }
1057 
1058     errCode = handle->UpdateExtendField(schema.tableName, schema.extendColNames);
1059     if (errCode != E_OK) {
1060         LOGE("[%s [%zu]] Update extend field failed. %d",
1061             DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1062         (void)handle->Rollback();
1063         return errCode;
1064     }
1065 
1066     RelationalSchemaObject tracker = trackerSchema_;
1067     TrackerTable oldTrackerTable = tracker.GetTrackerTable(schema.tableName);
1068     const std::set<std::string>& oldExtendColNames = oldTrackerTable.GetExtendNames();
1069     const std::string lowVersionExtendColName = oldTrackerTable.GetExtendName();
1070     if (!oldExtendColNames.empty()) {
1071         errCode = handle->UpdateDeleteDataExtendField(schema.tableName, lowVersionExtendColName,
1072             oldExtendColNames, schema.extendColNames);
1073         if (errCode != E_OK) {
1074             LOGE("[%s [%zu]] Update extend field for delete data failed. %d",
1075                 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1076             (void)handle->Rollback();
1077             return errCode;
1078         }
1079     }
1080     return handle->Commit();
1081 }
1082 
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1083 std::pair<int, bool> SQLiteSingleRelationalStorageEngine::SetDistributedSchema(const DistributedSchema &schema,
1084     bool isForceUpgrade)
1085 {
1086     std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
1087     auto schemaObj = GetSchema();
1088     std::pair<int, bool> res = {E_OK, schemaObj.CheckDistributedSchemaChange(schema)};
1089     auto &[errCode, isSchemaChange] = res;
1090     if (properties_.GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
1091         LOGE("tableMode SPLIT_BY_DEVICE not support set distributed schema");
1092         errCode = -E_NOT_SUPPORT;
1093         return res;
1094     }
1095     if (!isSchemaChange) {
1096         return res;
1097     }
1098     auto localSchema = schemaObj.GetDistributedSchema();
1099     if (localSchema.version != 0 && localSchema.version >= schema.version) {
1100         LOGE("new schema version no upgrade old:%" PRIu32 " new:%" PRIu32, localSchema.version, schema.version);
1101         errCode = -E_INVALID_ARGS;
1102     } else {
1103         errCode = SetDistributedSchemaInner(schemaObj, schema, isForceUpgrade);
1104     }
1105     if (errCode == E_OK) {
1106         SetSchema(schemaObj);
1107     }
1108     return res;
1109 }
1110 
SetDistributedSchemaInner(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,bool isForceUpgrade)1111 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInner(RelationalSchemaObject &schemaObj,
1112     const DistributedSchema &schema, bool isForceUpgrade)
1113 {
1114     int errCode = E_OK;
1115     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
1116         errCode));
1117     if (handle == nullptr) {
1118         return errCode;
1119     }
1120     ResFinalizer resFinalizer([this, handle]() {
1121         auto rdbHandle = handle;
1122         ReleaseExecutor(rdbHandle);
1123     });
1124 
1125     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1126     if (errCode != E_OK) {
1127         return errCode;
1128     }
1129     errCode = SQLiteRelationalUtils::CheckDistributedSchemaValid(schemaObj, schema, isForceUpgrade, handle);
1130     if (errCode != E_OK) {
1131         (void)handle->Rollback();
1132         return errCode;
1133     }
1134     schemaObj.SetDistributedSchema(schema);
1135     for (const auto &table : schema.tables) {
1136         TableInfo tableInfo = schemaObj.GetTable(table.tableName);
1137         if (tableInfo.Empty()) {
1138             continue;
1139         }
1140         tableInfo.SetDistributedTable(schemaObj.GetDistributedTable(table.tableName));
1141         errCode = handle->RenewTableTrigger(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType());
1142         if (errCode != E_OK) {
1143             LOGE("Failed to refresh trigger while setting up distributed schema: %d", errCode);
1144             (void)handle->Rollback();
1145             return errCode;
1146         }
1147         errCode = handle->UpdateHashKey(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType());
1148         if (errCode != E_OK) {
1149             LOGE("Failed to update hash_key while setting up distributed schema: %d", errCode);
1150             (void)handle->Rollback();
1151             return errCode;
1152         }
1153     }
1154     errCode = SaveSchemaToMetaTable(handle, schemaObj);
1155     if (errCode != E_OK) {
1156         LOGE("Save schema to meta table for set distributed schema failed. %d", errCode);
1157         (void)handle->Rollback();
1158         return errCode;
1159     }
1160     return handle->Commit();
1161 }
1162 }
1163 #endif