• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_relational_storage_engine.h"
17 
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "res_finalizer.h"
21 #include "sqlite_relational_database_upgrader.h"
22 #include "sqlite_single_ver_relational_storage_executor.h"
23 #include "sqlite_relational_utils.h"
24 
25 
26 namespace DistributedDB {
SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)27 SQLiteSingleRelationalStorageEngine::SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)
28     : properties_(properties)
29 {}
30 
~SQLiteSingleRelationalStorageEngine()31 SQLiteSingleRelationalStorageEngine::~SQLiteSingleRelationalStorageEngine()
32 {}
33 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)34 StorageExecutor *SQLiteSingleRelationalStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite,
35     bool isMemDb)
36 {
37     auto mode = GetRelationalProperties().GetDistributedTableMode();
38     return new (std::nothrow) SQLiteSingleVerRelationalStorageExecutor(dbHandle, isWrite, mode);
39 }
40 
Upgrade(sqlite3 * db)41 int SQLiteSingleRelationalStorageEngine::Upgrade(sqlite3 *db)
42 {
43     int errCode = SQLiteRelationalUtils::CreateRelationalMetaTable(db);
44     if (errCode != E_OK) {
45         LOGE("Create relational store meta table failed. err=%d", errCode);
46         return errCode;
47     }
48     LOGD("[RelationalEngine][Upgrade] upgrade relational store.");
49     auto upgrader = std::make_unique<SqliteRelationalDatabaseUpgrader>(db);
50     return upgrader->Upgrade();
51 }
52 
RegisterFunction(sqlite3 * db) const53 int SQLiteSingleRelationalStorageEngine::RegisterFunction(sqlite3 *db) const
54 {
55     int errCode = SQLiteUtils::RegisterCalcHash(db);
56     if (errCode != E_OK) {
57         LOGE("[engine] register calculate hash failed!");
58         return errCode;
59     }
60 
61     errCode = SQLiteUtils::RegisterGetLastTime(db);
62     if (errCode != E_OK) {
63         LOGE("[engine] register get last time failed!");
64         return errCode;
65     }
66 
67     errCode = SQLiteUtils::RegisterGetSysTime(db);
68     if (errCode != E_OK) {
69         LOGE("[engine] register get sys time failed!");
70         return errCode;
71     }
72 
73     errCode = SQLiteUtils::RegisterGetRawSysTime(db);
74     if (errCode != E_OK) {
75         LOGE("[engine] register get raw sys time failed!");
76         return errCode;
77     }
78 
79     errCode = SQLiteUtils::RegisterCloudDataChangeObserver(db);
80     if (errCode != E_OK) {
81         LOGE("[engine] register cloud observer failed!");
82     }
83 
84     errCode = SQLiteUtils::RegisterCloudDataChangeServerObserver(db);
85     if (errCode != E_OK) {
86         LOGE("[engine] register cloud server observer failed!");
87     }
88 
89     return errCode;
90 }
91 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)92 int SQLiteSingleRelationalStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
93 {
94     sqlite3 *db = nullptr;
95     int errCode = SQLiteUtils::OpenDatabase(GetOption(), db, false);
96     if (errCode != E_OK) {
97         return errCode;
98     }
99     do {
100         errCode = Upgrade(db); // create meta_data table.
101         if (errCode != E_OK) {
102             break;
103         }
104 
105         errCode = RegisterFunction(db);
106         if (errCode != E_OK) {
107             break;
108         }
109 
110         handle = NewSQLiteStorageExecutor(db, isWrite, false);
111         if (handle == nullptr) {
112             LOGE("[Relational] New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
113             errCode = -E_OUT_OF_MEMORY;
114             break;
115         }
116         return E_OK;
117     } while (false);
118 
119     (void)sqlite3_close_v2(db);
120     db = nullptr;
121     return errCode;
122 }
123 
ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isExternal)124 void SQLiteSingleRelationalStorageEngine::ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor *&handle,
125     bool isExternal)
126 {
127     if (handle == nullptr) {
128         return;
129     }
130     StorageExecutor *databaseHandle = handle;
131     Recycle(databaseHandle, isExternal);
132     handle = nullptr;
133 }
134 
SetSchema(const RelationalSchemaObject & schema)135 void SQLiteSingleRelationalStorageEngine::SetSchema(const RelationalSchemaObject &schema)
136 {
137     std::lock_guard lock(schemaMutex_);
138     schema_ = schema;
139 }
140 
GetSchema() const141 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetSchema() const
142 {
143     std::lock_guard lock(schemaMutex_);
144     return schema_;
145 }
146 
147 namespace {
148 const std::string DEVICE_TYPE = "device";
149 const std::string CLOUD_TYPE = "cloud";
150 const std::string SYNC_TABLE_TYPE = "sync_table_type_";
151 
SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)152 int SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle, const RelationalSchemaObject &schema)
153 {
154     const Key schemaKey(DBConstant::RELATIONAL_SCHEMA_KEY,
155         DBConstant::RELATIONAL_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_SCHEMA_KEY));
156     Value schemaVal;
157     auto schemaStr = schema.ToSchemaString();
158     if (schemaStr.size() > SchemaConstant::SCHEMA_STRING_SIZE_LIMIT) {
159         LOGE("schema is too large %zu", schemaStr.size());
160         return -E_MAX_LIMITS;
161     }
162     DBCommon::StringToVector(schemaStr, schemaVal);
163     int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
164     if (errCode != E_OK) {
165         LOGE("Save schema to meta table failed. %d", errCode);
166     }
167     return errCode;
168 }
169 
SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)170 int SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle,
171     const RelationalSchemaObject &schema)
172 {
173     const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY,
174         DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY));
175     Value schemaVal;
176     DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
177     int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
178     if (errCode != E_OK) {
179         LOGE("Save schema to meta table failed. %d", errCode);
180     }
181     return errCode;
182 }
183 
SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,TableSyncType syncType)184 int SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
185     TableSyncType syncType)
186 {
187     Key key;
188     DBCommon::StringToVector(SYNC_TABLE_TYPE + tableName, key);
189     Value value;
190     DBCommon::StringToVector(syncType == DEVICE_COOPERATION ? DEVICE_TYPE : CLOUD_TYPE, value);
191     int errCode = handle->PutKvData(key, value);
192     if (errCode != E_OK) {
193         LOGE("Save sync table type to meta table failed. %d", errCode);
194         return errCode;
195     }
196     DBCommon::StringToVector(DBConstant::TABLE_IS_DROPPED + tableName, key);
197     errCode = handle->DeleteMetaData({ key });
198     if (errCode != E_OK) {
199         LOGE("Save table drop flag to meta table failed. %d", errCode);
200     }
201     return errCode;
202 }
203 }
204 
CreateDistributedTable(const std::string & tableName,const std::string & identity,bool & schemaChanged,TableSyncType syncType,bool trackerSchemaChanged)205 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName,
206     const std::string &identity, bool &schemaChanged, TableSyncType syncType, bool trackerSchemaChanged)
207 {
208     std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
209     RelationalSchemaObject schema = GetSchema();
210     bool isUpgraded = false;
211     if (DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
212         LOGI("distributed table bas been created.");
213         if (schema.GetTable(tableName).GetTableSyncType() != syncType) {
214             LOGE("table sync type mismatch.");
215             return -E_TYPE_MISMATCH;
216         }
217         isUpgraded = true;
218         int errCode = UpgradeDistributedTable(tableName, schemaChanged, syncType);
219         if (errCode != E_OK) {
220             LOGE("Upgrade distributed table failed. %d", errCode);
221             return errCode;
222         }
223         // Triggers may need to be rebuilt, no return directly
224     } else if (schema.GetTables().size() >= DBConstant::MAX_DISTRIBUTED_TABLE_COUNT) {
225         LOGE("The number of distributed tables is exceeds limit.");
226         return -E_MAX_LIMITS;
227     } else {
228         schemaChanged = true;
229     }
230 
231     int errCode = CreateDistributedTable(tableName, isUpgraded, identity, schema, syncType);
232     if (errCode != E_OK) {
233         LOGE("CreateDistributedTable failed. %d", errCode);
234         return errCode;
235     }
236     if (isUpgraded && (schemaChanged || trackerSchemaChanged)) {
237         // Used for upgrading the stock data of the trackerTable
238         errCode = GenLogInfoForUpgrade(tableName, schema, schemaChanged);
239     }
240     return errCode;
241 }
242 
CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::string & tableName,const std::string & sharedTableName,TableSyncType tableSyncType,RelationalSchemaObject & schema)243 int SQLiteSingleRelationalStorageEngine::CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
244     const std::string &tableName, const std::string &sharedTableName, TableSyncType tableSyncType,
245     RelationalSchemaObject &schema)
246 {
247     TableInfo table;
248     table.SetTableName(sharedTableName);
249     table.SetOriginTableName(tableName);
250     table.SetSharedTableMark(true);
251     table.SetTableSyncType(tableSyncType);
252     table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(sharedTableName));
253     if (!table.GetTrackerTable().IsEmpty() && tableSyncType == TableSyncType::DEVICE_COOPERATION) { // LCOV_EXCL_BR_LINE
254         LOGE("current is trackerTable, not support creating device distributed table.");
255         return -E_NOT_SUPPORT;
256     }
257     bool isUpgraded = schema.GetTable(sharedTableName).GetTableName() == sharedTableName;
258     int errCode = CreateDistributedTable(handle, isUpgraded, "", table, schema);
259     if (errCode != E_OK) {
260         LOGE("create distributed table failed. %d", errCode);
261         return errCode;
262     }
263     std::lock_guard lock(schemaMutex_);
264     schema_ = schema;
265     return errCode;
266 }
267 
CreateDistributedTable(const std::string & tableName,bool isUpgraded,const std::string & identity,RelationalSchemaObject & schema,TableSyncType tableSyncType)268 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName, bool isUpgraded,
269     const std::string &identity, RelationalSchemaObject &schema, TableSyncType tableSyncType)
270 {
271     LOGD("Create distributed table.");
272     int errCode = E_OK;
273     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
274         errCode));
275     if (handle == nullptr) {
276         return errCode;
277     }
278     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
279 
280     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
281     if (errCode != E_OK) {
282         return errCode;
283     }
284 
285     TableInfo table;
286     table.SetTableName(tableName);
287     table.SetTableSyncType(tableSyncType);
288     table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(tableName));
289     table.SetDistributedTable(schema.GetDistributedTable(tableName));
290     if (isUpgraded) {
291         table.SetSourceTableReference(schema.GetTable(tableName).GetTableReference());
292     }
293     errCode = CreateDistributedTable(handle, isUpgraded, identity, table, schema);
294     if (errCode != E_OK) {
295         LOGE("create distributed table failed. %d", errCode);
296         (void)handle->Rollback();
297         return errCode;
298     }
299     errCode = handle->Commit();
300     if (errCode == E_OK) {
301         SetSchema(schema);
302     }
303     return errCode;
304 }
305 
CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isUpgraded,const std::string & identity,TableInfo & table,RelationalSchemaObject & schema)306 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
307     bool isUpgraded, const std::string &identity, TableInfo &table, RelationalSchemaObject &schema)
308 {
309     auto mode = GetRelationalProperties().GetDistributedTableMode();
310     TableSyncType tableSyncType = table.GetTableSyncType();
311     std::string tableName = table.GetTableName();
312     int errCode = handle->InitCursorToMeta(tableName);
313     if (errCode != E_OK) {
314         LOGE("init cursor to meta failed. %d", errCode);
315         return errCode;
316     }
317     errCode = handle->CreateDistributedTable(mode, isUpgraded, identity, table);
318     if (errCode != E_OK) {
319         LOGE("create distributed table failed. %d", errCode);
320         return errCode;
321     }
322 
323     schema.SetTableMode(mode);
324     // update table if tableName changed
325     schema.RemoveRelationalTable(tableName);
326     schema.AddRelationalTable(table);
327     errCode = SaveSchemaToMetaTable(handle, schema);
328     if (errCode != E_OK) {
329         LOGE("Save schema to meta table for create distributed table failed. %d", errCode);
330         return errCode;
331     }
332 
333     errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, tableName, tableSyncType);
334     if (errCode != E_OK) {
335         LOGE("Save sync table type or drop flag to meta table failed. %d", errCode);
336     }
337     return errCode;
338 }
339 
UpgradeDistributedTable(const std::string & tableName,bool & schemaChanged,TableSyncType syncType)340 int SQLiteSingleRelationalStorageEngine::UpgradeDistributedTable(const std::string &tableName, bool &schemaChanged,
341     TableSyncType syncType)
342 {
343     LOGD("Upgrade distributed table.");
344     RelationalSchemaObject schema = GetSchema();
345     int errCode = E_OK;
346     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
347         errCode));
348     if (handle == nullptr) {
349         return errCode;
350     }
351 
352     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
353     if (errCode != E_OK) {
354         ReleaseExecutor(handle);
355         return errCode;
356     }
357 
358     auto mode = GetRelationalProperties().GetDistributedTableMode();
359     errCode = handle->UpgradeDistributedTable(tableName, mode, schemaChanged, schema, syncType);
360     if (errCode != E_OK) {
361         LOGE("Upgrade distributed table failed. %d", errCode);
362         (void)handle->Rollback();
363         ReleaseExecutor(handle);
364         return errCode;
365     }
366 
367     errCode = SaveSchemaToMetaTable(handle, schema);
368         if (errCode != E_OK) {
369         LOGE("Save schema to meta table for upgrade distributed table failed. %d", errCode);
370         (void)handle->Rollback();
371         ReleaseExecutor(handle);
372         return errCode;
373     }
374 
375     errCode = handle->Commit();
376     if (errCode == E_OK) {
377         SetSchema(schema);
378     }
379     ReleaseExecutor(handle);
380     return errCode;
381 }
382 
CleanDistributedDeviceTable(std::vector<std::string> & missingTables)383 int SQLiteSingleRelationalStorageEngine::CleanDistributedDeviceTable(std::vector<std::string> &missingTables)
384 {
385     int errCode = E_OK;
386     auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
387         errCode));
388     if (handle == nullptr) {
389         return errCode;
390     }
391 
392     // go fast to check missing tables without transaction
393     errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
394     if (errCode == E_OK) {
395         if (missingTables.empty()) {
396             LOGI("Missing table is empty.");
397             ReleaseExecutor(handle);
398             return errCode;
399         }
400     } else {
401         LOGE("Get missing distributed table failed. %d", errCode);
402         ReleaseExecutor(handle);
403         return errCode;
404     }
405     missingTables.clear();
406 
407     std::lock_guard lock(schemaMutex_);
408     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
409     if (errCode != E_OK) {
410         ReleaseExecutor(handle);
411         return errCode;
412     }
413 
414     errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
415     if (errCode == E_OK) {
416         // Remove non-existent tables from the schema
417         for (const auto &tableName : missingTables) {
418             schema_.RemoveRelationalTable(tableName);
419         }
420         errCode = SaveSchemaToMetaTable(handle, schema_); // save schema to meta_data
421         if (errCode != E_OK) {
422             LOGE("Save schema to metaTable failed. %d", errCode);
423             (void)handle->Rollback();
424         } else {
425             errCode = handle->Commit();
426         }
427     } else {
428         LOGE("Check distributed table failed. %d", errCode);
429         (void)handle->Rollback();
430     }
431     ReleaseExecutor(handle);
432     return errCode;
433 }
434 
GetProperties() const435 const RelationalDBProperties &SQLiteSingleRelationalStorageEngine::GetProperties() const
436 {
437     return properties_;
438 }
439 
GetRelationalProperties() const440 const RelationalDBProperties SQLiteSingleRelationalStorageEngine::GetRelationalProperties() const
441 {
442     std::lock_guard lock(propertiesMutex_);
443     return properties_;
444 }
445 
SetProperties(const RelationalDBProperties & properties)446 void SQLiteSingleRelationalStorageEngine::SetProperties(const RelationalDBProperties &properties)
447 {
448     std::lock_guard lock(propertiesMutex_);
449     properties_ = properties;
450 }
451 
SetTrackerTable(const TrackerSchema & schema,const TableInfo & tableInfo,bool isFirstCreate)452 int SQLiteSingleRelationalStorageEngine::SetTrackerTable(const TrackerSchema &schema, const TableInfo &tableInfo,
453     bool isFirstCreate)
454 {
455     int errCode = E_OK;
456     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
457         errCode));
458     if (handle == nullptr) {
459         return errCode;
460     }
461     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
462 
463     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
464     if (errCode != E_OK) {
465         return errCode;
466     }
467     RelationalSchemaObject tracker = GetTrackerSchema();
468     tracker.InsertTrackerSchema(schema);
469     int ret = handle->CreateTrackerTable(tracker.GetTrackerTable(schema.tableName), tableInfo, isFirstCreate);
470     if (ret != E_OK && ret != -E_WITH_INVENTORY_DATA) {
471         (void)handle->Rollback();
472         return ret;
473     }
474     errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, schema.tableName, tableInfo.GetTableSyncType());
475     if (errCode != E_OK) {
476         LOGE("[SetTrackerTable] Save sync type to meta table failed: %d", errCode);
477         (void)handle->Rollback();
478         return errCode;
479     }
480 
481     if (schema.trackerColNames.empty() && !schema.isTrackAction) {
482         tracker.RemoveTrackerSchema(schema);
483     }
484     errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
485     if (errCode != E_OK) {
486         (void)handle->Rollback();
487         return errCode;
488     }
489 
490     errCode = handle->Commit();
491     if (errCode != E_OK) {
492         return errCode;
493     }
494 
495     SetTrackerSchema(tracker);
496     return ret;
497 }
498 
CacheTrackerSchema(const TrackerSchema & schema)499 void SQLiteSingleRelationalStorageEngine::CacheTrackerSchema(const TrackerSchema &schema)
500 {
501     std::lock_guard lock(trackerSchemaMutex_);
502     trackerSchema_.InsertTrackerSchema(schema);
503     if (!schema.isTrackAction && schema.trackerColNames.empty()) {
504         // if isTrackAction be false and trackerColNames is empty, will remove the tracker schema.
505         trackerSchema_.RemoveTrackerSchema(schema);
506     }
507 }
508 
GetOrInitTrackerSchemaFromMeta()509 int SQLiteSingleRelationalStorageEngine::GetOrInitTrackerSchemaFromMeta()
510 {
511     RelationalSchemaObject trackerSchema;
512     int errCode = E_OK;
513     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
514         errCode));
515     if (handle == nullptr) {
516         return errCode;
517     }
518     errCode = handle->GetOrInitTrackerSchemaFromMeta(trackerSchema);
519     if (errCode != E_OK) {
520         ReleaseExecutor(handle);
521         return errCode;
522     }
523     const TableInfoMap tableInfoMap = trackerSchema.GetTrackerTables();
524     for (const auto &iter: tableInfoMap) {
525         TableInfo tableInfo;
526         errCode = handle->AnalysisTrackerTable(iter.second.GetTrackerTable(), tableInfo);
527         if (errCode == -E_NOT_FOUND) { // LCOV_EXCL_BR_LINE
528             const std::string trackerTableName = iter.second.GetTrackerTable().GetTableName();
529             errCode = CleanTrackerDeviceTable({ trackerTableName }, trackerSchema, handle);
530             if (errCode != E_OK) {
531                 LOGE("cancel tracker table failed during db opening. %d", errCode);
532                 ReleaseExecutor(handle);
533                 return errCode;
534             }
535         } else if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
536             LOGE("the tracker schema does not match the tracker schema. %d", errCode);
537             ReleaseExecutor(handle);
538             return errCode;
539         }
540     }
541     SetTrackerSchema(trackerSchema);
542     ReleaseExecutor(handle);
543     return E_OK;
544 }
545 
SaveTrackerSchema(const std::string & tableName,bool isFirstCreate)546 int SQLiteSingleRelationalStorageEngine::SaveTrackerSchema(const std::string &tableName, bool isFirstCreate)
547 {
548     int errCode = E_OK;
549     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
550         errCode));
551     if (handle == nullptr) {
552         return errCode;
553     }
554     RelationalSchemaObject tracker = GetTrackerSchema();
555     errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
556     if (errCode != E_OK || !isFirstCreate) {
557         ReleaseExecutor(handle);
558         return errCode;
559     }
560     errCode = handle->CheckInventoryData(DBCommon::GetLogTableName(tableName));
561     ReleaseExecutor(handle);
562     return errCode;
563 }
564 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)565 int SQLiteSingleRelationalStorageEngine::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
566 {
567     int errCode = E_OK;
568     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(!condition.readOnly,
569         OperatePerm::NORMAL_PERM, errCode, true));
570     if (handle == nullptr) {
571         return errCode;
572     }
573     errCode = handle->ExecuteSql(condition, records);
574     ReleaseExecutor(handle, true);
575     return errCode;
576 }
577 
CheckReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema)578 static int CheckReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
579     const RelationalSchemaObject &schema)
580 {
581     for (const auto &reference : tableReferenceProperty) {
582         TableInfo sourceTableInfo = schema.GetTable(reference.sourceTableName);
583         TableInfo targetTableInfo = schema.GetTable(reference.targetTableName);
584         if (strcasecmp(sourceTableInfo.GetTableName().c_str(), reference.sourceTableName.c_str()) != 0 ||
585             strcasecmp(targetTableInfo.GetTableName().c_str(), reference.targetTableName.c_str()) != 0) {
586             LOGE("can't set reference for table which doesn't create distributed table.");
587             return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
588         }
589         if (sourceTableInfo.GetTableSyncType() != CLOUD_COOPERATION ||
590             targetTableInfo.GetTableSyncType() != CLOUD_COOPERATION) {
591             LOGE("can't set reference for table which doesn't create distributed table with cloud mode.");
592             return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
593         }
594         if (sourceTableInfo.GetSharedTableMark() || targetTableInfo.GetSharedTableMark()) {
595             LOGE("can't set reference for shared table.");
596             return -E_NOT_SUPPORT;
597         }
598 
599         FieldInfoMap sourceFieldMap = sourceTableInfo.GetFields();
600         FieldInfoMap targetFieldMap = targetTableInfo.GetFields();
601         for (const auto &[sourceFieldName, targetFieldName] : reference.columns) {
602             if (sourceFieldMap.find(sourceFieldName) == sourceFieldMap.end() ||
603                 targetFieldMap.find(targetFieldName) == targetFieldMap.end()) {
604                 LOGE("reference field doesn't exists in table.");
605                 return -E_INVALID_ARGS;
606             }
607         }
608     }
609     return E_OK;
610 }
611 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,SQLiteSingleVerRelationalStorageExecutor * handle,std::set<std::string> & clearWaterMarkTables,RelationalSchemaObject & schema)612 int SQLiteSingleRelationalStorageEngine::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
613     SQLiteSingleVerRelationalStorageExecutor *handle, std::set<std::string> &clearWaterMarkTables,
614     RelationalSchemaObject &schema)
615 {
616     std::lock_guard lock(schemaMutex_);
617     schema = schema_;
618     int errCode = CheckReference(tableReferenceProperty, schema);
619     if (errCode != E_OK) {
620         LOGE("check reference failed, errCode = %d.", errCode);
621         return errCode;
622     }
623     int res = handle->GetClearWaterMarkTables(tableReferenceProperty, schema, clearWaterMarkTables);
624     if (res != E_OK && res != -E_TABLE_REFERENCE_CHANGED) {
625         return res;
626     }
627     schema.SetReferenceProperty(tableReferenceProperty);
628     errCode = SaveSchemaToMetaTable(handle, schema);
629     if (errCode != E_OK) {
630         LOGE("Save schema to meta table for reference failed. %d", errCode);
631         return errCode;
632     }
633     auto mode = GetRelationalProperties().GetDistributedTableMode();
634     for (auto &table : schema.GetTables()) {
635         if (table.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
636             continue;
637         }
638         TableInfo tableInfo = table.second;
639         errCode = handle->RenewTableTrigger(mode, tableInfo, TableSyncType::CLOUD_COOPERATION);
640         if (errCode != E_OK) {
641             LOGE("renew table trigger for reference failed. %d", errCode);
642             return errCode;
643         }
644     }
645     return res;
646 }
647 
GetTrackerSchema() const648 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetTrackerSchema() const
649 {
650     std::lock_guard lock(trackerSchemaMutex_);
651     return trackerSchema_;
652 }
653 
SetTrackerSchema(const RelationalSchemaObject & trackerSchema)654 void SQLiteSingleRelationalStorageEngine::SetTrackerSchema(const RelationalSchemaObject &trackerSchema)
655 {
656     std::lock_guard lock(trackerSchemaMutex_);
657     trackerSchema_ = trackerSchema;
658 }
659 
CleanTrackerData(const std::string & tableName,int64_t cursor)660 int SQLiteSingleRelationalStorageEngine::CleanTrackerData(const std::string &tableName, int64_t cursor)
661 {
662     int errCode = E_OK;
663     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
664         errCode));
665     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
666         return errCode;
667     }
668     TrackerTable trackerTable = GetTrackerSchema().GetTrackerTable(tableName);
669     bool isOnlyTrackTable = false;
670     RelationalSchemaObject schema = GetSchema();
671     if (!trackerTable.IsTableNameEmpty() &&
672         !DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
673         isOnlyTrackTable = true;
674     }
675     errCode = handle->CleanTrackerData(tableName, cursor, isOnlyTrackTable);
676     ReleaseExecutor(handle);
677     return errCode;
678 }
679 
UpgradeSharedTable(const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)680 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTable(const DataBaseSchema &cloudSchema,
681     const std::vector<std::string> &deleteTableNames, const std::map<std::string, std::vector<Field>> &updateTableNames,
682     const std::map<std::string, std::string> &alterTableNames)
683 {
684     int errCode = E_OK;
685     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
686         errCode));
687     if (handle == nullptr) {
688         return errCode;
689     }
690     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
691     if (errCode != E_OK) {
692         ReleaseExecutor(handle);
693         return errCode;
694     }
695     RelationalSchemaObject schema = GetSchema();
696     errCode = UpgradeSharedTableInner(handle, cloudSchema, deleteTableNames, updateTableNames, alterTableNames);
697     if (errCode != E_OK) {
698         handle->Rollback();
699         ReleaseExecutor(handle);
700         return errCode;
701     }
702     errCode = handle->Commit();
703     if (errCode != E_OK) {
704         std::lock_guard lock(schemaMutex_);
705         schema_ = schema; // revert schema to the initial state
706     }
707     ReleaseExecutor(handle);
708     return errCode;
709 }
710 
UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)711 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
712     const DataBaseSchema &cloudSchema, const std::vector<std::string> &deleteTableNames,
713     const std::map<std::string, std::vector<Field>> &updateTableNames,
714     const std::map<std::string, std::string> &alterTableNames)
715 {
716     RelationalSchemaObject schema = GetSchema();
717     int errCode = DoDeleteSharedTable(handle, deleteTableNames, schema);
718     if (errCode != E_OK) {
719         LOGE("[RelationalStorageEngine] delete shared table or distributed table failed. %d", errCode);
720         return errCode;
721     }
722     errCode = DoUpdateSharedTable(handle, updateTableNames, cloudSchema, schema);
723     if (errCode != E_OK) {
724         LOGE("[RelationalStorageEngine] update shared table or distributed table failed. %d", errCode);
725         return errCode;
726     }
727     errCode = CheckIfExistUserTable(handle, cloudSchema, alterTableNames, schema);
728     if (errCode != E_OK) {
729         LOGE("[RelationalStorageEngine] check local user table failed. %d", errCode);
730         return errCode;
731     }
732     errCode = DoAlterSharedTableName(handle, alterTableNames, schema);
733     if (errCode != E_OK) {
734         LOGE("[RelationalStorageEngine] alter shared table or distributed table failed. %d", errCode);
735         return errCode;
736     }
737     errCode = DoCreateSharedTable(handle, cloudSchema, updateTableNames, alterTableNames, schema);
738     if (errCode != E_OK) {
739         LOGE("[RelationalStorageEngine] create shared table or distributed table failed. %d", errCode);
740         return errCode;
741     }
742     std::lock_guard lock(schemaMutex_);
743     schema_ = schema;
744     return E_OK;
745 }
746 
DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::vector<std::string> & deleteTableNames,RelationalSchemaObject & schema)747 int SQLiteSingleRelationalStorageEngine::DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
748     const std::vector<std::string> &deleteTableNames, RelationalSchemaObject &schema)
749 {
750     if (deleteTableNames.empty()) {
751         return E_OK;
752     }
753     int errCode = handle->DeleteTable(deleteTableNames);
754     if (errCode != E_OK) {
755         LOGE("[RelationalStorageEngine] delete shared table failed. %d", errCode);
756         return errCode;
757     }
758     std::vector<Key> keys;
759     for (const auto &tableName : deleteTableNames) {
760         errCode = handle->CleanResourceForDroppedTable(tableName);
761         if (errCode != E_OK) {
762             LOGE("[RelationalStorageEngine] delete shared distributed table failed. %d", errCode);
763             return errCode;
764         }
765         Key sharedTableKey = DBCommon::GetPrefixTableName(tableName);
766         if (sharedTableKey.empty() || sharedTableKey.size() > DBConstant::MAX_KEY_SIZE) {
767             LOGE("[RelationalStorageEngine] shared table key is invalid.");
768             return -E_INVALID_ARGS;
769         }
770         keys.push_back(sharedTableKey);
771         schema.RemoveRelationalTable(tableName);
772     }
773     errCode = handle->DeleteMetaData(keys);
774     if (errCode != E_OK) {
775         LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
776     }
777     return errCode;
778 }
779 
DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::vector<Field>> & updateTableNames,const DataBaseSchema & cloudSchema,RelationalSchemaObject & localSchema)780 int SQLiteSingleRelationalStorageEngine::DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
781     const std::map<std::string, std::vector<Field>> &updateTableNames, const DataBaseSchema &cloudSchema,
782     RelationalSchemaObject &localSchema)
783 {
784     if (updateTableNames.empty()) {
785         return E_OK;
786     }
787     int errCode = handle->UpdateSharedTable(updateTableNames);
788     if (errCode != E_OK) {
789         LOGE("[RelationalStorageEngine] update shared table failed. %d", errCode);
790         return errCode;
791     }
792     for (const auto &tableSchema : cloudSchema.tables) {
793         if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
794             errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
795                 TableSyncType::CLOUD_COOPERATION, localSchema);
796             if (errCode != E_OK) {
797                 LOGE("[RelationalStorageEngine] update shared distributed table failed. %d", errCode);
798                 return errCode;
799             }
800         }
801     }
802     return E_OK;
803 }
804 
DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)805 int SQLiteSingleRelationalStorageEngine::DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor *&handle,
806     const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
807 {
808     if (alterTableNames.empty()) {
809         return E_OK;
810     }
811     int errCode = handle->AlterTableName(alterTableNames);
812     if (errCode != E_OK) {
813         LOGE("[RelationalStorageEngine] alter shared table failed. %d", errCode);
814         return errCode;
815     }
816     std::map<std::string, std::string> distributedSharedTableNames;
817     for (const auto &tableName : alterTableNames) {
818         errCode = handle->DeleteTableTrigger(tableName.first);
819         if (errCode != E_OK) {
820             LOGE("[RelationalStorageEngine] delete shared table trigger failed. %d", errCode);
821             return errCode;
822         }
823         std::string oldDistributedName = DBCommon::GetLogTableName(tableName.first);
824         std::string newDistributedName = DBCommon::GetLogTableName(tableName.second);
825         distributedSharedTableNames[oldDistributedName] = newDistributedName;
826     }
827     errCode = handle->AlterTableName(distributedSharedTableNames);
828     if (errCode != E_OK) {
829         LOGE("[RelationalStorageEngine] alter distributed shared table failed. %d", errCode);
830         return errCode;
831     }
832     for (const auto &[oldTableName, newTableName] : alterTableNames) {
833         TableInfo tableInfo = schema.GetTable(oldTableName);
834         tableInfo.SetTableName(newTableName);
835         schema.AddRelationalTable(tableInfo);
836         schema.RemoveRelationalTable(oldTableName);
837     }
838     errCode = UpdateKvData(handle, alterTableNames);
839     if (errCode != E_OK) {
840         LOGE("[RelationalStorageEngine] update kv data failed. %d", errCode);
841     }
842     return errCode;
843 }
844 
DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)845 int SQLiteSingleRelationalStorageEngine::DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
846     const DataBaseSchema &cloudSchema, const std::map<std::string, std::vector<Field>> &updateTableNames,
847     const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
848 {
849     for (auto const &tableSchema : cloudSchema.tables) {
850         if (tableSchema.sharedTableName.empty()) {
851             continue;
852         }
853         if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
854             continue;
855         }
856         bool isUpdated = false;
857         for (const auto &alterTableName : alterTableNames) {
858             if (alterTableName.second == tableSchema.sharedTableName) {
859                 isUpdated = true;
860                 break;
861             }
862         }
863         if (isUpdated) {
864             continue;
865         }
866         int errCode = handle->CreateSharedTable(tableSchema);
867         if (errCode != E_OK) {
868             return errCode;
869         }
870         errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
871             TableSyncType::CLOUD_COOPERATION, schema);
872         if (errCode != E_OK) {
873             return errCode;
874         }
875     }
876     return E_OK;
877 }
878 
UpdateKvData(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames)879 int SQLiteSingleRelationalStorageEngine::UpdateKvData(SQLiteSingleVerRelationalStorageExecutor *&handle,
880     const std::map<std::string, std::string> &alterTableNames)
881 {
882     std::vector<Key> keys;
883     for (const auto &tableName : alterTableNames) {
884         Key oldKey = DBCommon::GetPrefixTableName(tableName.first);
885         Value value;
886         int ret = handle->GetKvData(oldKey, value);
887         if (ret == -E_NOT_FOUND) {
888             continue;
889         }
890         if (ret != E_OK) {
891             LOGE("[RelationalStorageEngine] get meta data failed. %d", ret);
892             return ret;
893         }
894         keys.push_back(oldKey);
895         Key newKey = DBCommon::GetPrefixTableName(tableName.second);
896         ret = handle->PutKvData(newKey, value);
897         if (ret != E_OK) {
898             LOGE("[RelationalStorageEngine] put meta data failed. %d", ret);
899             return ret;
900         }
901     }
902     int errCode = handle->DeleteMetaData(keys);
903     if (errCode != E_OK) {
904         LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
905     }
906     return errCode;
907 }
908 
CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::string> & alterTableNames,const RelationalSchemaObject & schema)909 int SQLiteSingleRelationalStorageEngine::CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
910     const DataBaseSchema &cloudSchema, const std::map<std::string, std::string> &alterTableNames,
911     const RelationalSchemaObject &schema)
912 {
913     for (const auto &tableSchema : cloudSchema.tables) {
914         if (alterTableNames.find(tableSchema.sharedTableName) != alterTableNames.end()) {
915             continue;
916         }
917         TableInfo tableInfo = schema.GetTable(tableSchema.sharedTableName);
918         if (tableInfo.GetSharedTableMark()) {
919             continue;
920         }
921         int errCode = handle->CheckIfExistUserTable(tableSchema.sharedTableName);
922         if (errCode != E_OK) {
923             LOGE("[RelationalStorageEngine] local exists table. %d", errCode);
924             return errCode;
925         }
926     }
927     return E_OK;
928 }
929 
CalTableRef(const std::vector<std::string> & tableNames,const std::map<std::string,std::string> & sharedTableOriginNames)930 std::pair<std::vector<std::string>, int> SQLiteSingleRelationalStorageEngine::CalTableRef(
931     const std::vector<std::string> &tableNames, const std::map<std::string, std::string> &sharedTableOriginNames)
932 {
933     std::pair<std::vector<std::string>, int> res = { tableNames, E_OK };
934     std::map<std::string, std::map<std::string, bool>> reachableReference;
935     std::map<std::string, int> tableWeight;
936     {
937         std::lock_guard lock(schemaMutex_);
938         reachableReference = schema_.GetReachableRef();
939         tableWeight = schema_.GetTableWeight();
940     }
941     if (reachableReference.empty()) {
942         return res;
943     }
944     auto reachableWithShared = GetReachableWithShared(reachableReference, sharedTableOriginNames);
945     // check dependency conflict
946     for (size_t i = 0; i < tableNames.size(); ++i) {
947         for (size_t j = i + 1; j < tableNames.size(); ++j) {
948             // such as table A B, if dependency is A->B
949             // sync should not be A->B, it should be B->A
950             // so if A can reach B, it's wrong
951             if (reachableWithShared[tableNames[i]][tableNames[j]]) {
952                 LOGE("[RDBStorageEngine] table %zu reach table %zu", i, j);
953                 res.second = -E_INVALID_ARGS;
954                 return res;
955             }
956         }
957     }
958     tableWeight = GetTableWeightWithShared(tableWeight, sharedTableOriginNames);
959     auto actualTable = DBCommon::GenerateNodesByNodeWeight(tableNames, reachableWithShared, tableWeight);
960     res.first.assign(actualTable.begin(), actualTable.end());
961     return res;
962 }
963 
CleanTrackerDeviceTable(const std::vector<std::string> & tableNames,RelationalSchemaObject & trackerSchemaObj,SQLiteSingleVerRelationalStorageExecutor * & handle)964 int SQLiteSingleRelationalStorageEngine::CleanTrackerDeviceTable(const std::vector<std::string> &tableNames,
965     RelationalSchemaObject &trackerSchemaObj, SQLiteSingleVerRelationalStorageExecutor *&handle)
966 {
967     std::vector<std::string> missingTrackerTables;
968     int errCode = handle->CheckAndCleanDistributedTable(tableNames, missingTrackerTables);
969     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
970         LOGE("Check tracker table failed. %d", errCode);
971         return errCode;
972     }
973     if (missingTrackerTables.empty()) { // LCOV_EXCL_BR_LINE
974         return E_OK;
975     }
976     for (const auto &tableName : missingTrackerTables) {
977         TrackerSchema schema;
978         schema.tableName = tableName;
979         trackerSchemaObj.RemoveTrackerSchema(schema);
980     }
981     errCode = SaveTrackerSchemaToMetaTable(handle, trackerSchemaObj); // save schema to meta_data
982     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
983         LOGE("Save tracker schema to metaTable failed. %d", errCode);
984     }
985     return errCode;
986 }
987 
GenLogInfoForUpgrade(const std::string & tableName,RelationalSchemaObject & schema,bool schemaChanged)988 int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string &tableName,
989     RelationalSchemaObject &schema, bool schemaChanged)
990 {
991     int errCode = E_OK;
992     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
993         OperatePerm::NORMAL_PERM, errCode));
994     if (handle == nullptr) {
995         return errCode;
996     }
997     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
998 
999     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1000     if (errCode != E_OK) {
1001         return errCode;
1002     }
1003 
1004     TableInfo table = GetSchema().GetTable(tableName);
1005     errCode = handle->UpgradedLogForExistedData(table, schemaChanged);
1006     if (errCode != E_OK) {
1007         LOGE("Upgrade tracker table log failed. %d", errCode);
1008         (void)handle->Rollback();
1009         return errCode;
1010     }
1011     return handle->Commit();
1012 }
1013 
GetReachableWithShared(const std::map<std::string,std::map<std::string,bool>> & reachableReference,const std::map<std::string,std::string> & tableToShared)1014 std::map<std::string, std::map<std::string, bool>> SQLiteSingleRelationalStorageEngine::GetReachableWithShared(
1015     const std::map<std::string, std::map<std::string, bool>> &reachableReference,
1016     const std::map<std::string, std::string> &tableToShared)
1017 {
1018     // we translate all origin table to shared table
1019     std::map<std::string, std::map<std::string, bool>> reachableWithShared;
1020     for (const auto &[source, reach] : reachableReference) {
1021         bool sourceHasNoShared = tableToShared.find(source) == tableToShared.end();
1022         for (const auto &[target, isReach] : reach) {
1023             // merge two reachable reference
1024             reachableWithShared[source][target] = isReach;
1025             if (sourceHasNoShared || tableToShared.find(target) == tableToShared.end()) {
1026                 continue;
1027             }
1028             // record shared reachable reference
1029             reachableWithShared[tableToShared.at(source)][tableToShared.at(target)] = isReach;
1030         }
1031     }
1032     return reachableWithShared;
1033 }
1034 
GetTableWeightWithShared(const std::map<std::string,int> & tableWeight,const std::map<std::string,std::string> & tableToShared)1035 std::map<std::string, int> SQLiteSingleRelationalStorageEngine::GetTableWeightWithShared(
1036     const std::map<std::string, int> &tableWeight, const std::map<std::string, std::string> &tableToShared)
1037 {
1038     std::map<std::string, int> res;
1039     for (const auto &[table, weight] : tableWeight) {
1040         res[table] = weight;
1041         if (tableToShared.find(table) == tableToShared.end()) {
1042             continue;
1043         }
1044         res[tableToShared.at(table)] = weight;
1045     }
1046     return res;
1047 }
1048 
UpdateExtendField(const DistributedDB::TrackerSchema & schema)1049 int SQLiteSingleRelationalStorageEngine::UpdateExtendField(const DistributedDB::TrackerSchema &schema)
1050 {
1051     if (schema.extendColNames.empty()) {
1052         return E_OK;
1053     }
1054     int errCode = E_OK;
1055     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
1056         OperatePerm::NORMAL_PERM, errCode));
1057     if (handle == nullptr) {
1058         return errCode;
1059     }
1060     ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
1061 
1062     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1063     if (errCode != E_OK) {
1064         return errCode;
1065     }
1066 
1067     errCode = handle->UpdateExtendField(schema.tableName, schema.extendColNames);
1068     if (errCode != E_OK) {
1069         LOGE("[%s [%zu]] Update extend field failed. %d",
1070             DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1071         (void)handle->Rollback();
1072         return errCode;
1073     }
1074 
1075     RelationalSchemaObject tracker = GetTrackerSchema();
1076     TrackerTable oldTrackerTable = tracker.GetTrackerTable(schema.tableName);
1077     const std::set<std::string>& oldExtendColNames = oldTrackerTable.GetExtendNames();
1078     const std::string lowVersionExtendColName = oldTrackerTable.GetExtendName();
1079     if (!oldExtendColNames.empty()) {
1080         errCode = handle->UpdateDeleteDataExtendField(schema.tableName, lowVersionExtendColName,
1081             oldExtendColNames, schema.extendColNames);
1082         if (errCode != E_OK) {
1083             LOGE("[%s [%zu]] Update extend field for delete data failed. %d",
1084                 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1085             (void)handle->Rollback();
1086             return errCode;
1087         }
1088     }
1089 
1090     // Try clear historical mismatched log, which usually do not occur and apply to tracker table only.
1091     if (GetSchema().GetTable(schema.tableName).Empty()) {
1092         handle->ClearLogOfMismatchedData(schema.tableName);
1093     }
1094     return handle->Commit();
1095 }
1096 
SetDistributedSchema(const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade)1097 std::pair<int, bool> SQLiteSingleRelationalStorageEngine::SetDistributedSchema(const DistributedSchema &schema,
1098     const std::string &localIdentity, bool isForceUpgrade)
1099 {
1100     std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
1101     auto schemaObj = GetSchema();
1102     std::pair<int, bool> res = {E_OK, schemaObj.CheckDistributedSchemaChange(schema)};
1103     auto &[errCode, isSchemaChange] = res;
1104     if (GetRelationalProperties().GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
1105         LOGE("tableMode SPLIT_BY_DEVICE not support set distributed schema");
1106         errCode = -E_NOT_SUPPORT;
1107         return res;
1108     }
1109     if (!isSchemaChange) {
1110         return res;
1111     }
1112     auto localSchema = schemaObj.GetDistributedSchema();
1113     if (localSchema.version != 0 && localSchema.version >= schema.version) {
1114         LOGE("new schema version no upgrade old:%" PRIu32 " new:%" PRIu32, localSchema.version, schema.version);
1115         errCode = -E_INVALID_ARGS;
1116     } else {
1117         errCode = SetDistributedSchemaInner(schemaObj, schema, localIdentity, isForceUpgrade);
1118     }
1119     if (errCode == E_OK) {
1120         SetSchema(schemaObj);
1121     }
1122     return res;
1123 }
1124 
SetDistributedSchemaInner(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade)1125 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInner(RelationalSchemaObject &schemaObj,
1126     const DistributedSchema &schema, const std::string &localIdentity, bool isForceUpgrade)
1127 {
1128     int errCode = E_OK;
1129     auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
1130         errCode));
1131     if (handle == nullptr) {
1132         return errCode;
1133     }
1134     ResFinalizer resFinalizer([this, handle]() {
1135         auto rdbHandle = handle;
1136         ReleaseExecutor(rdbHandle);
1137     });
1138 
1139     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1140     if (errCode != E_OK) {
1141         return errCode;
1142     }
1143     errCode = SetDistributedSchemaInTraction(schemaObj, schema, localIdentity, isForceUpgrade, *handle);
1144     if (errCode != E_OK) {
1145         (void)handle->Rollback();
1146         return errCode;
1147     }
1148     return handle->Commit();
1149 }
1150 
SetDistributedSchemaInTraction(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade,SQLiteSingleVerRelationalStorageExecutor & handle)1151 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInTraction(RelationalSchemaObject &schemaObj,
1152     const DistributedSchema &schema, const std::string &localIdentity, bool isForceUpgrade,
1153     SQLiteSingleVerRelationalStorageExecutor &handle)
1154 {
1155     int errCode = SQLiteRelationalUtils::CheckDistributedSchemaValid(schemaObj, schema, isForceUpgrade, &handle);
1156     if (errCode != E_OK) {
1157         return errCode;
1158     }
1159     auto changeStatus = schemaObj.GetTableChangeStatus(schema);
1160     schemaObj.SetDistributedSchema(schema);
1161     for (const auto &table : schema.tables) {
1162         if (!changeStatus[table.tableName]) {
1163             continue;
1164         }
1165         TableInfo tableInfo = schemaObj.GetTable(table.tableName);
1166         tableInfo.SetTrackerTable(GetTrackerSchema().GetTrackerTable(table.tableName));
1167         if (tableInfo.Empty()) {
1168             continue;
1169         }
1170         tableInfo.SetDistributedTable(schemaObj.GetDistributedTable(table.tableName));
1171         errCode = handle.RenewTableTrigger(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType(),
1172             localIdentity);
1173         if (errCode != E_OK) {
1174             LOGE("Failed to refresh %s:%zu trigger while setting up distributed schema: %d",
1175                 DBCommon::StringMiddleMasking(table.tableName).c_str(), table.tableName.size(), errCode);
1176             return errCode;
1177         }
1178         errCode = handle.UpdateHashKey(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType(),
1179             localIdentity);
1180         if (errCode != E_OK) {
1181             LOGE("Failed to update %s:%zu hash_key while setting up distributed schema: %d",
1182                 DBCommon::StringMiddleMasking(table.tableName).c_str(), table.tableName.size(), errCode);
1183             return errCode;
1184         }
1185     }
1186     errCode = SaveSchemaToMetaTable(&handle, schemaObj);
1187     if (errCode != E_OK) {
1188         LOGE("Save schema to meta table for set distributed schema failed. %d", errCode);
1189     }
1190     return errCode;
1191 }
1192 }
1193 #endif