1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_relational_storage_engine.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "res_finalizer.h"
21 #include "sqlite_relational_database_upgrader.h"
22 #include "sqlite_single_ver_relational_storage_executor.h"
23 #include "sqlite_relational_utils.h"
24
25
26 namespace DistributedDB {
// Construct the engine and store a copy of the given relational database properties in properties_.
SQLiteSingleRelationalStorageEngine::SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)
    : properties_(properties)
{}
30
~SQLiteSingleRelationalStorageEngine()31 SQLiteSingleRelationalStorageEngine::~SQLiteSingleRelationalStorageEngine()
32 {}
33
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)34 StorageExecutor *SQLiteSingleRelationalStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite,
35 bool isMemDb)
36 {
37 auto mode = GetRelationalProperties().GetDistributedTableMode();
38 return new (std::nothrow) SQLiteSingleVerRelationalStorageExecutor(dbHandle, isWrite, mode);
39 }
40
Upgrade(sqlite3 * db)41 int SQLiteSingleRelationalStorageEngine::Upgrade(sqlite3 *db)
42 {
43 int errCode = SQLiteRelationalUtils::CreateRelationalMetaTable(db);
44 if (errCode != E_OK) {
45 LOGE("Create relational store meta table failed. err=%d", errCode);
46 return errCode;
47 }
48 LOGD("[RelationalEngine][Upgrade] upgrade relational store.");
49 auto upgrader = std::make_unique<SqliteRelationalDatabaseUpgrader>(db);
50 return upgrader->Upgrade();
51 }
52
RegisterFunction(sqlite3 * db) const53 int SQLiteSingleRelationalStorageEngine::RegisterFunction(sqlite3 *db) const
54 {
55 int errCode = SQLiteUtils::RegisterCalcHash(db);
56 if (errCode != E_OK) {
57 LOGE("[engine] register calculate hash failed!");
58 return errCode;
59 }
60
61 errCode = SQLiteUtils::RegisterGetLastTime(db);
62 if (errCode != E_OK) {
63 LOGE("[engine] register get last time failed!");
64 return errCode;
65 }
66
67 errCode = SQLiteUtils::RegisterGetSysTime(db);
68 if (errCode != E_OK) {
69 LOGE("[engine] register get sys time failed!");
70 return errCode;
71 }
72
73 errCode = SQLiteUtils::RegisterGetRawSysTime(db);
74 if (errCode != E_OK) {
75 LOGE("[engine] register get raw sys time failed!");
76 return errCode;
77 }
78
79 errCode = SQLiteUtils::RegisterCloudDataChangeObserver(db);
80 if (errCode != E_OK) {
81 LOGE("[engine] register cloud observer failed!");
82 }
83
84 errCode = SQLiteUtils::RegisterCloudDataChangeServerObserver(db);
85 if (errCode != E_OK) {
86 LOGE("[engine] register cloud server observer failed!");
87 }
88
89 return errCode;
90 }
91
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)92 int SQLiteSingleRelationalStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
93 {
94 sqlite3 *db = nullptr;
95 int errCode = SQLiteUtils::OpenDatabase(GetOption(), db, false);
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 do {
100 errCode = Upgrade(db); // create meta_data table.
101 if (errCode != E_OK) {
102 break;
103 }
104
105 errCode = RegisterFunction(db);
106 if (errCode != E_OK) {
107 break;
108 }
109
110 handle = NewSQLiteStorageExecutor(db, isWrite, false);
111 if (handle == nullptr) {
112 LOGE("[Relational] New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
113 errCode = -E_OUT_OF_MEMORY;
114 break;
115 }
116 return E_OK;
117 } while (false);
118
119 (void)sqlite3_close_v2(db);
120 db = nullptr;
121 return errCode;
122 }
123
ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isExternal)124 void SQLiteSingleRelationalStorageEngine::ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor *&handle,
125 bool isExternal)
126 {
127 if (handle == nullptr) {
128 return;
129 }
130 StorageExecutor *databaseHandle = handle;
131 Recycle(databaseHandle, isExternal);
132 handle = nullptr;
133 }
134
SetSchema(const RelationalSchemaObject & schema)135 void SQLiteSingleRelationalStorageEngine::SetSchema(const RelationalSchemaObject &schema)
136 {
137 std::lock_guard lock(schemaMutex_);
138 schema_ = schema;
139 }
140
GetSchema() const141 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetSchema() const
142 {
143 std::lock_guard lock(schemaMutex_);
144 return schema_;
145 }
146
147 namespace {
// Meta values written by SaveSyncTableTypeAndDropFlagToMeta: DEVICE_TYPE for DEVICE_COOPERATION tables,
// CLOUD_TYPE for every other sync type.
const std::string DEVICE_TYPE = "device";
const std::string CLOUD_TYPE = "cloud";
// Prefix of the meta key under which a table's sync type is stored (the table name is appended).
const std::string SYNC_TABLE_TYPE = "sync_table_type_";
151
SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)152 int SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle, const RelationalSchemaObject &schema)
153 {
154 const Key schemaKey(DBConstant::RELATIONAL_SCHEMA_KEY,
155 DBConstant::RELATIONAL_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_SCHEMA_KEY));
156 Value schemaVal;
157 auto schemaStr = schema.ToSchemaString();
158 if (schemaStr.size() > SchemaConstant::SCHEMA_STRING_SIZE_LIMIT) {
159 LOGE("schema is too large %zu", schemaStr.size());
160 return -E_MAX_LIMITS;
161 }
162 DBCommon::StringToVector(schemaStr, schemaVal);
163 int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
164 if (errCode != E_OK) {
165 LOGE("Save schema to meta table failed. %d", errCode);
166 }
167 return errCode;
168 }
169
SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)170 int SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle,
171 const RelationalSchemaObject &schema)
172 {
173 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY,
174 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY + strlen(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY));
175 Value schemaVal;
176 DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
177 int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
178 if (errCode != E_OK) {
179 LOGE("Save schema to meta table failed. %d", errCode);
180 }
181 return errCode;
182 }
183
SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,TableSyncType syncType)184 int SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
185 TableSyncType syncType)
186 {
187 Key key;
188 DBCommon::StringToVector(SYNC_TABLE_TYPE + tableName, key);
189 Value value;
190 DBCommon::StringToVector(syncType == DEVICE_COOPERATION ? DEVICE_TYPE : CLOUD_TYPE, value);
191 int errCode = handle->PutKvData(key, value);
192 if (errCode != E_OK) {
193 LOGE("Save sync table type to meta table failed. %d", errCode);
194 return errCode;
195 }
196 DBCommon::StringToVector(DBConstant::TABLE_IS_DROPPED + tableName, key);
197 errCode = handle->DeleteMetaData({ key });
198 if (errCode != E_OK) {
199 LOGE("Save table drop flag to meta table failed. %d", errCode);
200 }
201 return errCode;
202 }
203 }
204
CreateDistributedTable(const std::string & tableName,const std::string & identity,bool & schemaChanged,TableSyncType syncType,bool trackerSchemaChanged)205 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName,
206 const std::string &identity, bool &schemaChanged, TableSyncType syncType, bool trackerSchemaChanged)
207 {
208 std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
209 RelationalSchemaObject schema = GetSchema();
210 bool isUpgraded = false;
211 if (DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
212 LOGI("distributed table bas been created.");
213 if (schema.GetTable(tableName).GetTableSyncType() != syncType) {
214 LOGE("table sync type mismatch.");
215 return -E_TYPE_MISMATCH;
216 }
217 isUpgraded = true;
218 int errCode = UpgradeDistributedTable(tableName, schemaChanged, syncType);
219 if (errCode != E_OK) {
220 LOGE("Upgrade distributed table failed. %d", errCode);
221 return errCode;
222 }
223 // Triggers may need to be rebuilt, no return directly
224 } else if (schema.GetTables().size() >= DBConstant::MAX_DISTRIBUTED_TABLE_COUNT) {
225 LOGE("The number of distributed tables is exceeds limit.");
226 return -E_MAX_LIMITS;
227 } else {
228 schemaChanged = true;
229 }
230
231 int errCode = CreateDistributedTable(tableName, isUpgraded, identity, schema, syncType);
232 if (errCode != E_OK) {
233 LOGE("CreateDistributedTable failed. %d", errCode);
234 return errCode;
235 }
236 if (isUpgraded && (schemaChanged || trackerSchemaChanged)) {
237 // Used for upgrading the stock data of the trackerTable
238 errCode = GenLogInfoForUpgrade(tableName, schema, schemaChanged);
239 }
240 return errCode;
241 }
242
CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::string & tableName,const std::string & sharedTableName,TableSyncType tableSyncType,RelationalSchemaObject & schema)243 int SQLiteSingleRelationalStorageEngine::CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
244 const std::string &tableName, const std::string &sharedTableName, TableSyncType tableSyncType,
245 RelationalSchemaObject &schema)
246 {
247 TableInfo table;
248 table.SetTableName(sharedTableName);
249 table.SetOriginTableName(tableName);
250 table.SetSharedTableMark(true);
251 table.SetTableSyncType(tableSyncType);
252 table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(sharedTableName));
253 if (!table.GetTrackerTable().IsEmpty() && tableSyncType == TableSyncType::DEVICE_COOPERATION) { // LCOV_EXCL_BR_LINE
254 LOGE("current is trackerTable, not support creating device distributed table.");
255 return -E_NOT_SUPPORT;
256 }
257 bool isUpgraded = schema.GetTable(sharedTableName).GetTableName() == sharedTableName;
258 int errCode = CreateDistributedTable(handle, isUpgraded, "", table, schema);
259 if (errCode != E_OK) {
260 LOGE("create distributed table failed. %d", errCode);
261 return errCode;
262 }
263 std::lock_guard lock(schemaMutex_);
264 schema_ = schema;
265 return errCode;
266 }
267
CreateDistributedTable(const std::string & tableName,bool isUpgraded,const std::string & identity,RelationalSchemaObject & schema,TableSyncType tableSyncType)268 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName, bool isUpgraded,
269 const std::string &identity, RelationalSchemaObject &schema, TableSyncType tableSyncType)
270 {
271 LOGD("Create distributed table.");
272 int errCode = E_OK;
273 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
274 errCode));
275 if (handle == nullptr) {
276 return errCode;
277 }
278 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
279
280 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
281 if (errCode != E_OK) {
282 return errCode;
283 }
284
285 TableInfo table;
286 table.SetTableName(tableName);
287 table.SetTableSyncType(tableSyncType);
288 table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(tableName));
289 table.SetDistributedTable(schema.GetDistributedTable(tableName));
290 if (isUpgraded) {
291 table.SetSourceTableReference(schema.GetTable(tableName).GetTableReference());
292 }
293 errCode = CreateDistributedTable(handle, isUpgraded, identity, table, schema);
294 if (errCode != E_OK) {
295 LOGE("create distributed table failed. %d", errCode);
296 (void)handle->Rollback();
297 return errCode;
298 }
299 errCode = handle->Commit();
300 if (errCode == E_OK) {
301 SetSchema(schema);
302 }
303 return errCode;
304 }
305
CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isUpgraded,const std::string & identity,TableInfo & table,RelationalSchemaObject & schema)306 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
307 bool isUpgraded, const std::string &identity, TableInfo &table, RelationalSchemaObject &schema)
308 {
309 auto mode = GetRelationalProperties().GetDistributedTableMode();
310 TableSyncType tableSyncType = table.GetTableSyncType();
311 std::string tableName = table.GetTableName();
312 int errCode = handle->InitCursorToMeta(tableName);
313 if (errCode != E_OK) {
314 LOGE("init cursor to meta failed. %d", errCode);
315 return errCode;
316 }
317 errCode = handle->CreateDistributedTable(mode, isUpgraded, identity, table);
318 if (errCode != E_OK) {
319 LOGE("create distributed table failed. %d", errCode);
320 return errCode;
321 }
322
323 schema.SetTableMode(mode);
324 // update table if tableName changed
325 schema.RemoveRelationalTable(tableName);
326 schema.AddRelationalTable(table);
327 errCode = SaveSchemaToMetaTable(handle, schema);
328 if (errCode != E_OK) {
329 LOGE("Save schema to meta table for create distributed table failed. %d", errCode);
330 return errCode;
331 }
332
333 errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, tableName, tableSyncType);
334 if (errCode != E_OK) {
335 LOGE("Save sync table type or drop flag to meta table failed. %d", errCode);
336 }
337 return errCode;
338 }
339
UpgradeDistributedTable(const std::string & tableName,bool & schemaChanged,TableSyncType syncType)340 int SQLiteSingleRelationalStorageEngine::UpgradeDistributedTable(const std::string &tableName, bool &schemaChanged,
341 TableSyncType syncType)
342 {
343 LOGD("Upgrade distributed table.");
344 RelationalSchemaObject schema = GetSchema();
345 int errCode = E_OK;
346 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
347 errCode));
348 if (handle == nullptr) {
349 return errCode;
350 }
351
352 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
353 if (errCode != E_OK) {
354 ReleaseExecutor(handle);
355 return errCode;
356 }
357
358 auto mode = GetRelationalProperties().GetDistributedTableMode();
359 errCode = handle->UpgradeDistributedTable(tableName, mode, schemaChanged, schema, syncType);
360 if (errCode != E_OK) {
361 LOGE("Upgrade distributed table failed. %d", errCode);
362 (void)handle->Rollback();
363 ReleaseExecutor(handle);
364 return errCode;
365 }
366
367 errCode = SaveSchemaToMetaTable(handle, schema);
368 if (errCode != E_OK) {
369 LOGE("Save schema to meta table for upgrade distributed table failed. %d", errCode);
370 (void)handle->Rollback();
371 ReleaseExecutor(handle);
372 return errCode;
373 }
374
375 errCode = handle->Commit();
376 if (errCode == E_OK) {
377 SetSchema(schema);
378 }
379 ReleaseExecutor(handle);
380 return errCode;
381 }
382
CleanDistributedDeviceTable(std::vector<std::string> & missingTables)383 int SQLiteSingleRelationalStorageEngine::CleanDistributedDeviceTable(std::vector<std::string> &missingTables)
384 {
385 int errCode = E_OK;
386 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
387 errCode));
388 if (handle == nullptr) {
389 return errCode;
390 }
391
392 // go fast to check missing tables without transaction
393 errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
394 if (errCode == E_OK) {
395 if (missingTables.empty()) {
396 LOGI("Missing table is empty.");
397 ReleaseExecutor(handle);
398 return errCode;
399 }
400 } else {
401 LOGE("Get missing distributed table failed. %d", errCode);
402 ReleaseExecutor(handle);
403 return errCode;
404 }
405 missingTables.clear();
406
407 std::lock_guard lock(schemaMutex_);
408 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
409 if (errCode != E_OK) {
410 ReleaseExecutor(handle);
411 return errCode;
412 }
413
414 errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
415 if (errCode == E_OK) {
416 // Remove non-existent tables from the schema
417 for (const auto &tableName : missingTables) {
418 schema_.RemoveRelationalTable(tableName);
419 }
420 errCode = SaveSchemaToMetaTable(handle, schema_); // save schema to meta_data
421 if (errCode != E_OK) {
422 LOGE("Save schema to metaTable failed. %d", errCode);
423 (void)handle->Rollback();
424 } else {
425 errCode = handle->Commit();
426 }
427 } else {
428 LOGE("Check distributed table failed. %d", errCode);
429 (void)handle->Rollback();
430 }
431 ReleaseExecutor(handle);
432 return errCode;
433 }
434
// Return a reference to the engine's stored properties.
// Unlike GetRelationalProperties(), no lock is taken here and no copy is made.
const RelationalDBProperties &SQLiteSingleRelationalStorageEngine::GetProperties() const
{
    return properties_;
}
439
GetRelationalProperties() const440 const RelationalDBProperties SQLiteSingleRelationalStorageEngine::GetRelationalProperties() const
441 {
442 std::lock_guard lock(propertiesMutex_);
443 return properties_;
444 }
445
SetProperties(const RelationalDBProperties & properties)446 void SQLiteSingleRelationalStorageEngine::SetProperties(const RelationalDBProperties &properties)
447 {
448 std::lock_guard lock(propertiesMutex_);
449 properties_ = properties;
450 }
451
SetTrackerTable(const TrackerSchema & schema,const TableInfo & tableInfo,bool isFirstCreate)452 int SQLiteSingleRelationalStorageEngine::SetTrackerTable(const TrackerSchema &schema, const TableInfo &tableInfo,
453 bool isFirstCreate)
454 {
455 int errCode = E_OK;
456 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
457 errCode));
458 if (handle == nullptr) {
459 return errCode;
460 }
461 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
462
463 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
464 if (errCode != E_OK) {
465 return errCode;
466 }
467 RelationalSchemaObject tracker = GetTrackerSchema();
468 tracker.InsertTrackerSchema(schema);
469 int ret = handle->CreateTrackerTable(tracker.GetTrackerTable(schema.tableName), tableInfo, isFirstCreate);
470 if (ret != E_OK && ret != -E_WITH_INVENTORY_DATA) {
471 (void)handle->Rollback();
472 return ret;
473 }
474 errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, schema.tableName, tableInfo.GetTableSyncType());
475 if (errCode != E_OK) {
476 LOGE("[SetTrackerTable] Save sync type to meta table failed: %d", errCode);
477 (void)handle->Rollback();
478 return errCode;
479 }
480
481 if (schema.trackerColNames.empty() && !schema.isTrackAction) {
482 tracker.RemoveTrackerSchema(schema);
483 }
484 errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
485 if (errCode != E_OK) {
486 (void)handle->Rollback();
487 return errCode;
488 }
489
490 errCode = handle->Commit();
491 if (errCode != E_OK) {
492 return errCode;
493 }
494
495 SetTrackerSchema(tracker);
496 return ret;
497 }
498
CacheTrackerSchema(const TrackerSchema & schema)499 void SQLiteSingleRelationalStorageEngine::CacheTrackerSchema(const TrackerSchema &schema)
500 {
501 std::lock_guard lock(trackerSchemaMutex_);
502 trackerSchema_.InsertTrackerSchema(schema);
503 if (!schema.isTrackAction && schema.trackerColNames.empty()) {
504 // if isTrackAction be false and trackerColNames is empty, will remove the tracker schema.
505 trackerSchema_.RemoveTrackerSchema(schema);
506 }
507 }
508
GetOrInitTrackerSchemaFromMeta()509 int SQLiteSingleRelationalStorageEngine::GetOrInitTrackerSchemaFromMeta()
510 {
511 RelationalSchemaObject trackerSchema;
512 int errCode = E_OK;
513 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
514 errCode));
515 if (handle == nullptr) {
516 return errCode;
517 }
518 errCode = handle->GetOrInitTrackerSchemaFromMeta(trackerSchema);
519 if (errCode != E_OK) {
520 ReleaseExecutor(handle);
521 return errCode;
522 }
523 const TableInfoMap tableInfoMap = trackerSchema.GetTrackerTables();
524 for (const auto &iter: tableInfoMap) {
525 TableInfo tableInfo;
526 errCode = handle->AnalysisTrackerTable(iter.second.GetTrackerTable(), tableInfo);
527 if (errCode == -E_NOT_FOUND) { // LCOV_EXCL_BR_LINE
528 const std::string trackerTableName = iter.second.GetTrackerTable().GetTableName();
529 errCode = CleanTrackerDeviceTable({ trackerTableName }, trackerSchema, handle);
530 if (errCode != E_OK) {
531 LOGE("cancel tracker table failed during db opening. %d", errCode);
532 ReleaseExecutor(handle);
533 return errCode;
534 }
535 } else if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
536 LOGE("the tracker schema does not match the tracker schema. %d", errCode);
537 ReleaseExecutor(handle);
538 return errCode;
539 }
540 }
541 SetTrackerSchema(trackerSchema);
542 ReleaseExecutor(handle);
543 return E_OK;
544 }
545
SaveTrackerSchema(const std::string & tableName,bool isFirstCreate)546 int SQLiteSingleRelationalStorageEngine::SaveTrackerSchema(const std::string &tableName, bool isFirstCreate)
547 {
548 int errCode = E_OK;
549 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
550 errCode));
551 if (handle == nullptr) {
552 return errCode;
553 }
554 RelationalSchemaObject tracker = GetTrackerSchema();
555 errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
556 if (errCode != E_OK || !isFirstCreate) {
557 ReleaseExecutor(handle);
558 return errCode;
559 }
560 errCode = handle->CheckInventoryData(DBCommon::GetLogTableName(tableName));
561 ReleaseExecutor(handle);
562 return errCode;
563 }
564
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)565 int SQLiteSingleRelationalStorageEngine::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
566 {
567 int errCode = E_OK;
568 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(!condition.readOnly,
569 OperatePerm::NORMAL_PERM, errCode, true));
570 if (handle == nullptr) {
571 return errCode;
572 }
573 errCode = handle->ExecuteSql(condition, records);
574 ReleaseExecutor(handle, true);
575 return errCode;
576 }
577
CheckReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema)578 static int CheckReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
579 const RelationalSchemaObject &schema)
580 {
581 for (const auto &reference : tableReferenceProperty) {
582 TableInfo sourceTableInfo = schema.GetTable(reference.sourceTableName);
583 TableInfo targetTableInfo = schema.GetTable(reference.targetTableName);
584 if (strcasecmp(sourceTableInfo.GetTableName().c_str(), reference.sourceTableName.c_str()) != 0 ||
585 strcasecmp(targetTableInfo.GetTableName().c_str(), reference.targetTableName.c_str()) != 0) {
586 LOGE("can't set reference for table which doesn't create distributed table.");
587 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
588 }
589 if (sourceTableInfo.GetTableSyncType() != CLOUD_COOPERATION ||
590 targetTableInfo.GetTableSyncType() != CLOUD_COOPERATION) {
591 LOGE("can't set reference for table which doesn't create distributed table with cloud mode.");
592 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
593 }
594 if (sourceTableInfo.GetSharedTableMark() || targetTableInfo.GetSharedTableMark()) {
595 LOGE("can't set reference for shared table.");
596 return -E_NOT_SUPPORT;
597 }
598
599 FieldInfoMap sourceFieldMap = sourceTableInfo.GetFields();
600 FieldInfoMap targetFieldMap = targetTableInfo.GetFields();
601 for (const auto &[sourceFieldName, targetFieldName] : reference.columns) {
602 if (sourceFieldMap.find(sourceFieldName) == sourceFieldMap.end() ||
603 targetFieldMap.find(targetFieldName) == targetFieldMap.end()) {
604 LOGE("reference field doesn't exists in table.");
605 return -E_INVALID_ARGS;
606 }
607 }
608 }
609 return E_OK;
610 }
611
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,SQLiteSingleVerRelationalStorageExecutor * handle,std::set<std::string> & clearWaterMarkTables,RelationalSchemaObject & schema)612 int SQLiteSingleRelationalStorageEngine::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
613 SQLiteSingleVerRelationalStorageExecutor *handle, std::set<std::string> &clearWaterMarkTables,
614 RelationalSchemaObject &schema)
615 {
616 std::lock_guard lock(schemaMutex_);
617 schema = schema_;
618 int errCode = CheckReference(tableReferenceProperty, schema);
619 if (errCode != E_OK) {
620 LOGE("check reference failed, errCode = %d.", errCode);
621 return errCode;
622 }
623 int res = handle->GetClearWaterMarkTables(tableReferenceProperty, schema, clearWaterMarkTables);
624 if (res != E_OK && res != -E_TABLE_REFERENCE_CHANGED) {
625 return res;
626 }
627 schema.SetReferenceProperty(tableReferenceProperty);
628 errCode = SaveSchemaToMetaTable(handle, schema);
629 if (errCode != E_OK) {
630 LOGE("Save schema to meta table for reference failed. %d", errCode);
631 return errCode;
632 }
633 auto mode = GetRelationalProperties().GetDistributedTableMode();
634 for (auto &table : schema.GetTables()) {
635 if (table.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
636 continue;
637 }
638 TableInfo tableInfo = table.second;
639 errCode = handle->RenewTableTrigger(mode, tableInfo, TableSyncType::CLOUD_COOPERATION);
640 if (errCode != E_OK) {
641 LOGE("renew table trigger for reference failed. %d", errCode);
642 return errCode;
643 }
644 }
645 return res;
646 }
647
GetTrackerSchema() const648 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetTrackerSchema() const
649 {
650 std::lock_guard lock(trackerSchemaMutex_);
651 return trackerSchema_;
652 }
653
SetTrackerSchema(const RelationalSchemaObject & trackerSchema)654 void SQLiteSingleRelationalStorageEngine::SetTrackerSchema(const RelationalSchemaObject &trackerSchema)
655 {
656 std::lock_guard lock(trackerSchemaMutex_);
657 trackerSchema_ = trackerSchema;
658 }
659
CleanTrackerData(const std::string & tableName,int64_t cursor)660 int SQLiteSingleRelationalStorageEngine::CleanTrackerData(const std::string &tableName, int64_t cursor)
661 {
662 int errCode = E_OK;
663 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
664 errCode));
665 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
666 return errCode;
667 }
668 TrackerTable trackerTable = GetTrackerSchema().GetTrackerTable(tableName);
669 bool isOnlyTrackTable = false;
670 RelationalSchemaObject schema = GetSchema();
671 if (!trackerTable.IsTableNameEmpty() &&
672 !DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
673 isOnlyTrackTable = true;
674 }
675 errCode = handle->CleanTrackerData(tableName, cursor, isOnlyTrackTable);
676 ReleaseExecutor(handle);
677 return errCode;
678 }
679
UpgradeSharedTable(const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)680 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTable(const DataBaseSchema &cloudSchema,
681 const std::vector<std::string> &deleteTableNames, const std::map<std::string, std::vector<Field>> &updateTableNames,
682 const std::map<std::string, std::string> &alterTableNames)
683 {
684 int errCode = E_OK;
685 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
686 errCode));
687 if (handle == nullptr) {
688 return errCode;
689 }
690 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
691 if (errCode != E_OK) {
692 ReleaseExecutor(handle);
693 return errCode;
694 }
695 RelationalSchemaObject schema = GetSchema();
696 errCode = UpgradeSharedTableInner(handle, cloudSchema, deleteTableNames, updateTableNames, alterTableNames);
697 if (errCode != E_OK) {
698 handle->Rollback();
699 ReleaseExecutor(handle);
700 return errCode;
701 }
702 errCode = handle->Commit();
703 if (errCode != E_OK) {
704 std::lock_guard lock(schemaMutex_);
705 schema_ = schema; // revert schema to the initial state
706 }
707 ReleaseExecutor(handle);
708 return errCode;
709 }
710
UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)711 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
712 const DataBaseSchema &cloudSchema, const std::vector<std::string> &deleteTableNames,
713 const std::map<std::string, std::vector<Field>> &updateTableNames,
714 const std::map<std::string, std::string> &alterTableNames)
715 {
716 RelationalSchemaObject schema = GetSchema();
717 int errCode = DoDeleteSharedTable(handle, deleteTableNames, schema);
718 if (errCode != E_OK) {
719 LOGE("[RelationalStorageEngine] delete shared table or distributed table failed. %d", errCode);
720 return errCode;
721 }
722 errCode = DoUpdateSharedTable(handle, updateTableNames, cloudSchema, schema);
723 if (errCode != E_OK) {
724 LOGE("[RelationalStorageEngine] update shared table or distributed table failed. %d", errCode);
725 return errCode;
726 }
727 errCode = CheckIfExistUserTable(handle, cloudSchema, alterTableNames, schema);
728 if (errCode != E_OK) {
729 LOGE("[RelationalStorageEngine] check local user table failed. %d", errCode);
730 return errCode;
731 }
732 errCode = DoAlterSharedTableName(handle, alterTableNames, schema);
733 if (errCode != E_OK) {
734 LOGE("[RelationalStorageEngine] alter shared table or distributed table failed. %d", errCode);
735 return errCode;
736 }
737 errCode = DoCreateSharedTable(handle, cloudSchema, updateTableNames, alterTableNames, schema);
738 if (errCode != E_OK) {
739 LOGE("[RelationalStorageEngine] create shared table or distributed table failed. %d", errCode);
740 return errCode;
741 }
742 std::lock_guard lock(schemaMutex_);
743 schema_ = schema;
744 return E_OK;
745 }
746
DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::vector<std::string> & deleteTableNames,RelationalSchemaObject & schema)747 int SQLiteSingleRelationalStorageEngine::DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
748 const std::vector<std::string> &deleteTableNames, RelationalSchemaObject &schema)
749 {
750 if (deleteTableNames.empty()) {
751 return E_OK;
752 }
753 int errCode = handle->DeleteTable(deleteTableNames);
754 if (errCode != E_OK) {
755 LOGE("[RelationalStorageEngine] delete shared table failed. %d", errCode);
756 return errCode;
757 }
758 std::vector<Key> keys;
759 for (const auto &tableName : deleteTableNames) {
760 errCode = handle->CleanResourceForDroppedTable(tableName);
761 if (errCode != E_OK) {
762 LOGE("[RelationalStorageEngine] delete shared distributed table failed. %d", errCode);
763 return errCode;
764 }
765 Key sharedTableKey = DBCommon::GetPrefixTableName(tableName);
766 if (sharedTableKey.empty() || sharedTableKey.size() > DBConstant::MAX_KEY_SIZE) {
767 LOGE("[RelationalStorageEngine] shared table key is invalid.");
768 return -E_INVALID_ARGS;
769 }
770 keys.push_back(sharedTableKey);
771 schema.RemoveRelationalTable(tableName);
772 }
773 errCode = handle->DeleteMetaData(keys);
774 if (errCode != E_OK) {
775 LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
776 }
777 return errCode;
778 }
779
DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::vector<Field>> & updateTableNames,const DataBaseSchema & cloudSchema,RelationalSchemaObject & localSchema)780 int SQLiteSingleRelationalStorageEngine::DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
781 const std::map<std::string, std::vector<Field>> &updateTableNames, const DataBaseSchema &cloudSchema,
782 RelationalSchemaObject &localSchema)
783 {
784 if (updateTableNames.empty()) {
785 return E_OK;
786 }
787 int errCode = handle->UpdateSharedTable(updateTableNames);
788 if (errCode != E_OK) {
789 LOGE("[RelationalStorageEngine] update shared table failed. %d", errCode);
790 return errCode;
791 }
792 for (const auto &tableSchema : cloudSchema.tables) {
793 if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
794 errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
795 TableSyncType::CLOUD_COOPERATION, localSchema);
796 if (errCode != E_OK) {
797 LOGE("[RelationalStorageEngine] update shared distributed table failed. %d", errCode);
798 return errCode;
799 }
800 }
801 }
802 return E_OK;
803 }
804
DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)805 int SQLiteSingleRelationalStorageEngine::DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor *&handle,
806 const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
807 {
808 if (alterTableNames.empty()) {
809 return E_OK;
810 }
811 int errCode = handle->AlterTableName(alterTableNames);
812 if (errCode != E_OK) {
813 LOGE("[RelationalStorageEngine] alter shared table failed. %d", errCode);
814 return errCode;
815 }
816 std::map<std::string, std::string> distributedSharedTableNames;
817 for (const auto &tableName : alterTableNames) {
818 errCode = handle->DeleteTableTrigger(tableName.first);
819 if (errCode != E_OK) {
820 LOGE("[RelationalStorageEngine] delete shared table trigger failed. %d", errCode);
821 return errCode;
822 }
823 std::string oldDistributedName = DBCommon::GetLogTableName(tableName.first);
824 std::string newDistributedName = DBCommon::GetLogTableName(tableName.second);
825 distributedSharedTableNames[oldDistributedName] = newDistributedName;
826 }
827 errCode = handle->AlterTableName(distributedSharedTableNames);
828 if (errCode != E_OK) {
829 LOGE("[RelationalStorageEngine] alter distributed shared table failed. %d", errCode);
830 return errCode;
831 }
832 for (const auto &[oldTableName, newTableName] : alterTableNames) {
833 TableInfo tableInfo = schema.GetTable(oldTableName);
834 tableInfo.SetTableName(newTableName);
835 schema.AddRelationalTable(tableInfo);
836 schema.RemoveRelationalTable(oldTableName);
837 }
838 errCode = UpdateKvData(handle, alterTableNames);
839 if (errCode != E_OK) {
840 LOGE("[RelationalStorageEngine] update kv data failed. %d", errCode);
841 }
842 return errCode;
843 }
844
DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)845 int SQLiteSingleRelationalStorageEngine::DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
846 const DataBaseSchema &cloudSchema, const std::map<std::string, std::vector<Field>> &updateTableNames,
847 const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
848 {
849 for (auto const &tableSchema : cloudSchema.tables) {
850 if (tableSchema.sharedTableName.empty()) {
851 continue;
852 }
853 if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
854 continue;
855 }
856 bool isUpdated = false;
857 for (const auto &alterTableName : alterTableNames) {
858 if (alterTableName.second == tableSchema.sharedTableName) {
859 isUpdated = true;
860 break;
861 }
862 }
863 if (isUpdated) {
864 continue;
865 }
866 int errCode = handle->CreateSharedTable(tableSchema);
867 if (errCode != E_OK) {
868 return errCode;
869 }
870 errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
871 TableSyncType::CLOUD_COOPERATION, schema);
872 if (errCode != E_OK) {
873 return errCode;
874 }
875 }
876 return E_OK;
877 }
878
UpdateKvData(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames)879 int SQLiteSingleRelationalStorageEngine::UpdateKvData(SQLiteSingleVerRelationalStorageExecutor *&handle,
880 const std::map<std::string, std::string> &alterTableNames)
881 {
882 std::vector<Key> keys;
883 for (const auto &tableName : alterTableNames) {
884 Key oldKey = DBCommon::GetPrefixTableName(tableName.first);
885 Value value;
886 int ret = handle->GetKvData(oldKey, value);
887 if (ret == -E_NOT_FOUND) {
888 continue;
889 }
890 if (ret != E_OK) {
891 LOGE("[RelationalStorageEngine] get meta data failed. %d", ret);
892 return ret;
893 }
894 keys.push_back(oldKey);
895 Key newKey = DBCommon::GetPrefixTableName(tableName.second);
896 ret = handle->PutKvData(newKey, value);
897 if (ret != E_OK) {
898 LOGE("[RelationalStorageEngine] put meta data failed. %d", ret);
899 return ret;
900 }
901 }
902 int errCode = handle->DeleteMetaData(keys);
903 if (errCode != E_OK) {
904 LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
905 }
906 return errCode;
907 }
908
CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::string> & alterTableNames,const RelationalSchemaObject & schema)909 int SQLiteSingleRelationalStorageEngine::CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
910 const DataBaseSchema &cloudSchema, const std::map<std::string, std::string> &alterTableNames,
911 const RelationalSchemaObject &schema)
912 {
913 for (const auto &tableSchema : cloudSchema.tables) {
914 if (alterTableNames.find(tableSchema.sharedTableName) != alterTableNames.end()) {
915 continue;
916 }
917 TableInfo tableInfo = schema.GetTable(tableSchema.sharedTableName);
918 if (tableInfo.GetSharedTableMark()) {
919 continue;
920 }
921 int errCode = handle->CheckIfExistUserTable(tableSchema.sharedTableName);
922 if (errCode != E_OK) {
923 LOGE("[RelationalStorageEngine] local exists table. %d", errCode);
924 return errCode;
925 }
926 }
927 return E_OK;
928 }
929
CalTableRef(const std::vector<std::string> & tableNames,const std::map<std::string,std::string> & sharedTableOriginNames)930 std::pair<std::vector<std::string>, int> SQLiteSingleRelationalStorageEngine::CalTableRef(
931 const std::vector<std::string> &tableNames, const std::map<std::string, std::string> &sharedTableOriginNames)
932 {
933 std::pair<std::vector<std::string>, int> res = { tableNames, E_OK };
934 std::map<std::string, std::map<std::string, bool>> reachableReference;
935 std::map<std::string, int> tableWeight;
936 {
937 std::lock_guard lock(schemaMutex_);
938 reachableReference = schema_.GetReachableRef();
939 tableWeight = schema_.GetTableWeight();
940 }
941 if (reachableReference.empty()) {
942 return res;
943 }
944 auto reachableWithShared = GetReachableWithShared(reachableReference, sharedTableOriginNames);
945 // check dependency conflict
946 for (size_t i = 0; i < tableNames.size(); ++i) {
947 for (size_t j = i + 1; j < tableNames.size(); ++j) {
948 // such as table A B, if dependency is A->B
949 // sync should not be A->B, it should be B->A
950 // so if A can reach B, it's wrong
951 if (reachableWithShared[tableNames[i]][tableNames[j]]) {
952 LOGE("[RDBStorageEngine] table %zu reach table %zu", i, j);
953 res.second = -E_INVALID_ARGS;
954 return res;
955 }
956 }
957 }
958 tableWeight = GetTableWeightWithShared(tableWeight, sharedTableOriginNames);
959 auto actualTable = DBCommon::GenerateNodesByNodeWeight(tableNames, reachableWithShared, tableWeight);
960 res.first.assign(actualTable.begin(), actualTable.end());
961 return res;
962 }
963
CleanTrackerDeviceTable(const std::vector<std::string> & tableNames,RelationalSchemaObject & trackerSchemaObj,SQLiteSingleVerRelationalStorageExecutor * & handle)964 int SQLiteSingleRelationalStorageEngine::CleanTrackerDeviceTable(const std::vector<std::string> &tableNames,
965 RelationalSchemaObject &trackerSchemaObj, SQLiteSingleVerRelationalStorageExecutor *&handle)
966 {
967 std::vector<std::string> missingTrackerTables;
968 int errCode = handle->CheckAndCleanDistributedTable(tableNames, missingTrackerTables);
969 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
970 LOGE("Check tracker table failed. %d", errCode);
971 return errCode;
972 }
973 if (missingTrackerTables.empty()) { // LCOV_EXCL_BR_LINE
974 return E_OK;
975 }
976 for (const auto &tableName : missingTrackerTables) {
977 TrackerSchema schema;
978 schema.tableName = tableName;
979 trackerSchemaObj.RemoveTrackerSchema(schema);
980 }
981 errCode = SaveTrackerSchemaToMetaTable(handle, trackerSchemaObj); // save schema to meta_data
982 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
983 LOGE("Save tracker schema to metaTable failed. %d", errCode);
984 }
985 return errCode;
986 }
987
GenLogInfoForUpgrade(const std::string & tableName,RelationalSchemaObject & schema,bool schemaChanged)988 int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string &tableName,
989 RelationalSchemaObject &schema, bool schemaChanged)
990 {
991 int errCode = E_OK;
992 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
993 OperatePerm::NORMAL_PERM, errCode));
994 if (handle == nullptr) {
995 return errCode;
996 }
997 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
998
999 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1000 if (errCode != E_OK) {
1001 return errCode;
1002 }
1003
1004 TableInfo table = GetSchema().GetTable(tableName);
1005 errCode = handle->UpgradedLogForExistedData(table, schemaChanged);
1006 if (errCode != E_OK) {
1007 LOGE("Upgrade tracker table log failed. %d", errCode);
1008 (void)handle->Rollback();
1009 return errCode;
1010 }
1011 return handle->Commit();
1012 }
1013
GetReachableWithShared(const std::map<std::string,std::map<std::string,bool>> & reachableReference,const std::map<std::string,std::string> & tableToShared)1014 std::map<std::string, std::map<std::string, bool>> SQLiteSingleRelationalStorageEngine::GetReachableWithShared(
1015 const std::map<std::string, std::map<std::string, bool>> &reachableReference,
1016 const std::map<std::string, std::string> &tableToShared)
1017 {
1018 // we translate all origin table to shared table
1019 std::map<std::string, std::map<std::string, bool>> reachableWithShared;
1020 for (const auto &[source, reach] : reachableReference) {
1021 bool sourceHasNoShared = tableToShared.find(source) == tableToShared.end();
1022 for (const auto &[target, isReach] : reach) {
1023 // merge two reachable reference
1024 reachableWithShared[source][target] = isReach;
1025 if (sourceHasNoShared || tableToShared.find(target) == tableToShared.end()) {
1026 continue;
1027 }
1028 // record shared reachable reference
1029 reachableWithShared[tableToShared.at(source)][tableToShared.at(target)] = isReach;
1030 }
1031 }
1032 return reachableWithShared;
1033 }
1034
GetTableWeightWithShared(const std::map<std::string,int> & tableWeight,const std::map<std::string,std::string> & tableToShared)1035 std::map<std::string, int> SQLiteSingleRelationalStorageEngine::GetTableWeightWithShared(
1036 const std::map<std::string, int> &tableWeight, const std::map<std::string, std::string> &tableToShared)
1037 {
1038 std::map<std::string, int> res;
1039 for (const auto &[table, weight] : tableWeight) {
1040 res[table] = weight;
1041 if (tableToShared.find(table) == tableToShared.end()) {
1042 continue;
1043 }
1044 res[tableToShared.at(table)] = weight;
1045 }
1046 return res;
1047 }
1048
UpdateExtendField(const DistributedDB::TrackerSchema & schema)1049 int SQLiteSingleRelationalStorageEngine::UpdateExtendField(const DistributedDB::TrackerSchema &schema)
1050 {
1051 if (schema.extendColNames.empty()) {
1052 return E_OK;
1053 }
1054 int errCode = E_OK;
1055 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
1056 OperatePerm::NORMAL_PERM, errCode));
1057 if (handle == nullptr) {
1058 return errCode;
1059 }
1060 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
1061
1062 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1063 if (errCode != E_OK) {
1064 return errCode;
1065 }
1066
1067 errCode = handle->UpdateExtendField(schema.tableName, schema.extendColNames);
1068 if (errCode != E_OK) {
1069 LOGE("[%s [%zu]] Update extend field failed. %d",
1070 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1071 (void)handle->Rollback();
1072 return errCode;
1073 }
1074
1075 RelationalSchemaObject tracker = GetTrackerSchema();
1076 TrackerTable oldTrackerTable = tracker.GetTrackerTable(schema.tableName);
1077 const std::set<std::string>& oldExtendColNames = oldTrackerTable.GetExtendNames();
1078 const std::string lowVersionExtendColName = oldTrackerTable.GetExtendName();
1079 if (!oldExtendColNames.empty()) {
1080 errCode = handle->UpdateDeleteDataExtendField(schema.tableName, lowVersionExtendColName,
1081 oldExtendColNames, schema.extendColNames);
1082 if (errCode != E_OK) {
1083 LOGE("[%s [%zu]] Update extend field for delete data failed. %d",
1084 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1085 (void)handle->Rollback();
1086 return errCode;
1087 }
1088 }
1089
1090 // Try clear historical mismatched log, which usually do not occur and apply to tracker table only.
1091 if (GetSchema().GetTable(schema.tableName).Empty()) {
1092 handle->ClearLogOfMismatchedData(schema.tableName);
1093 }
1094 return handle->Commit();
1095 }
1096
SetDistributedSchema(const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade)1097 std::pair<int, bool> SQLiteSingleRelationalStorageEngine::SetDistributedSchema(const DistributedSchema &schema,
1098 const std::string &localIdentity, bool isForceUpgrade)
1099 {
1100 std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
1101 auto schemaObj = GetSchema();
1102 std::pair<int, bool> res = {E_OK, schemaObj.CheckDistributedSchemaChange(schema)};
1103 auto &[errCode, isSchemaChange] = res;
1104 if (GetRelationalProperties().GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
1105 LOGE("tableMode SPLIT_BY_DEVICE not support set distributed schema");
1106 errCode = -E_NOT_SUPPORT;
1107 return res;
1108 }
1109 if (!isSchemaChange) {
1110 return res;
1111 }
1112 auto localSchema = schemaObj.GetDistributedSchema();
1113 if (localSchema.version != 0 && localSchema.version >= schema.version) {
1114 LOGE("new schema version no upgrade old:%" PRIu32 " new:%" PRIu32, localSchema.version, schema.version);
1115 errCode = -E_INVALID_ARGS;
1116 } else {
1117 errCode = SetDistributedSchemaInner(schemaObj, schema, localIdentity, isForceUpgrade);
1118 }
1119 if (errCode == E_OK) {
1120 SetSchema(schemaObj);
1121 }
1122 return res;
1123 }
1124
SetDistributedSchemaInner(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade)1125 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInner(RelationalSchemaObject &schemaObj,
1126 const DistributedSchema &schema, const std::string &localIdentity, bool isForceUpgrade)
1127 {
1128 int errCode = E_OK;
1129 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
1130 errCode));
1131 if (handle == nullptr) {
1132 return errCode;
1133 }
1134 ResFinalizer resFinalizer([this, handle]() {
1135 auto rdbHandle = handle;
1136 ReleaseExecutor(rdbHandle);
1137 });
1138
1139 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1140 if (errCode != E_OK) {
1141 return errCode;
1142 }
1143 errCode = SetDistributedSchemaInTraction(schemaObj, schema, localIdentity, isForceUpgrade, *handle);
1144 if (errCode != E_OK) {
1145 (void)handle->Rollback();
1146 return errCode;
1147 }
1148 return handle->Commit();
1149 }
1150
SetDistributedSchemaInTraction(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,const std::string & localIdentity,bool isForceUpgrade,SQLiteSingleVerRelationalStorageExecutor & handle)1151 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInTraction(RelationalSchemaObject &schemaObj,
1152 const DistributedSchema &schema, const std::string &localIdentity, bool isForceUpgrade,
1153 SQLiteSingleVerRelationalStorageExecutor &handle)
1154 {
1155 int errCode = SQLiteRelationalUtils::CheckDistributedSchemaValid(schemaObj, schema, isForceUpgrade, &handle);
1156 if (errCode != E_OK) {
1157 return errCode;
1158 }
1159 auto changeStatus = schemaObj.GetTableChangeStatus(schema);
1160 schemaObj.SetDistributedSchema(schema);
1161 for (const auto &table : schema.tables) {
1162 if (!changeStatus[table.tableName]) {
1163 continue;
1164 }
1165 TableInfo tableInfo = schemaObj.GetTable(table.tableName);
1166 tableInfo.SetTrackerTable(GetTrackerSchema().GetTrackerTable(table.tableName));
1167 if (tableInfo.Empty()) {
1168 continue;
1169 }
1170 tableInfo.SetDistributedTable(schemaObj.GetDistributedTable(table.tableName));
1171 errCode = handle.RenewTableTrigger(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType(),
1172 localIdentity);
1173 if (errCode != E_OK) {
1174 LOGE("Failed to refresh %s:%zu trigger while setting up distributed schema: %d",
1175 DBCommon::StringMiddleMasking(table.tableName).c_str(), table.tableName.size(), errCode);
1176 return errCode;
1177 }
1178 errCode = handle.UpdateHashKey(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType(),
1179 localIdentity);
1180 if (errCode != E_OK) {
1181 LOGE("Failed to update %s:%zu hash_key while setting up distributed schema: %d",
1182 DBCommon::StringMiddleMasking(table.tableName).c_str(), table.tableName.size(), errCode);
1183 return errCode;
1184 }
1185 }
1186 errCode = SaveSchemaToMetaTable(&handle, schemaObj);
1187 if (errCode != E_OK) {
1188 LOGE("Save schema to meta table for set distributed schema failed. %d", errCode);
1189 }
1190 return errCode;
1191 }
1192 }
1193 #endif