1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_relational_storage_engine.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "res_finalizer.h"
21 #include "sqlite_relational_database_upgrader.h"
22 #include "sqlite_single_ver_relational_storage_executor.h"
23 #include "sqlite_relational_utils.h"
24
25
26 namespace DistributedDB {
SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)27 SQLiteSingleRelationalStorageEngine::SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties)
28 : properties_(properties)
29 {}
30
~SQLiteSingleRelationalStorageEngine()31 SQLiteSingleRelationalStorageEngine::~SQLiteSingleRelationalStorageEngine()
32 {}
33
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)34 StorageExecutor *SQLiteSingleRelationalStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite,
35 bool isMemDb)
36 {
37 auto mode = properties_.GetDistributedTableMode();
38 return new (std::nothrow) SQLiteSingleVerRelationalStorageExecutor(dbHandle, isWrite, mode);
39 }
40
Upgrade(sqlite3 * db)41 int SQLiteSingleRelationalStorageEngine::Upgrade(sqlite3 *db)
42 {
43 int errCode = CreateRelationalMetaTable(db);
44 if (errCode != E_OK) {
45 LOGE("Create relational store meta table failed. err=%d", errCode);
46 return errCode;
47 }
48 LOGD("[RelationalEngine][Upgrade] upgrade relational store.");
49 auto upgrader = std::make_unique<SqliteRelationalDatabaseUpgrader>(db);
50 return upgrader->Upgrade();
51 }
52
RegisterFunction(sqlite3 * db) const53 int SQLiteSingleRelationalStorageEngine::RegisterFunction(sqlite3 *db) const
54 {
55 int errCode = SQLiteUtils::RegisterCalcHash(db);
56 if (errCode != E_OK) {
57 LOGE("[engine] register calculate hash failed!");
58 return errCode;
59 }
60
61 errCode = SQLiteUtils::RegisterGetLastTime(db);
62 if (errCode != E_OK) {
63 LOGE("[engine] register get last time failed!");
64 return errCode;
65 }
66
67 errCode = SQLiteUtils::RegisterGetSysTime(db);
68 if (errCode != E_OK) {
69 LOGE("[engine] register get sys time failed!");
70 return errCode;
71 }
72
73 errCode = SQLiteUtils::RegisterGetRawSysTime(db);
74 if (errCode != E_OK) {
75 LOGE("[engine] register get raw sys time failed!");
76 return errCode;
77 }
78
79 errCode = SQLiteUtils::RegisterCloudDataChangeObserver(db);
80 if (errCode != E_OK) {
81 LOGE("[engine] register cloud observer failed!");
82 }
83
84 errCode = SQLiteUtils::RegisterCloudDataChangeServerObserver(db);
85 if (errCode != E_OK) {
86 LOGE("[engine] register cloud server observer failed!");
87 }
88
89 return errCode;
90 }
91
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)92 int SQLiteSingleRelationalStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
93 {
94 sqlite3 *db = nullptr;
95 int errCode = SQLiteUtils::OpenDatabase(option_, db, false);
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 do {
100 errCode = Upgrade(db); // create meta_data table.
101 if (errCode != E_OK) {
102 break;
103 }
104
105 errCode = RegisterFunction(db);
106 if (errCode != E_OK) {
107 break;
108 }
109
110 handle = NewSQLiteStorageExecutor(db, isWrite, false);
111 if (handle == nullptr) {
112 LOGE("[Relational] New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
113 errCode = -E_OUT_OF_MEMORY;
114 break;
115 }
116 return E_OK;
117 } while (false);
118
119 (void)sqlite3_close_v2(db);
120 db = nullptr;
121 return errCode;
122 }
123
ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isExternal)124 void SQLiteSingleRelationalStorageEngine::ReleaseExecutor(SQLiteSingleVerRelationalStorageExecutor *&handle,
125 bool isExternal)
126 {
127 if (handle == nullptr) {
128 return;
129 }
130 StorageExecutor *databaseHandle = handle;
131 Recycle(databaseHandle, isExternal);
132 handle = nullptr;
133 }
134
SetSchema(const RelationalSchemaObject & schema)135 void SQLiteSingleRelationalStorageEngine::SetSchema(const RelationalSchemaObject &schema)
136 {
137 std::lock_guard lock(schemaMutex_);
138 schema_ = schema;
139 }
140
GetSchema() const141 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetSchema() const
142 {
143 std::lock_guard lock(schemaMutex_);
144 return schema_;
145 }
146
147 namespace {
// Meta value written for a table whose sync type is DEVICE_COOPERATION.
const std::string DEVICE_TYPE = "device";
// Meta value written for a table with any other sync type.
const std::string CLOUD_TYPE = "cloud";
// Prefix of the meta key that stores a table's sync type; the table name is appended to it.
const std::string SYNC_TABLE_TYPE = "sync_table_type_";
151
SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)152 int SaveSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle, const RelationalSchemaObject &schema)
153 {
154 const Key schemaKey(DBConstant::RELATIONAL_SCHEMA_KEY.begin(), DBConstant::RELATIONAL_SCHEMA_KEY.end());
155 Value schemaVal;
156 DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
157 int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
158 if (errCode != E_OK) {
159 LOGE("Save schema to meta table failed. %d", errCode);
160 }
161 return errCode;
162 }
163
SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor * handle,const RelationalSchemaObject & schema)164 int SaveTrackerSchemaToMetaTable(SQLiteSingleVerRelationalStorageExecutor *handle,
165 const RelationalSchemaObject &schema)
166 {
167 const Key schemaKey(DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.begin(),
168 DBConstant::RELATIONAL_TRACKER_SCHEMA_KEY.end());
169 Value schemaVal;
170 DBCommon::StringToVector(schema.ToSchemaString(), schemaVal);
171 int errCode = handle->PutKvData(schemaKey, schemaVal); // save schema to meta_data
172 if (errCode != E_OK) {
173 LOGE("Save schema to meta table failed. %d", errCode);
174 }
175 return errCode;
176 }
177
SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor * handle,const std::string & tableName,TableSyncType syncType)178 int SaveSyncTableTypeAndDropFlagToMeta(SQLiteSingleVerRelationalStorageExecutor *handle, const std::string &tableName,
179 TableSyncType syncType)
180 {
181 Key key;
182 DBCommon::StringToVector(SYNC_TABLE_TYPE + tableName, key);
183 Value value;
184 DBCommon::StringToVector(syncType == DEVICE_COOPERATION ? DEVICE_TYPE : CLOUD_TYPE, value);
185 int errCode = handle->PutKvData(key, value);
186 if (errCode != E_OK) {
187 LOGE("Save sync table type to meta table failed. %d", errCode);
188 return errCode;
189 }
190 DBCommon::StringToVector(DBConstant::TABLE_IS_DROPPED + tableName, key);
191 errCode = handle->DeleteMetaData({ key });
192 if (errCode != E_OK) {
193 LOGE("Save table drop flag to meta table failed. %d", errCode);
194 }
195 return errCode;
196 }
197 }
198
CreateDistributedTable(const std::string & tableName,const std::string & identity,bool & schemaChanged,TableSyncType syncType,bool trackerSchemaChanged)199 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName,
200 const std::string &identity, bool &schemaChanged, TableSyncType syncType, bool trackerSchemaChanged)
201 {
202 std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
203 RelationalSchemaObject schema = GetSchema();
204 bool isUpgraded = false;
205 if (DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
206 LOGI("distributed table bas been created.");
207 if (schema.GetTable(tableName).GetTableSyncType() != syncType) {
208 LOGE("table sync type mismatch.");
209 return -E_TYPE_MISMATCH;
210 }
211 isUpgraded = true;
212 int errCode = UpgradeDistributedTable(tableName, schemaChanged, syncType);
213 if (errCode != E_OK) {
214 LOGE("Upgrade distributed table failed. %d", errCode);
215 return errCode;
216 }
217 // Triggers may need to be rebuilt, no return directly
218 } else if (schema.GetTables().size() >= DBConstant::MAX_DISTRIBUTED_TABLE_COUNT) {
219 LOGE("The number of distributed tables is exceeds limit.");
220 return -E_MAX_LIMITS;
221 } else {
222 schemaChanged = true;
223 }
224
225 int errCode = CreateDistributedTable(tableName, isUpgraded, identity, schema, syncType);
226 if (errCode != E_OK) {
227 LOGE("CreateDistributedTable failed. %d", errCode);
228 return errCode;
229 }
230 if (isUpgraded && (schemaChanged || trackerSchemaChanged)) {
231 // Used for upgrading the stock data of the trackerTable
232 errCode = GenLogInfoForUpgrade(tableName, schema, schemaChanged);
233 }
234 return errCode;
235 }
236
CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::string & tableName,const std::string & sharedTableName,TableSyncType tableSyncType,RelationalSchemaObject & schema)237 int SQLiteSingleRelationalStorageEngine::CreateDistributedSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
238 const std::string &tableName, const std::string &sharedTableName, TableSyncType tableSyncType,
239 RelationalSchemaObject &schema)
240 {
241 TableInfo table;
242 table.SetTableName(sharedTableName);
243 table.SetOriginTableName(tableName);
244 table.SetSharedTableMark(true);
245 table.SetTableSyncType(tableSyncType);
246 table.SetTrackerTable(trackerSchema_.GetTrackerTable(sharedTableName));
247 if (!table.GetTrackerTable().IsEmpty() && tableSyncType == TableSyncType::DEVICE_COOPERATION) { // LCOV_EXCL_BR_LINE
248 LOGE("current is trackerTable, not support creating device distributed table.");
249 return -E_NOT_SUPPORT;
250 }
251 bool isUpgraded = schema.GetTable(sharedTableName).GetTableName() == sharedTableName;
252 int errCode = CreateDistributedTable(handle, isUpgraded, "", table, schema);
253 if (errCode != E_OK) {
254 LOGE("create distributed table failed. %d", errCode);
255 return errCode;
256 }
257 std::lock_guard lock(schemaMutex_);
258 schema_ = schema;
259 return errCode;
260 }
261
CreateDistributedTable(const std::string & tableName,bool isUpgraded,const std::string & identity,RelationalSchemaObject & schema,TableSyncType tableSyncType)262 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::string &tableName, bool isUpgraded,
263 const std::string &identity, RelationalSchemaObject &schema, TableSyncType tableSyncType)
264 {
265 LOGD("Create distributed table.");
266 int errCode = E_OK;
267 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
268 errCode));
269 if (handle == nullptr) {
270 return errCode;
271 }
272 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
273
274 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
275 if (errCode != E_OK) {
276 return errCode;
277 }
278
279 TableInfo table;
280 table.SetTableName(tableName);
281 table.SetTableSyncType(tableSyncType);
282 table.SetTrackerTable(GetTrackerSchema().GetTrackerTable(tableName));
283 table.SetDistributedTable(schema.GetDistributedTable(tableName));
284 if (isUpgraded) {
285 table.SetSourceTableReference(schema.GetTable(tableName).GetTableReference());
286 }
287 errCode = CreateDistributedTable(handle, isUpgraded, identity, table, schema);
288 if (errCode != E_OK) {
289 LOGE("create distributed table failed. %d", errCode);
290 (void)handle->Rollback();
291 return errCode;
292 }
293 errCode = handle->Commit();
294 if (errCode == E_OK) {
295 SetSchema(schema);
296 }
297 return errCode;
298 }
299
CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,bool isUpgraded,const std::string & identity,TableInfo & table,RelationalSchemaObject & schema)300 int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
301 bool isUpgraded, const std::string &identity, TableInfo &table, RelationalSchemaObject &schema)
302 {
303 auto mode = properties_.GetDistributedTableMode();
304 TableSyncType tableSyncType = table.GetTableSyncType();
305 std::string tableName = table.GetTableName();
306 int errCode = handle->InitCursorToMeta(tableName);
307 if (errCode != E_OK) {
308 LOGE("init cursor to meta failed. %d", errCode);
309 return errCode;
310 }
311 errCode = handle->CreateDistributedTable(mode, isUpgraded, identity, table);
312 if (errCode != E_OK) {
313 LOGE("create distributed table failed. %d", errCode);
314 return errCode;
315 }
316
317 schema.SetTableMode(mode);
318 // update table if tableName changed
319 schema.RemoveRelationalTable(tableName);
320 schema.AddRelationalTable(table);
321 errCode = SaveSchemaToMetaTable(handle, schema);
322 if (errCode != E_OK) {
323 LOGE("Save schema to meta table for create distributed table failed. %d", errCode);
324 return errCode;
325 }
326
327 errCode = SaveSyncTableTypeAndDropFlagToMeta(handle, tableName, tableSyncType);
328 if (errCode != E_OK) {
329 LOGE("Save sync table type or drop flag to meta table failed. %d", errCode);
330 }
331 return errCode;
332 }
333
UpgradeDistributedTable(const std::string & tableName,bool & schemaChanged,TableSyncType syncType)334 int SQLiteSingleRelationalStorageEngine::UpgradeDistributedTable(const std::string &tableName, bool &schemaChanged,
335 TableSyncType syncType)
336 {
337 LOGD("Upgrade distributed table.");
338 RelationalSchemaObject schema = GetSchema();
339 int errCode = E_OK;
340 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
341 errCode));
342 if (handle == nullptr) {
343 return errCode;
344 }
345
346 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
347 if (errCode != E_OK) {
348 ReleaseExecutor(handle);
349 return errCode;
350 }
351
352 auto mode = properties_.GetDistributedTableMode();
353 errCode = handle->UpgradeDistributedTable(tableName, mode, schemaChanged, schema, syncType);
354 if (errCode != E_OK) {
355 LOGE("Upgrade distributed table failed. %d", errCode);
356 (void)handle->Rollback();
357 ReleaseExecutor(handle);
358 return errCode;
359 }
360
361 errCode = SaveSchemaToMetaTable(handle, schema);
362 if (errCode != E_OK) {
363 LOGE("Save schema to meta table for upgrade distributed table failed. %d", errCode);
364 (void)handle->Rollback();
365 ReleaseExecutor(handle);
366 return errCode;
367 }
368
369 errCode = handle->Commit();
370 if (errCode == E_OK) {
371 SetSchema(schema);
372 }
373 ReleaseExecutor(handle);
374 return errCode;
375 }
376
CleanDistributedDeviceTable(std::vector<std::string> & missingTables)377 int SQLiteSingleRelationalStorageEngine::CleanDistributedDeviceTable(std::vector<std::string> &missingTables)
378 {
379 int errCode = E_OK;
380 auto handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
381 errCode));
382 if (handle == nullptr) {
383 return errCode;
384 }
385
386 // go fast to check missing tables without transaction
387 errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
388 if (errCode == E_OK) {
389 if (missingTables.empty()) {
390 LOGI("Check missing distributed table is empty.");
391 ReleaseExecutor(handle);
392 return errCode;
393 }
394 } else {
395 LOGE("Get missing distributed table failed. %d", errCode);
396 ReleaseExecutor(handle);
397 return errCode;
398 }
399 missingTables.clear();
400
401 std::lock_guard lock(schemaMutex_);
402 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
403 if (errCode != E_OK) {
404 ReleaseExecutor(handle);
405 return errCode;
406 }
407
408 errCode = handle->CheckAndCleanDistributedTable(schema_.GetTableNames(), missingTables);
409 if (errCode == E_OK) {
410 // Remove non-existent tables from the schema
411 for (const auto &tableName : missingTables) {
412 schema_.RemoveRelationalTable(tableName);
413 }
414 errCode = SaveSchemaToMetaTable(handle, schema_); // save schema to meta_data
415 if (errCode != E_OK) {
416 LOGE("Save schema to metaTable failed. %d", errCode);
417 (void)handle->Rollback();
418 } else {
419 errCode = handle->Commit();
420 }
421 } else {
422 LOGE("Check distributed table failed. %d", errCode);
423 (void)handle->Rollback();
424 }
425 ReleaseExecutor(handle);
426 return errCode;
427 }
428
GetProperties() const429 const RelationalDBProperties &SQLiteSingleRelationalStorageEngine::GetProperties() const
430 {
431 return properties_;
432 }
433
SetProperties(const RelationalDBProperties & properties)434 void SQLiteSingleRelationalStorageEngine::SetProperties(const RelationalDBProperties &properties)
435 {
436 properties_ = properties;
437 }
438
CreateRelationalMetaTable(sqlite3 * db)439 int SQLiteSingleRelationalStorageEngine::CreateRelationalMetaTable(sqlite3 *db)
440 {
441 std::string sql =
442 "CREATE TABLE IF NOT EXISTS " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata(" \
443 "key BLOB PRIMARY KEY NOT NULL," \
444 "value BLOB);";
445 int errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
446 if (errCode != E_OK) {
447 LOGE("[SQLite] execute create table sql failed, err=%d", errCode);
448 }
449 return errCode;
450 }
451
SetTrackerTable(const TrackerSchema & schema,const TableInfo & tableInfo,bool isFirstCreate)452 int SQLiteSingleRelationalStorageEngine::SetTrackerTable(const TrackerSchema &schema, const TableInfo &tableInfo,
453 bool isFirstCreate)
454 {
455 int errCode = E_OK;
456 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
457 errCode));
458 if (handle == nullptr) {
459 return errCode;
460 }
461
462 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
463 if (errCode != E_OK) {
464 ReleaseExecutor(handle);
465 return errCode;
466 }
467 RelationalSchemaObject tracker = trackerSchema_;
468 tracker.InsertTrackerSchema(schema);
469 int ret = handle->CreateTrackerTable(tracker.GetTrackerTable(schema.tableName), tableInfo, isFirstCreate);
470 if (ret != E_OK && ret != -E_WITH_INVENTORY_DATA) {
471 (void)handle->Rollback();
472 ReleaseExecutor(handle);
473 return ret;
474 }
475
476 if (schema.trackerColNames.empty() && !schema.isTrackAction) {
477 tracker.RemoveTrackerSchema(schema);
478 }
479 errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
480 if (errCode != E_OK) {
481 (void)handle->Rollback();
482 ReleaseExecutor(handle);
483 return errCode;
484 }
485
486 errCode = handle->Commit();
487 if (errCode != E_OK) {
488 ReleaseExecutor(handle);
489 return errCode;
490 }
491
492 trackerSchema_ = tracker;
493 ReleaseExecutor(handle);
494 return ret;
495 }
496
CacheTrackerSchema(const TrackerSchema & schema)497 void SQLiteSingleRelationalStorageEngine::CacheTrackerSchema(const TrackerSchema &schema)
498 {
499 trackerSchema_.InsertTrackerSchema(schema);
500 if (!schema.isTrackAction && schema.trackerColNames.empty()) {
501 // if isTrackAction be false and trackerColNames is empty, will remove the tracker schema.
502 trackerSchema_.RemoveTrackerSchema(schema);
503 }
504 }
505
GetOrInitTrackerSchemaFromMeta()506 int SQLiteSingleRelationalStorageEngine::GetOrInitTrackerSchemaFromMeta()
507 {
508 RelationalSchemaObject trackerSchema;
509 int errCode = E_OK;
510 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
511 errCode));
512 if (handle == nullptr) {
513 return errCode;
514 }
515 errCode = handle->GetOrInitTrackerSchemaFromMeta(trackerSchema);
516 if (errCode != E_OK) {
517 ReleaseExecutor(handle);
518 return errCode;
519 }
520 const TableInfoMap tableInfoMap = trackerSchema.GetTrackerTables();
521 for (const auto &iter: tableInfoMap) {
522 TableInfo tableInfo;
523 errCode = handle->AnalysisTrackerTable(iter.second.GetTrackerTable(), tableInfo);
524 if (errCode == -E_NOT_FOUND) { // LCOV_EXCL_BR_LINE
525 const std::string trackerTableName = iter.second.GetTrackerTable().GetTableName();
526 errCode = CleanTrackerDeviceTable({ trackerTableName }, trackerSchema, handle);
527 if (errCode != E_OK) {
528 LOGE("cancel tracker table failed during db opening. %d", errCode);
529 ReleaseExecutor(handle);
530 return errCode;
531 }
532 } else if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
533 LOGE("the tracker schema does not match the tracker schema. %d", errCode);
534 ReleaseExecutor(handle);
535 return errCode;
536 }
537 }
538 trackerSchema_ = trackerSchema;
539 ReleaseExecutor(handle);
540 return E_OK;
541 }
542
SaveTrackerSchema(const std::string & tableName,bool isFirstCreate)543 int SQLiteSingleRelationalStorageEngine::SaveTrackerSchema(const std::string &tableName, bool isFirstCreate)
544 {
545 int errCode = E_OK;
546 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
547 errCode));
548 if (handle == nullptr) {
549 return errCode;
550 }
551 RelationalSchemaObject tracker = trackerSchema_;
552 errCode = SaveTrackerSchemaToMetaTable(handle, tracker);
553 if (errCode != E_OK || !isFirstCreate) {
554 ReleaseExecutor(handle);
555 return errCode;
556 }
557 errCode = handle->CheckInventoryData(DBCommon::GetLogTableName(tableName));
558 ReleaseExecutor(handle);
559 return errCode;
560 }
561
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)562 int SQLiteSingleRelationalStorageEngine::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
563 {
564 int errCode = E_OK;
565 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(!condition.readOnly,
566 OperatePerm::NORMAL_PERM, errCode, true));
567 if (handle == nullptr) {
568 return errCode;
569 }
570 errCode = handle->ExecuteSql(condition, records);
571 ReleaseExecutor(handle, true);
572 return errCode;
573 }
574
CheckReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,const RelationalSchemaObject & schema)575 static int CheckReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
576 const RelationalSchemaObject &schema)
577 {
578 for (const auto &reference : tableReferenceProperty) {
579 TableInfo sourceTableInfo = schema.GetTable(reference.sourceTableName);
580 TableInfo targetTableInfo = schema.GetTable(reference.targetTableName);
581 if (strcasecmp(sourceTableInfo.GetTableName().c_str(), reference.sourceTableName.c_str()) != 0 ||
582 strcasecmp(targetTableInfo.GetTableName().c_str(), reference.targetTableName.c_str()) != 0) {
583 LOGE("can't set reference for table which doesn't create distributed table.");
584 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
585 }
586 if (sourceTableInfo.GetTableSyncType() != CLOUD_COOPERATION ||
587 targetTableInfo.GetTableSyncType() != CLOUD_COOPERATION) {
588 LOGE("can't set reference for table which doesn't create distributed table with cloud mode.");
589 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
590 }
591 if (sourceTableInfo.GetSharedTableMark() || targetTableInfo.GetSharedTableMark()) {
592 LOGE("can't set reference for shared table.");
593 return -E_NOT_SUPPORT;
594 }
595
596 FieldInfoMap sourceFieldMap = sourceTableInfo.GetFields();
597 FieldInfoMap targetFieldMap = targetTableInfo.GetFields();
598 for (const auto &[sourceFieldName, targetFieldName] : reference.columns) {
599 if (sourceFieldMap.find(sourceFieldName) == sourceFieldMap.end() ||
600 targetFieldMap.find(targetFieldName) == targetFieldMap.end()) {
601 LOGE("reference field doesn't exists in table.");
602 return -E_INVALID_ARGS;
603 }
604 }
605 }
606 return E_OK;
607 }
608
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty,SQLiteSingleVerRelationalStorageExecutor * handle,std::set<std::string> & clearWaterMarkTables,RelationalSchemaObject & schema)609 int SQLiteSingleRelationalStorageEngine::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty,
610 SQLiteSingleVerRelationalStorageExecutor *handle, std::set<std::string> &clearWaterMarkTables,
611 RelationalSchemaObject &schema)
612 {
613 std::lock_guard lock(schemaMutex_);
614 schema = schema_;
615 int errCode = CheckReference(tableReferenceProperty, schema);
616 if (errCode != E_OK) {
617 LOGE("check reference failed, errCode = %d.", errCode);
618 return errCode;
619 }
620
621 errCode = handle->GetClearWaterMarkTables(tableReferenceProperty, schema, clearWaterMarkTables);
622 if (errCode != E_OK) {
623 return errCode;
624 }
625 schema.SetReferenceProperty(tableReferenceProperty);
626 errCode = SaveSchemaToMetaTable(handle, schema);
627 if (errCode != E_OK) {
628 LOGE("Save schema to meta table for reference failed. %d", errCode);
629 return errCode;
630 }
631 auto mode = properties_.GetDistributedTableMode();
632 for (auto &table : schema.GetTables()) {
633 if (table.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
634 continue;
635 }
636 TableInfo tableInfo = table.second;
637 errCode = handle->RenewTableTrigger(mode, tableInfo, TableSyncType::CLOUD_COOPERATION);
638 if (errCode != E_OK) {
639 LOGE("renew table trigger for reference failed. %d", errCode);
640 return errCode;
641 }
642 }
643 return clearWaterMarkTables.empty() ? E_OK : -E_TABLE_REFERENCE_CHANGED;
644 }
645
GetTrackerSchema() const646 RelationalSchemaObject SQLiteSingleRelationalStorageEngine::GetTrackerSchema() const
647 {
648 return trackerSchema_;
649 }
650
CleanTrackerData(const std::string & tableName,int64_t cursor)651 int SQLiteSingleRelationalStorageEngine::CleanTrackerData(const std::string &tableName, int64_t cursor)
652 {
653 int errCode = E_OK;
654 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
655 errCode));
656 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
657 return errCode;
658 }
659 TrackerTable trackerTable = GetTrackerSchema().GetTrackerTable(tableName);
660 bool isOnlyTrackTable = false;
661 RelationalSchemaObject schema = GetSchema();
662 if (!trackerTable.IsTableNameEmpty() &&
663 !DBCommon::CaseInsensitiveCompare(schema.GetTable(tableName).GetTableName(), tableName)) {
664 isOnlyTrackTable = true;
665 }
666 errCode = handle->CleanTrackerData(tableName, cursor, isOnlyTrackTable);
667 ReleaseExecutor(handle);
668 return errCode;
669 }
670
UpgradeSharedTable(const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)671 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTable(const DataBaseSchema &cloudSchema,
672 const std::vector<std::string> &deleteTableNames, const std::map<std::string, std::vector<Field>> &updateTableNames,
673 const std::map<std::string, std::string> &alterTableNames)
674 {
675 int errCode = E_OK;
676 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
677 errCode));
678 if (handle == nullptr) {
679 return errCode;
680 }
681 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
682 if (errCode != E_OK) {
683 ReleaseExecutor(handle);
684 return errCode;
685 }
686 RelationalSchemaObject schema = GetSchema();
687 errCode = UpgradeSharedTableInner(handle, cloudSchema, deleteTableNames, updateTableNames, alterTableNames);
688 if (errCode != E_OK) {
689 handle->Rollback();
690 ReleaseExecutor(handle);
691 return errCode;
692 }
693 errCode = handle->Commit();
694 if (errCode != E_OK) {
695 std::lock_guard lock(schemaMutex_);
696 schema_ = schema; // revert schema to the initial state
697 }
698 ReleaseExecutor(handle);
699 return errCode;
700 }
701
UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::vector<std::string> & deleteTableNames,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames)702 int SQLiteSingleRelationalStorageEngine::UpgradeSharedTableInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
703 const DataBaseSchema &cloudSchema, const std::vector<std::string> &deleteTableNames,
704 const std::map<std::string, std::vector<Field>> &updateTableNames,
705 const std::map<std::string, std::string> &alterTableNames)
706 {
707 RelationalSchemaObject schema = GetSchema();
708 int errCode = DoDeleteSharedTable(handle, deleteTableNames, schema);
709 if (errCode != E_OK) {
710 LOGE("[RelationalStorageEngine] delete shared table or distributed table failed. %d", errCode);
711 return errCode;
712 }
713 errCode = DoUpdateSharedTable(handle, updateTableNames, cloudSchema, schema);
714 if (errCode != E_OK) {
715 LOGE("[RelationalStorageEngine] update shared table or distributed table failed. %d", errCode);
716 return errCode;
717 }
718 errCode = CheckIfExistUserTable(handle, cloudSchema, alterTableNames, schema);
719 if (errCode != E_OK) {
720 LOGE("[RelationalStorageEngine] check local user table failed. %d", errCode);
721 return errCode;
722 }
723 errCode = DoAlterSharedTableName(handle, alterTableNames, schema);
724 if (errCode != E_OK) {
725 LOGE("[RelationalStorageEngine] alter shared table or distributed table failed. %d", errCode);
726 return errCode;
727 }
728 errCode = DoCreateSharedTable(handle, cloudSchema, updateTableNames, alterTableNames, schema);
729 if (errCode != E_OK) {
730 LOGE("[RelationalStorageEngine] create shared table or distributed table failed. %d", errCode);
731 return errCode;
732 }
733 std::lock_guard lock(schemaMutex_);
734 schema_ = schema;
735 return E_OK;
736 }
737
DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::vector<std::string> & deleteTableNames,RelationalSchemaObject & schema)738 int SQLiteSingleRelationalStorageEngine::DoDeleteSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
739 const std::vector<std::string> &deleteTableNames, RelationalSchemaObject &schema)
740 {
741 if (deleteTableNames.empty()) {
742 return E_OK;
743 }
744 int errCode = handle->DeleteTable(deleteTableNames);
745 if (errCode != E_OK) {
746 LOGE("[RelationalStorageEngine] delete shared table failed. %d", errCode);
747 return errCode;
748 }
749 std::vector<Key> keys;
750 for (const auto &tableName : deleteTableNames) {
751 errCode = handle->CleanResourceForDroppedTable(tableName);
752 if (errCode != E_OK) {
753 LOGE("[RelationalStorageEngine] delete shared distributed table failed. %d", errCode);
754 return errCode;
755 }
756 Key sharedTableKey = DBCommon::GetPrefixTableName(tableName);
757 if (sharedTableKey.empty() || sharedTableKey.size() > DBConstant::MAX_KEY_SIZE) {
758 LOGE("[RelationalStorageEngine] shared table key is invalid.");
759 return -E_INVALID_ARGS;
760 }
761 keys.push_back(sharedTableKey);
762 schema.RemoveRelationalTable(tableName);
763 }
764 errCode = handle->DeleteMetaData(keys);
765 if (errCode != E_OK) {
766 LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
767 }
768 return errCode;
769 }
770
DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::vector<Field>> & updateTableNames,const DataBaseSchema & cloudSchema,RelationalSchemaObject & localSchema)771 int SQLiteSingleRelationalStorageEngine::DoUpdateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
772 const std::map<std::string, std::vector<Field>> &updateTableNames, const DataBaseSchema &cloudSchema,
773 RelationalSchemaObject &localSchema)
774 {
775 if (updateTableNames.empty()) {
776 return E_OK;
777 }
778 int errCode = handle->UpdateSharedTable(updateTableNames);
779 if (errCode != E_OK) {
780 LOGE("[RelationalStorageEngine] update shared table failed. %d", errCode);
781 return errCode;
782 }
783 for (const auto &tableSchema : cloudSchema.tables) {
784 if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
785 errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
786 TableSyncType::CLOUD_COOPERATION, localSchema);
787 if (errCode != E_OK) {
788 LOGE("[RelationalStorageEngine] update shared distributed table failed. %d", errCode);
789 return errCode;
790 }
791 }
792 }
793 return E_OK;
794 }
795
DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)796 int SQLiteSingleRelationalStorageEngine::DoAlterSharedTableName(SQLiteSingleVerRelationalStorageExecutor *&handle,
797 const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
798 {
799 if (alterTableNames.empty()) {
800 return E_OK;
801 }
802 int errCode = handle->AlterTableName(alterTableNames);
803 if (errCode != E_OK) {
804 LOGE("[RelationalStorageEngine] alter shared table failed. %d", errCode);
805 return errCode;
806 }
807 std::map<std::string, std::string> distributedSharedTableNames;
808 for (const auto &tableName : alterTableNames) {
809 errCode = handle->DeleteTableTrigger(tableName.first);
810 if (errCode != E_OK) {
811 LOGE("[RelationalStorageEngine] delete shared table trigger failed. %d", errCode);
812 return errCode;
813 }
814 std::string oldDistributedName = DBCommon::GetLogTableName(tableName.first);
815 std::string newDistributedName = DBCommon::GetLogTableName(tableName.second);
816 distributedSharedTableNames[oldDistributedName] = newDistributedName;
817 }
818 errCode = handle->AlterTableName(distributedSharedTableNames);
819 if (errCode != E_OK) {
820 LOGE("[RelationalStorageEngine] alter distributed shared table failed. %d", errCode);
821 return errCode;
822 }
823 for (const auto &[oldTableName, newTableName] : alterTableNames) {
824 TableInfo tableInfo = schema.GetTable(oldTableName);
825 tableInfo.SetTableName(newTableName);
826 schema.AddRelationalTable(tableInfo);
827 schema.RemoveRelationalTable(oldTableName);
828 }
829 errCode = UpdateKvData(handle, alterTableNames);
830 if (errCode != E_OK) {
831 LOGE("[RelationalStorageEngine] update kv data failed. %d", errCode);
832 }
833 return errCode;
834 }
835
DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::vector<Field>> & updateTableNames,const std::map<std::string,std::string> & alterTableNames,RelationalSchemaObject & schema)836 int SQLiteSingleRelationalStorageEngine::DoCreateSharedTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
837 const DataBaseSchema &cloudSchema, const std::map<std::string, std::vector<Field>> &updateTableNames,
838 const std::map<std::string, std::string> &alterTableNames, RelationalSchemaObject &schema)
839 {
840 for (auto const &tableSchema : cloudSchema.tables) {
841 if (tableSchema.sharedTableName.empty()) {
842 continue;
843 }
844 if (updateTableNames.find(tableSchema.sharedTableName) != updateTableNames.end()) {
845 continue;
846 }
847 bool isUpdated = false;
848 for (const auto &alterTableName : alterTableNames) {
849 if (alterTableName.second == tableSchema.sharedTableName) {
850 isUpdated = true;
851 break;
852 }
853 }
854 if (isUpdated) {
855 continue;
856 }
857 int errCode = handle->CreateSharedTable(tableSchema);
858 if (errCode != E_OK) {
859 return errCode;
860 }
861 errCode = CreateDistributedSharedTable(handle, tableSchema.name, tableSchema.sharedTableName,
862 TableSyncType::CLOUD_COOPERATION, schema);
863 if (errCode != E_OK) {
864 return errCode;
865 }
866 }
867 return E_OK;
868 }
869
UpdateKvData(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::map<std::string,std::string> & alterTableNames)870 int SQLiteSingleRelationalStorageEngine::UpdateKvData(SQLiteSingleVerRelationalStorageExecutor *&handle,
871 const std::map<std::string, std::string> &alterTableNames)
872 {
873 std::vector<Key> keys;
874 for (const auto &tableName : alterTableNames) {
875 Key oldKey = DBCommon::GetPrefixTableName(tableName.first);
876 Value value;
877 int ret = handle->GetKvData(oldKey, value);
878 if (ret == -E_NOT_FOUND) {
879 continue;
880 }
881 if (ret != E_OK) {
882 LOGE("[RelationalStorageEngine] get meta data failed. %d", ret);
883 return ret;
884 }
885 keys.push_back(oldKey);
886 Key newKey = DBCommon::GetPrefixTableName(tableName.second);
887 ret = handle->PutKvData(newKey, value);
888 if (ret != E_OK) {
889 LOGE("[RelationalStorageEngine] put meta data failed. %d", ret);
890 return ret;
891 }
892 }
893 int errCode = handle->DeleteMetaData(keys);
894 if (errCode != E_OK) {
895 LOGE("[RelationalStorageEngine] delete meta data failed. %d", errCode);
896 }
897 return errCode;
898 }
899
CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor * & handle,const DataBaseSchema & cloudSchema,const std::map<std::string,std::string> & alterTableNames,const RelationalSchemaObject & schema)900 int SQLiteSingleRelationalStorageEngine::CheckIfExistUserTable(SQLiteSingleVerRelationalStorageExecutor *&handle,
901 const DataBaseSchema &cloudSchema, const std::map<std::string, std::string> &alterTableNames,
902 const RelationalSchemaObject &schema)
903 {
904 for (const auto &tableSchema : cloudSchema.tables) {
905 if (alterTableNames.find(tableSchema.sharedTableName) != alterTableNames.end()) {
906 continue;
907 }
908 TableInfo tableInfo = schema.GetTable(tableSchema.sharedTableName);
909 if (tableInfo.GetSharedTableMark()) {
910 continue;
911 }
912 int errCode = handle->CheckIfExistUserTable(tableSchema.sharedTableName);
913 if (errCode != E_OK) {
914 LOGE("[RelationalStorageEngine] local exists table. %d", errCode);
915 return errCode;
916 }
917 }
918 return E_OK;
919 }
920
CalTableRef(const std::vector<std::string> & tableNames,const std::map<std::string,std::string> & sharedTableOriginNames)921 std::pair<std::vector<std::string>, int> SQLiteSingleRelationalStorageEngine::CalTableRef(
922 const std::vector<std::string> &tableNames, const std::map<std::string, std::string> &sharedTableOriginNames)
923 {
924 std::pair<std::vector<std::string>, int> res = { tableNames, E_OK };
925 std::map<std::string, std::map<std::string, bool>> reachableReference;
926 std::map<std::string, int> tableWeight;
927 {
928 std::lock_guard lock(schemaMutex_);
929 reachableReference = schema_.GetReachableRef();
930 tableWeight = schema_.GetTableWeight();
931 }
932 if (reachableReference.empty()) {
933 return res;
934 }
935 auto reachableWithShared = GetReachableWithShared(reachableReference, sharedTableOriginNames);
936 // check dependency conflict
937 for (size_t i = 0; i < tableNames.size(); ++i) {
938 for (size_t j = i + 1; j < tableNames.size(); ++j) {
939 // such as table A B, if dependency is A->B
940 // sync should not be A->B, it should be B->A
941 // so if A can reach B, it's wrong
942 if (reachableWithShared[tableNames[i]][tableNames[j]]) {
943 LOGE("[RDBStorageEngine] table %zu reach table %zu", i, j);
944 res.second = -E_INVALID_ARGS;
945 return res;
946 }
947 }
948 }
949 tableWeight = GetTableWeightWithShared(tableWeight, sharedTableOriginNames);
950 auto actualTable = DBCommon::GenerateNodesByNodeWeight(tableNames, reachableWithShared, tableWeight);
951 res.first.assign(actualTable.begin(), actualTable.end());
952 return res;
953 }
954
CleanTrackerDeviceTable(const std::vector<std::string> & tableNames,RelationalSchemaObject & trackerSchemaObj,SQLiteSingleVerRelationalStorageExecutor * & handle)955 int SQLiteSingleRelationalStorageEngine::CleanTrackerDeviceTable(const std::vector<std::string> &tableNames,
956 RelationalSchemaObject &trackerSchemaObj, SQLiteSingleVerRelationalStorageExecutor *&handle)
957 {
958 std::vector<std::string> missingTrackerTables;
959 int errCode = handle->CheckAndCleanDistributedTable(tableNames, missingTrackerTables);
960 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
961 LOGE("Check tracker table failed. %d", errCode);
962 return errCode;
963 }
964 if (missingTrackerTables.empty()) { // LCOV_EXCL_BR_LINE
965 return E_OK;
966 }
967 for (const auto &tableName : missingTrackerTables) {
968 TrackerSchema schema;
969 schema.tableName = tableName;
970 trackerSchemaObj.RemoveTrackerSchema(schema);
971 }
972 errCode = SaveTrackerSchemaToMetaTable(handle, trackerSchemaObj); // save schema to meta_data
973 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
974 LOGE("Save tracker schema to metaTable failed. %d", errCode);
975 }
976 return errCode;
977 }
978
GenLogInfoForUpgrade(const std::string & tableName,RelationalSchemaObject & schema,bool schemaChanged)979 int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string &tableName,
980 RelationalSchemaObject &schema, bool schemaChanged)
981 {
982 int errCode = E_OK;
983 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
984 OperatePerm::NORMAL_PERM, errCode));
985 if (handle == nullptr) {
986 return errCode;
987 }
988 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
989
990 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
991 if (errCode != E_OK) {
992 return errCode;
993 }
994
995 TableInfo table = GetSchema().GetTable(tableName);
996 errCode = handle->UpgradedLogForExistedData(table, schemaChanged);
997 if (errCode != E_OK) {
998 LOGE("Upgrade tracker table log failed. %d", errCode);
999 (void)handle->Rollback();
1000 return errCode;
1001 }
1002 return handle->Commit();
1003 }
1004
GetReachableWithShared(const std::map<std::string,std::map<std::string,bool>> & reachableReference,const std::map<std::string,std::string> & tableToShared)1005 std::map<std::string, std::map<std::string, bool>> SQLiteSingleRelationalStorageEngine::GetReachableWithShared(
1006 const std::map<std::string, std::map<std::string, bool>> &reachableReference,
1007 const std::map<std::string, std::string> &tableToShared)
1008 {
1009 // we translate all origin table to shared table
1010 std::map<std::string, std::map<std::string, bool>> reachableWithShared;
1011 for (const auto &[source, reach] : reachableReference) {
1012 bool sourceHasNoShared = tableToShared.find(source) == tableToShared.end();
1013 for (const auto &[target, isReach] : reach) {
1014 // merge two reachable reference
1015 reachableWithShared[source][target] = isReach;
1016 if (sourceHasNoShared || tableToShared.find(target) == tableToShared.end()) {
1017 continue;
1018 }
1019 // record shared reachable reference
1020 reachableWithShared[tableToShared.at(source)][tableToShared.at(target)] = isReach;
1021 }
1022 }
1023 return reachableWithShared;
1024 }
1025
GetTableWeightWithShared(const std::map<std::string,int> & tableWeight,const std::map<std::string,std::string> & tableToShared)1026 std::map<std::string, int> SQLiteSingleRelationalStorageEngine::GetTableWeightWithShared(
1027 const std::map<std::string, int> &tableWeight, const std::map<std::string, std::string> &tableToShared)
1028 {
1029 std::map<std::string, int> res;
1030 for (const auto &[table, weight] : tableWeight) {
1031 res[table] = weight;
1032 if (tableToShared.find(table) == tableToShared.end()) {
1033 continue;
1034 }
1035 res[tableToShared.at(table)] = weight;
1036 }
1037 return res;
1038 }
1039
UpdateExtendField(const DistributedDB::TrackerSchema & schema)1040 int SQLiteSingleRelationalStorageEngine::UpdateExtendField(const DistributedDB::TrackerSchema &schema)
1041 {
1042 if (schema.extendColNames.empty()) {
1043 return E_OK;
1044 }
1045 int errCode = E_OK;
1046 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true,
1047 OperatePerm::NORMAL_PERM, errCode));
1048 if (handle == nullptr) {
1049 return errCode;
1050 }
1051 ResFinalizer finalizer([&handle, this] { this->ReleaseExecutor(handle); });
1052
1053 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1054 if (errCode != E_OK) {
1055 return errCode;
1056 }
1057
1058 errCode = handle->UpdateExtendField(schema.tableName, schema.extendColNames);
1059 if (errCode != E_OK) {
1060 LOGE("[%s [%zu]] Update extend field failed. %d",
1061 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1062 (void)handle->Rollback();
1063 return errCode;
1064 }
1065
1066 RelationalSchemaObject tracker = trackerSchema_;
1067 TrackerTable oldTrackerTable = tracker.GetTrackerTable(schema.tableName);
1068 const std::set<std::string>& oldExtendColNames = oldTrackerTable.GetExtendNames();
1069 const std::string lowVersionExtendColName = oldTrackerTable.GetExtendName();
1070 if (!oldExtendColNames.empty()) {
1071 errCode = handle->UpdateDeleteDataExtendField(schema.tableName, lowVersionExtendColName,
1072 oldExtendColNames, schema.extendColNames);
1073 if (errCode != E_OK) {
1074 LOGE("[%s [%zu]] Update extend field for delete data failed. %d",
1075 DBCommon::StringMiddleMasking(schema.tableName).c_str(), schema.tableName.size(), errCode);
1076 (void)handle->Rollback();
1077 return errCode;
1078 }
1079 }
1080 return handle->Commit();
1081 }
1082
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1083 std::pair<int, bool> SQLiteSingleRelationalStorageEngine::SetDistributedSchema(const DistributedSchema &schema,
1084 bool isForceUpgrade)
1085 {
1086 std::lock_guard<std::mutex> autoLock(createDistributedTableMutex_);
1087 auto schemaObj = GetSchema();
1088 std::pair<int, bool> res = {E_OK, schemaObj.CheckDistributedSchemaChange(schema)};
1089 auto &[errCode, isSchemaChange] = res;
1090 if (properties_.GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE) {
1091 LOGE("tableMode SPLIT_BY_DEVICE not support set distributed schema");
1092 errCode = -E_NOT_SUPPORT;
1093 return res;
1094 }
1095 if (!isSchemaChange) {
1096 return res;
1097 }
1098 auto localSchema = schemaObj.GetDistributedSchema();
1099 if (localSchema.version != 0 && localSchema.version >= schema.version) {
1100 LOGE("new schema version no upgrade old:%" PRIu32 " new:%" PRIu32, localSchema.version, schema.version);
1101 errCode = -E_INVALID_ARGS;
1102 } else {
1103 errCode = SetDistributedSchemaInner(schemaObj, schema, isForceUpgrade);
1104 }
1105 if (errCode == E_OK) {
1106 SetSchema(schemaObj);
1107 }
1108 return res;
1109 }
1110
SetDistributedSchemaInner(RelationalSchemaObject & schemaObj,const DistributedSchema & schema,bool isForceUpgrade)1111 int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInner(RelationalSchemaObject &schemaObj,
1112 const DistributedSchema &schema, bool isForceUpgrade)
1113 {
1114 int errCode = E_OK;
1115 auto *handle = static_cast<SQLiteSingleVerRelationalStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
1116 errCode));
1117 if (handle == nullptr) {
1118 return errCode;
1119 }
1120 ResFinalizer resFinalizer([this, handle]() {
1121 auto rdbHandle = handle;
1122 ReleaseExecutor(rdbHandle);
1123 });
1124
1125 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1126 if (errCode != E_OK) {
1127 return errCode;
1128 }
1129 errCode = SQLiteRelationalUtils::CheckDistributedSchemaValid(schemaObj, schema, isForceUpgrade, handle);
1130 if (errCode != E_OK) {
1131 (void)handle->Rollback();
1132 return errCode;
1133 }
1134 schemaObj.SetDistributedSchema(schema);
1135 for (const auto &table : schema.tables) {
1136 TableInfo tableInfo = schemaObj.GetTable(table.tableName);
1137 if (tableInfo.Empty()) {
1138 continue;
1139 }
1140 tableInfo.SetDistributedTable(schemaObj.GetDistributedTable(table.tableName));
1141 errCode = handle->RenewTableTrigger(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType());
1142 if (errCode != E_OK) {
1143 LOGE("Failed to refresh trigger while setting up distributed schema: %d", errCode);
1144 (void)handle->Rollback();
1145 return errCode;
1146 }
1147 errCode = handle->UpdateHashKey(schemaObj.GetTableMode(), tableInfo, tableInfo.GetTableSyncType());
1148 if (errCode != E_OK) {
1149 LOGE("Failed to update hash_key while setting up distributed schema: %d", errCode);
1150 (void)handle->Rollback();
1151 return errCode;
1152 }
1153 }
1154 errCode = SaveSchemaToMetaTable(handle, schemaObj);
1155 if (errCode != E_OK) {
1156 LOGE("Save schema to meta table for set distributed schema failed. %d", errCode);
1157 (void)handle->Rollback();
1158 return errCode;
1159 }
1160 return handle->Commit();
1161 }
1162 }
1163 #endif