1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "res_finalizer.h"
26 #include "sqlite_log_table_manager.h"
27 #include "sqlite_relational_utils.h"
28 #include "sqlite_relational_store_connection.h"
29 #include "storage_engine_manager.h"
30 #include "cloud_sync_utils.h"
31
32 namespace DistributedDB {
namespace {
// Key under which the distributed table mode is read from and written to the meta data.
constexpr auto DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
}
36
~SQLiteRelationalStore()37 SQLiteRelationalStore::~SQLiteRelationalStore()
38 {
39 sqliteStorageEngine_ = nullptr;
40 }
41
42 // Called when a new connection created.
IncreaseConnectionCounter()43 void SQLiteRelationalStore::IncreaseConnectionCounter()
44 {
45 if (sqliteStorageEngine_ == nullptr) {
46 LOGE("[SQLiteRelationalStore] [IncreaseConnectionCounter] sqliteStorageEngine_ is nullptr.");
47 return;
48 }
49 connectionCount_.fetch_add(1, std::memory_order_seq_cst);
50 if (connectionCount_.load() > 0) {
51 sqliteStorageEngine_->SetConnectionFlag(true);
52 }
53 }
54
GetDBConnection(int & errCode)55 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
56 {
57 std::lock_guard<std::mutex> lock(connectMutex_);
58 RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
59 if (connection == nullptr) {
60 errCode = -E_OUT_OF_MEMORY;
61 return nullptr;
62 }
63 IncObjRef(this);
64 IncreaseConnectionCounter();
65 return connection;
66 }
67
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)68 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
69 {
70 option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
71 option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
72 if (properties.IsEncrypted()) {
73 option.cipherType = properties.GetCipherType();
74 option.passwd = properties.GetPasswd();
75 option.iterTimes = properties.GetIterTimes();
76 }
77 }
78
InitStorageEngine(const RelationalDBProperties & properties)79 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
80 {
81 OpenDbProperties option;
82 InitDataBaseOption(properties, option);
83 std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
84
85 StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
86 int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
87 if (errCode != E_OK) {
88 LOGE("Init the sqlite storage engine failed:%d", errCode);
89 }
90 return errCode;
91 }
92
ReleaseResources()93 void SQLiteRelationalStore::ReleaseResources()
94 {
95 if (sqliteStorageEngine_ != nullptr) {
96 sqliteStorageEngine_->ClearEnginePasswd();
97 sqliteStorageEngine_ = nullptr;
98 }
99 #ifdef USE_DISTRIBUTEDDB_CLOUD
100 if (cloudSyncer_ != nullptr) {
101 cloudSyncer_->Close();
102 RefObject::KillAndDecObjRef(cloudSyncer_);
103 cloudSyncer_ = nullptr;
104 }
105 #endif
106 RefObject::DecObjRef(storageEngine_);
107 }
108
CheckDBMode()109 int SQLiteRelationalStore::CheckDBMode()
110 {
111 int errCode = E_OK;
112 auto *handle = GetHandle(true, errCode);
113 if (handle == nullptr) {
114 return errCode;
115 }
116 errCode = handle->CheckDBModeForRelational();
117 if (errCode != E_OK) {
118 LOGE("check relational DB mode failed. %d", errCode);
119 }
120
121 ReleaseHandle(handle);
122 return errCode;
123 }
124
GetSchemaFromMeta(RelationalSchemaObject & schema)125 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
126 {
127 Key schemaKey;
128 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
129 Value schemaVal;
130 int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
131 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
132 LOGE("Get relational schema from meta table failed. %d", errCode);
133 return errCode;
134 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
135 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
136 return -E_NOT_FOUND;
137 }
138
139 std::string schemaStr;
140 DBCommon::VectorToString(schemaVal, schemaStr);
141 errCode = schema.ParseFromSchemaString(schemaStr);
142 if (errCode != E_OK) {
143 LOGE("Parse schema string from meta table failed.");
144 return errCode;
145 }
146
147 sqliteStorageEngine_->SetSchema(schema);
148 return E_OK;
149 }
150
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)151 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
152 {
153 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
154 Value modeVal;
155 int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
156 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
157 LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
158 return errCode;
159 }
160
161 DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
162 if (!modeVal.empty()) {
163 std::string value(modeVal.begin(), modeVal.end());
164 orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
165 } else if (isUnSet) {
166 return E_OK; // First set table mode.
167 }
168
169 if (orgMode == DistributedTableMode::COLLABORATION && orgMode != mode) {
170 LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
171 return -E_INVALID_ARGS;
172 }
173 return E_OK;
174 }
175
CheckProperties(RelationalDBProperties properties)176 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
177 {
178 RelationalSchemaObject schema;
179 int errCode = GetSchemaFromMeta(schema);
180 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
181 LOGE("Get relational schema from meta failed. errcode=%d", errCode);
182 return errCode;
183 }
184 int ret = InitTrackerSchemaFromMeta();
185 if (ret != E_OK) {
186 LOGE("Init tracker schema from meta failed. errcode=%d", ret);
187 return ret;
188 }
189 properties.SetSchema(schema);
190
191 // Empty schema means no distributed table has been used, we may set DB to any table mode
192 // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
193 bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
194 auto mode = properties.GetDistributedTableMode();
195 errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
196 if (errCode != E_OK) {
197 LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
198 return errCode;
199 }
200 if (!isSchemaEmpty) {
201 return errCode;
202 }
203
204 errCode = SaveTableModeToMeta(mode);
205 if (errCode != E_OK) {
206 LOGE("Save table mode to meta failed. errCode=%d", errCode);
207 return errCode;
208 }
209
210 return E_OK;
211 }
212
SaveTableModeToMeta(DistributedTableMode mode)213 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
214 {
215 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
216 Value modeVal;
217 DBCommon::StringToVector(std::to_string(static_cast<int>(mode)), modeVal);
218 int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
219 if (errCode != E_OK) {
220 LOGE("Save relational schema to meta table failed. %d", errCode);
221 }
222 return errCode;
223 }
224
SaveLogTableVersionToMeta()225 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
226 {
227 LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
228 const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY,
229 DBConstant::LOG_TABLE_VERSION_KEY + strlen(DBConstant::LOG_TABLE_VERSION_KEY));
230 Value logVersion;
231 int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
232 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
233 LOGE("Get log version from meta table failed. %d", errCode);
234 return errCode;
235 }
236 std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
237 Value logVersionVal(versionStr.begin(), versionStr.end());
238 // log version is same, no need to update
239 if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
240 return errCode;
241 }
242 // If the log version does not exist or is different, update the log version
243 errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
244 if (errCode != E_OK) {
245 LOGE("save log table version to meta table failed. %d", errCode);
246 }
247 return errCode;
248 }
249
CleanDistributedDeviceTable()250 int SQLiteRelationalStore::CleanDistributedDeviceTable()
251 {
252 std::vector<std::string> missingTables;
253 int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
254 if (errCode != E_OK) {
255 LOGE("Clean distributed device table failed. %d", errCode);
256 }
257 for (const auto &deviceTableName : missingTables) {
258 std::string deviceHash;
259 std::string tableName;
260 DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
261 errCode = syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
262 if (errCode != E_OK) {
263 LOGE("Erase water mark failed:%d", errCode);
264 return errCode;
265 }
266 }
267 return errCode;
268 }
269
Open(const RelationalDBProperties & properties)270 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
271 {
272 std::lock_guard<std::mutex> lock(initalMutex_);
273 if (isInitialized_) {
274 LOGD("[RelationalStore][Open] relational db was already initialized.");
275 return E_OK;
276 }
277 int errCode = InitSQLiteStorageEngine(properties);
278 if (errCode != E_OK) {
279 return errCode;
280 }
281
282 do {
283 errCode = InitStorageEngine(properties);
284 if (errCode != E_OK) {
285 LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
286 break;
287 }
288
289 storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
290 if (storageEngine_ == nullptr) {
291 LOGE("[RelationalStore][Open] Create syncable storage failed");
292 errCode = -E_OUT_OF_MEMORY;
293 break;
294 }
295
296 syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
297 // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
298 // be destructed when close store
299 storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
300 #ifdef USE_DISTRIBUTEDDB_CLOUD
301 cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
302 #endif
303 errCode = CheckDBMode();
304 if (errCode != E_OK) {
305 break;
306 }
307
308 errCode = CheckProperties(properties);
309 if (errCode != E_OK) {
310 break;
311 }
312
313 errCode = SaveLogTableVersionToMeta();
314 if (errCode != E_OK) {
315 break;
316 }
317
318 errCode = CleanDistributedDeviceTable();
319 if (errCode != E_OK) {
320 break;
321 }
322
323 isInitialized_ = true;
324 return E_OK;
325 } while (false);
326
327 ReleaseResources();
328 return errCode;
329 }
330
OnClose(const std::function<void (void)> & notifier)331 void SQLiteRelationalStore::OnClose(const std::function<void(void)> ¬ifier)
332 {
333 AutoLock lockGuard(this);
334 if (notifier) {
335 closeNotifiers_.push_back(notifier);
336 } else {
337 LOGW("Register 'Close()' notifier failed, notifier is null.");
338 }
339 }
340
GetHandle(bool isWrite,int & errCode) const341 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
342 {
343 if (sqliteStorageEngine_ == nullptr) {
344 errCode = -E_INVALID_DB;
345 return nullptr;
346 }
347
348 return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
349 sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
350 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const351 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
352 {
353 if (handle == nullptr) {
354 return;
355 }
356
357 if (sqliteStorageEngine_ != nullptr) {
358 StorageExecutor *databaseHandle = handle;
359 sqliteStorageEngine_->Recycle(databaseHandle);
360 handle = nullptr;
361 }
362 }
363
Sync(const ISyncer::SyncParam & syncParam,uint64_t connectionId)364 int SQLiteRelationalStore::Sync(const ISyncer::SyncParam &syncParam, uint64_t connectionId)
365 {
366 return syncAbleEngine_->Sync(syncParam, connectionId);
367 }
368
369 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)370 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
371 {
372 int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
373 if (count <= 0) {
374 LOGF("Decrease db connection counter failed, count <= 0.");
375 return;
376 }
377 if (storageEngine_ != nullptr) {
378 storageEngine_->EraseDataChangeCallback(connectionId);
379 }
380 if (count != 1) {
381 return;
382 }
383
384 LockObj();
385 auto notifiers = std::move(closeNotifiers_);
386 UnlockObj();
387 for (const auto ¬ifier : notifiers) {
388 if (notifier) {
389 notifier();
390 }
391 }
392
393 // Sync Close
394 syncAbleEngine_->Close();
395
396 #ifdef USE_DISTRIBUTEDDB_CLOUD
397 if (cloudSyncer_ != nullptr) {
398 cloudSyncer_->Close();
399 RefObject::KillAndDecObjRef(cloudSyncer_);
400 cloudSyncer_ = nullptr;
401 }
402 #endif
403
404 if (sqliteStorageEngine_ != nullptr) {
405 sqliteStorageEngine_ = nullptr;
406 }
407 {
408 if (storageEngine_ != nullptr) {
409 storageEngine_->RegisterHeartBeatListener(nullptr);
410 }
411 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
412 StopLifeCycleTimer();
413 lifeCycleNotifier_ = nullptr;
414 }
415 // close will dec sync ref of storageEngine_
416 DecObjRef(storageEngine_);
417 }
418
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)419 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
420 {
421 if (connectionCount_.load() == 1) {
422 sqliteStorageEngine_->SetConnectionFlag(false);
423 }
424
425 connectMutex_.lock();
426 if (connection != nullptr) {
427 KillAndDecObjRef(connection);
428 DecreaseConnectionCounter(connectionId);
429 connectMutex_.unlock();
430 KillAndDecObjRef(this);
431 } else {
432 connectMutex_.unlock();
433 }
434 }
435
WakeUpSyncer()436 void SQLiteRelationalStore::WakeUpSyncer()
437 {
438 syncAbleEngine_->WakeUpSyncer();
439 }
440
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)441 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
442 bool trackerSchemaChanged)
443 {
444 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
445 TableInfo tableInfo = localSchema.GetTable(tableName);
446 if (!tableInfo.Empty()) {
447 bool isSharedTable = tableInfo.GetSharedTableMark();
448 if (isSharedTable && !trackerSchemaChanged) {
449 return E_OK; // shared table will create distributed table when use SetCloudDbSchema
450 }
451 }
452
453 auto mode = sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode();
454 std::string localIdentity; // collaboration mode need local identify
455 if (mode == DistributedTableMode::COLLABORATION) {
456 int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
457 if (errCode != E_OK || localIdentity.empty()) {
458 LOGW("Get local identity failed: %d", errCode);
459 }
460 }
461 bool schemaChanged = false;
462 int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(localIdentity),
463 schemaChanged, syncType, trackerSchemaChanged);
464 if (errCode != E_OK) {
465 LOGE("Create distributed table failed. %d", errCode);
466 } else {
467 CleanDirtyLogIfNeed(tableName);
468 }
469 if (schemaChanged) {
470 LOGD("Notify schema changed.");
471 storageEngine_->NotifySchemaChanged();
472 }
473 return errCode;
474 }
475
476 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetCloudSyncTaskCount()477 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
478 {
479 if (cloudSyncer_ == nullptr) {
480 LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
481 return -1;
482 }
483 return cloudSyncer_->GetCloudSyncTaskCount();
484 }
485
CleanCloudData(ClearMode mode)486 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
487 {
488 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
489 TableInfoMap tables = localSchema.GetTables();
490 std::vector<std::string> cloudTableNameList;
491 for (const auto &tableInfo : tables) {
492 bool isSharedTable = tableInfo.second.GetSharedTableMark();
493 if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
494 continue;
495 }
496 if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
497 cloudTableNameList.push_back(tableInfo.first);
498 }
499 }
500 if (cloudTableNameList.empty()) {
501 LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
502 return E_OK;
503 }
504 if (cloudSyncer_ == nullptr) {
505 LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
506 return -E_INVALID_DB;
507 }
508 int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
509 if (errCode != E_OK) {
510 LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
511 }
512
513 return errCode;
514 }
515
ClearCloudWatermark(const std::set<std::string> & tableNames)516 int SQLiteRelationalStore::ClearCloudWatermark(const std::set<std::string> &tableNames)
517 {
518 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
519 TableInfoMap tables = localSchema.GetTables();
520 std::vector<std::string> cloudTableNameList;
521 bool isClearAllTables = tableNames.empty();
522 for (const auto &tableInfo : tables) {
523 if (tableInfo.second.GetSharedTableMark()) {
524 continue;
525 }
526 std::string tableName = tableInfo.first;
527 std::string maskedName = DBCommon::StringMiddleMasking(tableName);
528 bool isTargetTable = tableNames.count(tableName) > 0;
529 if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION && (isClearAllTables || isTargetTable)) {
530 cloudTableNameList.push_back(tableName);
531 LOGI("[RelationalStore] cloud watermark of table %s will be cleared, original name length: %zu",
532 maskedName.c_str(), tableName.length());
533 } else if (!isClearAllTables && isTargetTable) {
534 LOGW("[RelationalStore] table %s ignored when clear cloud watermark, original name length: %zu",
535 maskedName.c_str(), tableName.length());
536 }
537 }
538 if (cloudTableNameList.empty()) {
539 LOGI("[RelationalStore] no target tables found for clear cloud watermark");
540 return E_OK;
541 }
542 if (cloudSyncer_ == nullptr) {
543 LOGE("[RelationalStore] cloudSyncer was not initialized when clear cloud watermark");
544 return -E_INVALID_DB;
545 }
546 int errCode = cloudSyncer_->ClearCloudWatermark(cloudTableNameList);
547 if (errCode != E_OK) {
548 LOGE("[RelationalStore] failed to clear cloud watermark, %d.", errCode);
549 }
550 return errCode;
551 }
552 #endif
553
RemoveDeviceData()554 int SQLiteRelationalStore::RemoveDeviceData()
555 {
556 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetRelationalProperties().GetIntProp(
557 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
558 if (mode == DistributedTableMode::COLLABORATION) {
559 LOGE("Not support remove all device data in collaboration mode.");
560 return -E_NOT_SUPPORT;
561 }
562
563 std::vector<std::string> tableNameList = GetAllDistributedTableName();
564 if (tableNameList.empty()) {
565 return E_OK;
566 }
567 // erase watermark first
568 int errCode = EraseAllDeviceWatermark(tableNameList);
569 if (errCode != E_OK) {
570 LOGE("remove watermark failed %d", errCode);
571 return errCode;
572 }
573 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
574 errCode = GetHandleAndStartTransaction(handle);
575 if (handle == nullptr) {
576 return errCode;
577 }
578
579 for (const auto &table : tableNameList) {
580 errCode = handle->DeleteDistributedDeviceTable("", table);
581 if (errCode != E_OK) {
582 LOGE("delete device data failed. %d", errCode);
583 break;
584 }
585
586 errCode = handle->DeleteDistributedAllDeviceTableLog(table);
587 if (errCode != E_OK) {
588 LOGE("delete device data failed. %d", errCode);
589 break;
590 }
591 }
592
593 if (errCode != E_OK) {
594 (void)handle->Rollback();
595 ReleaseHandle(handle);
596 return errCode;
597 }
598
599 errCode = handle->Commit();
600 ReleaseHandle(handle);
601 storageEngine_->NotifySchemaChanged();
602 return errCode;
603 }
604
RemoveDeviceData(const std::string & device,const std::string & tableName)605 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
606 {
607 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetRelationalProperties().GetIntProp(
608 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
609 if (mode == DistributedTableMode::COLLABORATION) {
610 LOGE("Not support remove device data in collaboration mode.");
611 return -E_NOT_SUPPORT;
612 }
613
614 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
615 auto iter = tables.find(tableName);
616 if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
617 LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
618 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
619 }
620 // cloud mode is not permit
621 if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
622 LOGE("Remove device data with cloud sync table name.");
623 return -E_NOT_SUPPORT;
624 }
625 bool isNeedHash = false;
626 std::string hashDeviceId;
627 int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
628 if (errCode == -E_NOT_SUPPORT) {
629 isNeedHash = true;
630 hashDeviceId = device;
631 errCode = E_OK;
632 }
633 if (errCode != E_OK) {
634 return errCode;
635 }
636 if (isNeedHash) {
637 // check device is uuid in meta
638 std::set<std::string> hashDevices;
639 errCode = GetExistDevices(hashDevices);
640 if (errCode != E_OK) {
641 return errCode;
642 }
643 if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
644 LOGD("[SQLiteRelationalStore] not match device, just return");
645 return E_OK;
646 }
647 }
648 return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
649 }
650
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)651 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
652 const RelationalObserverAction &action)
653 {
654 return storageEngine_->RegisterObserverAction(connectionId, observer, action);
655 }
656
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)657 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
658 {
659 return storageEngine_->UnRegisterObserverAction(connectionId, observer);
660 }
661
StopLifeCycleTimer()662 int SQLiteRelationalStore::StopLifeCycleTimer()
663 {
664 auto runtimeCxt = RuntimeContext::GetInstance();
665 if (runtimeCxt == nullptr) {
666 return -E_INVALID_ARGS;
667 }
668 if (lifeTimerId_ != 0) {
669 TimerId timerId = lifeTimerId_;
670 lifeTimerId_ = 0;
671 runtimeCxt->RemoveTimer(timerId, false);
672 }
673 return E_OK;
674 }
675
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)676 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier)
677 {
678 auto runtimeCxt = RuntimeContext::GetInstance();
679 if (runtimeCxt == nullptr) {
680 return -E_INVALID_ARGS;
681 }
682 RefObject::IncObjRef(this);
683 TimerId timerId = 0;
684 int errCode = runtimeCxt->SetTimer(
685 DBConstant::DEF_LIFE_CYCLE_TIME,
686 [this](TimerId id) -> int {
687 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
688 if (lifeCycleNotifier_) {
689 // normal identifier mode
690 std::string identifier;
691 if (sqliteStorageEngine_->GetRelationalProperties().GetBoolProp(
692 DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
693 identifier = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(
694 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
695 } else {
696 identifier = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(
697 DBProperties::IDENTIFIER_DATA, "");
698 }
699 auto userId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, "");
700 lifeCycleNotifier_(identifier, userId);
701 }
702 return 0;
703 },
704 [this]() {
705 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
706 if (ret != E_OK) {
707 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
708 }
709 },
710 timerId);
711 if (errCode != E_OK) {
712 lifeTimerId_ = 0;
713 LOGE("SetTimer failed:%d", errCode);
714 RefObject::DecObjRef(this);
715 return errCode;
716 }
717
718 lifeCycleNotifier_ = notifier;
719 lifeTimerId_ = timerId;
720 return E_OK;
721 }
722
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)723 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier)
724 {
725 int errCode;
726 {
727 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
728 if (lifeTimerId_ != 0) {
729 errCode = StopLifeCycleTimer();
730 if (errCode != E_OK) {
731 LOGE("Stop the life cycle timer failed:%d", errCode);
732 return errCode;
733 }
734 }
735
736 if (!notifier) {
737 return E_OK;
738 }
739 errCode = StartLifeCycleTimer(notifier);
740 if (errCode != E_OK) {
741 LOGE("Register life cycle timer failed:%d", errCode);
742 return errCode;
743 }
744 }
745 auto listener = [this] { HeartBeat(); };
746 storageEngine_->RegisterHeartBeatListener(listener);
747 return errCode;
748 }
749
HeartBeat()750 void SQLiteRelationalStore::HeartBeat()
751 {
752 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
753 int errCode = ResetLifeCycleTimer();
754 if (errCode != E_OK) {
755 LOGE("Heart beat for life cycle failed:%d", errCode);
756 }
757 }
758
ResetLifeCycleTimer()759 int SQLiteRelationalStore::ResetLifeCycleTimer()
760 {
761 if (lifeTimerId_ == 0) {
762 return E_OK;
763 }
764 auto lifeNotifier = lifeCycleNotifier_;
765 lifeCycleNotifier_ = nullptr;
766 int errCode = StopLifeCycleTimer();
767 if (errCode != E_OK) {
768 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
769 }
770 return StartLifeCycleTimer(lifeNotifier);
771 }
772
GetStorePath() const773 std::string SQLiteRelationalStore::GetStorePath() const
774 {
775 return sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::DATA_DIR, "");
776 }
777
GetProperties() const778 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
779 {
780 return sqliteStorageEngine_->GetRelationalProperties();
781 }
782
StopSync(uint64_t connectionId)783 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
784 {
785 return syncAbleEngine_->StopSync(connectionId);
786 }
787
Dump(int fd)788 void SQLiteRelationalStore::Dump(int fd)
789 {
790 std::string userId = "";
791 std::string appId = "";
792 std::string storeId = "";
793 std::string label = "";
794 if (sqliteStorageEngine_ != nullptr) {
795 userId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, "");
796 appId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, "");
797 storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
798 label = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
799 }
800 label = DBCommon::TransferStringToHex(label);
801 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
802 storeId.c_str(), label.c_str());
803 if (syncAbleEngine_ != nullptr) {
804 syncAbleEngine_->Dump(fd);
805 }
806 }
807
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)808 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
809 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
810 {
811 if (sqliteStorageEngine_ == nullptr) {
812 return -E_INVALID_DB;
813 }
814 if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
815 LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
816 return -E_MAX_LIMITS;
817 }
818
819 if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
820 LOGW("not a distributed relational store.");
821 return -E_NOT_SUPPORT;
822 }
823
824 // Check whether to be able to operate the db.
825 int errCode = E_OK;
826 auto *handle = GetHandle(false, errCode);
827 if (handle == nullptr) {
828 return errCode;
829 }
830 errCode = handle->CheckEncryptedOrCorrupted();
831 ReleaseHandle(handle);
832 if (errCode != E_OK) {
833 return errCode;
834 }
835
836 return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
837 }
838
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)839 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
840 {
841 std::set<std::string> devices;
842 int errCode = GetExistDevices(devices);
843 if (errCode != E_OK) {
844 return errCode;
845 }
846 for (const auto &tableName : tableNameList) {
847 for (const auto &device : devices) {
848 errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
849 if (errCode != E_OK) {
850 return errCode;
851 }
852 }
853 }
854 return E_OK;
855 }
856
GetDevTableName(const std::string & device,const std::string & hashDev) const857 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
858 {
859 std::string devTableName;
860 StoreInfo info = { sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::USER_ID, ""),
861 sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::APP_ID, ""),
862 sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "") };
863 if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
864 devTableName = hashDev;
865 }
866 return devTableName;
867 }
868
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const869 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
870 {
871 int errCode = E_OK;
872 handle = GetHandle(true, errCode);
873 if (handle == nullptr) {
874 LOGE("get handle failed %d", errCode);
875 return errCode;
876 }
877
878 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
879 if (errCode != E_OK) {
880 LOGE("start transaction failed %d", errCode);
881 ReleaseHandle(handle);
882 }
883 return errCode;
884 }
885
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)886 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
887 const std::string &tableName, bool isNeedHash)
888 {
889 std::string hashHexDev;
890 std::string hashDev;
891 std::string devTableName;
892 if (!isNeedHash) {
893 // if is not need hash mappingDev mean hash(uuid) device is param device
894 hashHexDev = DBCommon::TransferStringToHex(mappingDev);
895 hashDev = mappingDev;
896 devTableName = device;
897 } else {
898 // if is need hash mappingDev mean uuid
899 hashDev = DBCommon::TransferHashString(mappingDev);
900 hashHexDev = DBCommon::TransferStringToHex(hashDev);
901 devTableName = GetDevTableName(mappingDev, hashHexDev);
902 }
903 // erase watermark first
904 int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
905 if (errCode != E_OK) {
906 LOGE("erase watermark failed %d", errCode);
907 return errCode;
908 }
909 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
910 errCode = GetHandleAndStartTransaction(handle);
911 if (handle == nullptr) {
912 return errCode;
913 }
914
915 errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
916 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
917 if (errCode != E_OK) {
918 LOGE("delete device data failed. %d", errCode);
919 tables.clear();
920 }
921
922 for (const auto &it : tables) {
923 if (tableName.empty() || it.second.GetTableName() == tableName) {
924 errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
925 if (errCode != E_OK) {
926 LOGE("delete device data failed. %d", errCode);
927 break;
928 }
929 }
930 }
931
932 if (errCode != E_OK) {
933 (void)handle->Rollback();
934 ReleaseHandle(handle);
935 return errCode;
936 }
937 errCode = handle->Commit();
938 ReleaseHandle(handle);
939 storageEngine_->NotifySchemaChanged();
940 return errCode;
941 }
942
GetExistDevices(std::set<std::string> & hashDevices) const943 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
944 {
945 int errCode = E_OK;
946 auto *handle = GetHandle(true, errCode);
947 if (handle == nullptr) {
948 LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
949 return errCode;
950 }
951 errCode = handle->GetExistsDeviceList(hashDevices);
952 if (errCode != E_OK) {
953 LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
954 }
955 ReleaseHandle(handle);
956 return errCode;
957 }
958
GetAllDistributedTableName(TableSyncType tableSyncType)959 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName(TableSyncType tableSyncType)
960 {
961 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
962 std::vector<std::string> tableNames;
963 for (const auto &table : tables) {
964 if (table.second.GetTableSyncType() != tableSyncType) {
965 continue;
966 }
967 tableNames.push_back(table.second.GetTableName());
968 }
969 return tableNames;
970 }
971
972 #ifdef USE_DISTRIBUTEDDB_CLOUD
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)973 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
974 {
975 if (cloudSyncer_ == nullptr) {
976 LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
977 return -E_INVALID_DB;
978 }
979 cloudSyncer_->SetCloudDB(cloudDb);
980 return E_OK;
981 }
982 #endif
983
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)984 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
985 std::vector<Field> &addFields)
986 {
987 for (const auto &newField : newFields) {
988 if (equalFields.find(newField.colName) == equalFields.end()) {
989 addFields.push_back(newField);
990 }
991 }
992 }
993
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)994 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
995 std::vector<Field> &addFields)
996 {
997 std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
998 if (newFields.size() < oldFields.size()) {
999 return false;
1000 }
1001 std::set<std::string> equalFields;
1002 for (const auto &oldField : oldFields) {
1003 bool isFieldExist = false;
1004 for (const auto &newField : newFields) {
1005 if (newField.colName != oldField.GetFieldName()) {
1006 continue;
1007 }
1008 isFieldExist = true;
1009 int32_t type = newField.type;
1010 // Field type need to match storage type
1011 // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
1012 // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
1013 if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
1014 type++; // storage type - field type = 1
1015 } else if (type == TYPE_INDEX<bool>) {
1016 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
1017 } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
1018 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
1019 }
1020 auto primaryKeyMap = tableInfo.GetPrimaryKey();
1021 auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
1022 [&newField](const std::map<int, std::string>::value_type &pair) {
1023 return pair.second == newField.colName;
1024 });
1025 if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
1026 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
1027 return false;
1028 }
1029 equalFields.insert(newField.colName);
1030 }
1031 if (!isFieldExist) {
1032 return false;
1033 }
1034 }
1035 AddFields(newFields, equalFields, addFields);
1036 return true;
1037 }
1038
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)1039 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
1040 std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
1041 {
1042 std::set<std::string> tableNames;
1043 std::map<std::string, std::string> sharedTableNamesMap;
1044 std::map<std::string, std::vector<Field>> fieldsMap;
1045 for (const auto &table : schema.tables) {
1046 tableNames.insert(table.name);
1047 sharedTableNamesMap[table.name] = table.sharedTableName;
1048 std::vector<Field> fields = table.fields;
1049 bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
1050 Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
1051 Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
1052 fields.insert(fields.begin(), privilegeField);
1053 fields.insert(fields.begin(), ownerField);
1054 fieldsMap[table.name] = fields;
1055 }
1056
1057 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1058 TableInfoMap tableList = localSchema.GetTables();
1059 for (const auto &tableInfo : tableList) {
1060 if (!tableInfo.second.GetSharedTableMark()) {
1061 continue;
1062 }
1063 std::string oldSharedTableName = tableInfo.second.GetTableName();
1064 std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1065 std::vector<Field> addFields;
1066 if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1067 deleteTableNames.push_back(oldSharedTableName);
1068 } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1069 deleteTableNames.push_back(oldSharedTableName);
1070 } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1071 if (!addFields.empty()) {
1072 updateTableNames[oldSharedTableName] = addFields;
1073 }
1074 if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1075 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1076 }
1077 } else {
1078 return false;
1079 }
1080 }
1081 return true;
1082 }
1083
1084 #ifdef USE_DISTRIBUTEDDB_CLOUD
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1085 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1086 {
1087 if (storageEngine_ == nullptr) {
1088 LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1089 return -E_INVALID_DB;
1090 }
1091 int errCode = CheckCloudSchema(schema);
1092 if (errCode != E_OK) {
1093 return errCode;
1094 }
1095 // delete, update and create shared table and its distributed table
1096 errCode = ExecuteCreateSharedTable(schema);
1097 if (errCode != E_OK) {
1098 LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1099 return errCode;
1100 }
1101 return storageEngine_->SetCloudDbSchema(schema);
1102 }
1103
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1104 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1105 {
1106 if (cloudSyncer_ == nullptr) {
1107 LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1108 return -E_INVALID_DB;
1109 }
1110 cloudSyncer_->SetIAssetLoader(loader);
1111 return E_OK;
1112 }
1113 #endif
1114
ChkSchema(const TableName & tableName)1115 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1116 {
1117 // check schema first then compare columns to avoid change the origin return error code
1118 if (storageEngine_ == nullptr) {
1119 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1120 return -E_INVALID_DB;
1121 }
1122 int errCode = storageEngine_->ChkSchema(tableName);
1123 if (errCode != E_OK) {
1124 LOGE("[SQLiteRelationalStore][ChkSchema] ChkSchema failed %d.", errCode);
1125 return errCode;
1126 }
1127 auto *handle = GetHandle(false, errCode);
1128 if (handle == nullptr) {
1129 LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1130 return errCode;
1131 }
1132 RelationalSchemaObject localSchema = storageEngine_->GetSchemaInfo();
1133 handle->SetLocalSchema(localSchema);
1134 errCode = handle->CompareSchemaTableColumns(tableName);
1135 if (errCode != E_OK) {
1136 LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1137 }
1138 ReleaseHandle(handle);
1139 return errCode;
1140 }
1141
1142 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1143 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1144 {
1145 if (storageEngine_ == nullptr) {
1146 LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1147 return -E_INVALID_DB;
1148 }
1149 int errCode = CheckBeforeSync(option);
1150 if (errCode != E_OK) {
1151 return errCode;
1152 }
1153 LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1154 if (option.compensatedSyncOnly) {
1155 CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1156 info.storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
1157 cloudSyncer_->GenerateCompensatedSync(info);
1158 return E_OK;
1159 }
1160 CloudSyncer::CloudTaskInfo info;
1161 FillSyncInfo(option, onProcess, info);
1162 auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1163 if (ret != E_OK) {
1164 return ret;
1165 }
1166 ret = ReFillSyncInfoTable(table, info);
1167 if (ret != E_OK) {
1168 return ret;
1169 }
1170 info.taskId = taskId;
1171 errCode = cloudSyncer_->Sync(info);
1172 return errCode;
1173 }
1174
CheckBeforeSync(const CloudSyncOption & option)1175 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1176 {
1177 if (cloudSyncer_ == nullptr) {
1178 LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1179 return -E_INVALID_DB;
1180 }
1181 if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1182 return -E_INVALID_ARGS;
1183 }
1184 if (option.priorityLevel < CloudDbConstant::PRIORITY_TASK_DEFALUT_LEVEL ||
1185 option.priorityLevel > CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1186 LOGE("[RelationalStore] priority level is invalid value:%d", option.priorityLevel);
1187 return -E_INVALID_ARGS;
1188 }
1189 if (option.compensatedSyncOnly && option.asyncDownloadAssets) {
1190 return -E_NOT_SUPPORT;
1191 }
1192 int errCode = CheckQueryValid(option);
1193 if (errCode != E_OK) {
1194 return errCode;
1195 }
1196 SecurityOption securityOption;
1197 errCode = storageEngine_->GetSecurityOption(securityOption);
1198 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1199 return -E_SECURITY_OPTION_CHECK_ERROR;
1200 }
1201 CloudSyncConfig config = storageEngine_->GetCloudSyncConfig();
1202 if ((errCode == E_OK) && (securityOption.securityLabel == S4) && (!config.isSupportEncrypt)) {
1203 LOGE("[RelationalStore] The current data does not support synchronization.");
1204 return -E_SECURITY_OPTION_CHECK_ERROR;
1205 }
1206 return E_OK;
1207 }
1208
CheckAssetsOnlyValid(const QuerySyncObject & querySyncObject,const CloudSyncOption & option)1209 int SQLiteRelationalStore::CheckAssetsOnlyValid(const QuerySyncObject &querySyncObject, const CloudSyncOption &option)
1210 {
1211 if (!querySyncObject.IsAssetsOnly()) {
1212 return E_OK;
1213 }
1214 if (option.mode != SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1215 LOGE("[RelationalStore] not support mode %d when sync with assets only", option.mode);
1216 return -E_NOT_SUPPORT;
1217 }
1218 if (option.priorityLevel != CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1219 LOGE("[RelationalStore] priorityLevel must be 2 when sync with assets only, now is %d",
1220 option.priorityLevel);
1221 return -E_INVALID_ARGS;
1222 }
1223 if (querySyncObject.AssetsOnlyErrFlag() == -E_INVALID_ARGS) {
1224 LOGE("[RelationalStore] the query statement of assets only is incorrect.");
1225 return -E_INVALID_ARGS;
1226 }
1227 return E_OK;
1228 }
1229
CheckQueryValid(const CloudSyncOption & option)1230 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1231 {
1232 if (option.compensatedSyncOnly) {
1233 return E_OK;
1234 }
1235 QuerySyncObject syncObject(option.query);
1236 int errCode = syncObject.GetValidStatus();
1237 if (errCode != E_OK) {
1238 LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1239 return errCode;
1240 }
1241 std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1242 bool isFromTable = object.empty();
1243 if (!option.priorityTask && !isFromTable) {
1244 LOGE("[RelationalStore] not support normal sync with query");
1245 return -E_NOT_SUPPORT;
1246 }
1247 const auto tableNames = syncObject.GetRelationTableNames();
1248 for (const auto &tableName : tableNames) {
1249 QuerySyncObject querySyncObject;
1250 querySyncObject.SetTableName(tableName);
1251 object.push_back(querySyncObject);
1252 }
1253 std::vector<std::string> syncTableNames;
1254 for (const auto &item : object) {
1255 std::string tableName = item.GetRelationTableName();
1256 syncTableNames.emplace_back(tableName);
1257 if (item.IsContainQueryNodes() && option.asyncDownloadAssets) {
1258 LOGE("[RelationalStore] not support async download assets with query table %s length %zu",
1259 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length());
1260 return -E_NOT_SUPPORT;
1261 }
1262 errCode = CheckAssetsOnlyValid(item, option);
1263 if (errCode != E_OK) {
1264 return errCode;
1265 }
1266 }
1267 errCode = CheckTableName(syncTableNames);
1268 if (errCode != E_OK) {
1269 return errCode;
1270 }
1271 return CheckObjectValid(option.priorityTask, object, isFromTable);
1272 }
1273
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1274 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1275 bool isFromTable)
1276 {
1277 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1278 for (const auto &item : object) {
1279 if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1280 LOGE("[RelationalStore] not support priority sync with full table");
1281 return -E_INVALID_ARGS;
1282 }
1283 int errCode = storageEngine_->CheckQueryValid(item);
1284 if (errCode != E_OK) {
1285 return errCode;
1286 }
1287 if (!priorityTask || isFromTable) {
1288 continue;
1289 }
1290 if (!item.IsInValueOutOfLimit()) {
1291 LOGE("[RelationalStore] not support priority sync in count out of limit");
1292 return -E_MAX_LIMITS;
1293 }
1294 std::string tableName = item.GetRelationTableName();
1295 TableInfo tableInfo = localSchema.GetTable(tableName);
1296 if (!tableInfo.Empty()) {
1297 const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1298 errCode = item.CheckPrimaryKey(primaryKeyMap);
1299 if (errCode != E_OK) {
1300 return errCode;
1301 }
1302 }
1303 }
1304 return E_OK;
1305 }
1306
CheckTableName(const std::vector<std::string> & tableNames)1307 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1308 {
1309 if (tableNames.empty()) {
1310 LOGE("[RelationalStore] sync with empty table");
1311 return -E_INVALID_ARGS;
1312 }
1313 for (const auto &table : tableNames) {
1314 int errCode = ChkSchema(table);
1315 if (errCode != E_OK) {
1316 LOGE("[RelationalStore] schema check failed when sync");
1317 return errCode;
1318 }
1319 }
1320 return E_OK;
1321 }
1322
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1323 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1324 CloudSyncer::CloudTaskInfo &info)
1325 {
1326 auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1327 if (syncObject.empty()) {
1328 QuerySyncObject querySyncObject(option.query);
1329 info.table = querySyncObject.GetRelationTableNames();
1330 for (const auto &item : info.table) {
1331 QuerySyncObject object(Query::Select());
1332 object.SetTableName(item);
1333 info.queryList.push_back(object);
1334 }
1335 } else {
1336 for (auto &item : syncObject) {
1337 info.table.push_back(item.GetRelationTableName());
1338 info.queryList.push_back(std::move(item));
1339 }
1340 }
1341 info.devices = option.devices;
1342 info.mode = option.mode;
1343 info.callback = onProcess;
1344 info.timeout = option.waitTime;
1345 info.priorityTask = option.priorityTask;
1346 info.compensatedTask = option.compensatedSyncOnly;
1347 info.priorityLevel = option.priorityLevel;
1348 info.users.emplace_back("");
1349 info.lockAction = option.lockAction;
1350 info.merge = option.merge;
1351 info.storeId = sqliteStorageEngine_->GetRelationalProperties().GetStringProp(DBProperties::STORE_ID, "");
1352 info.prepareTraceId = option.prepareTraceId;
1353 info.asyncDownloadAssets = option.asyncDownloadAssets;
1354 }
1355 #endif
1356
SetTrackerTable(const TrackerSchema & trackerSchema)1357 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1358 {
1359 TableInfo tableInfo;
1360 bool isFirstCreate = false;
1361 bool isNoTableInSchema = false;
1362 int errCode = CheckTrackerTable(trackerSchema, tableInfo, isNoTableInSchema, isFirstCreate);
1363 if (errCode != E_OK) {
1364 if (errCode != -E_IGNORE_DATA) {
1365 return errCode;
1366 }
1367 auto *handle = GetHandle(true, errCode);
1368 if (handle != nullptr) {
1369 handle->CheckAndCreateTrigger(tableInfo);
1370 // Try clear historical mismatched log, which usually do not occur and apply to tracker table only.
1371 if (isNoTableInSchema) {
1372 handle->ClearLogOfMismatchedData(trackerSchema.tableName);
1373 }
1374 handle->RecoverNullExtendLog(trackerSchema, tableInfo.GetTrackerTable());
1375 ReleaseHandle(handle);
1376 }
1377 CleanDirtyLogIfNeed(trackerSchema.tableName);
1378 return E_OK;
1379 }
1380 errCode = sqliteStorageEngine_->UpdateExtendField(trackerSchema);
1381 if (errCode != E_OK) {
1382 LOGE("[RelationalStore] update [%s [%zu]] extend_field failed: %d",
1383 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1384 return errCode;
1385 }
1386 if (isNoTableInSchema) {
1387 errCode = sqliteStorageEngine_->SetTrackerTable(trackerSchema, tableInfo, isFirstCreate);
1388 if (errCode == E_OK) {
1389 CleanDirtyLogIfNeed(trackerSchema.tableName);
1390 }
1391 return errCode;
1392 }
1393 sqliteStorageEngine_->CacheTrackerSchema(trackerSchema);
1394 errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1395 if (errCode != E_OK) {
1396 LOGE("[RelationalStore] create distributed table of [%s [%zu]] failed: %d",
1397 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1398 return errCode;
1399 }
1400 return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1401 }
1402
CheckTrackerTable(const TrackerSchema & trackerSchema,TableInfo & table,bool & isNoTableInSchema,bool & isFirstCreate)1403 int SQLiteRelationalStore::CheckTrackerTable(const TrackerSchema &trackerSchema, TableInfo &table,
1404 bool &isNoTableInSchema, bool &isFirstCreate)
1405 {
1406 const RelationalSchemaObject tracker = sqliteStorageEngine_->GetTrackerSchema();
1407 isFirstCreate = tracker.GetTrackerTable(trackerSchema.tableName).GetTableName().empty();
1408 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1409 table = localSchema.GetTable(trackerSchema.tableName);
1410 TrackerTable trackerTable;
1411 trackerTable.Init(trackerSchema);
1412 int errCode = E_OK;
1413 auto *handle = GetHandle(true, errCode);
1414 if (handle == nullptr) {
1415 return errCode;
1416 }
1417 ResFinalizer finalizer([this, handle]() {
1418 SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle;
1419 ReleaseHandle(releaseHandle);
1420 });
1421 bool isOnceDrop = false;
1422 (void)handle->IsTableOnceDropped(trackerSchema.tableName, isOnceDrop);
1423 if (table.Empty()) {
1424 isNoTableInSchema = true;
1425 table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
1426 errCode = handle->AnalysisTrackerTable(trackerTable, table);
1427 if (errCode != E_OK) {
1428 LOGE("[CheckTrackerTable] analysis table schema failed %d.", errCode);
1429 return errCode;
1430 }
1431 } else {
1432 table.SetTrackerTable(trackerTable);
1433 errCode = table.CheckTrackerTable();
1434 if (errCode != E_OK) {
1435 LOGE("[CheckTrackerTable] check tracker table schema failed. %d", errCode);
1436 return errCode;
1437 }
1438 }
1439 if (!trackerSchema.isForceUpgrade && !tracker.GetTrackerTable(trackerSchema.tableName).IsChanging(trackerSchema)
1440 && !isOnceDrop) {
1441 LOGW("[CheckTrackerTable] tracker schema is no change, table[%s [%zu]]",
1442 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size());
1443 return -E_IGNORE_DATA;
1444 }
1445 return E_OK;
1446 }
1447
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1448 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1449 {
1450 if (condition.sql.empty()) {
1451 LOGE("[RelationalStore] execute sql is empty.");
1452 return -E_INVALID_ARGS;
1453 }
1454 return sqliteStorageEngine_->ExecuteSql(condition, records);
1455 }
1456
CleanWaterMarkInner(SQLiteSingleVerRelationalStorageExecutor * & handle,const std::set<std::string> & clearWaterMarkTable)1457 int SQLiteRelationalStore::CleanWaterMarkInner(SQLiteSingleVerRelationalStorageExecutor *&handle,
1458 const std::set<std::string> &clearWaterMarkTable)
1459 {
1460 int errCode = E_OK;
1461 for (const auto &tableName : clearWaterMarkTable) {
1462 std::string cloudWaterMark;
1463 Value blobMetaVal;
1464 errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1465 if (errCode != E_OK) {
1466 LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1467 return errCode;
1468 }
1469 errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1470 if (errCode != E_OK) {
1471 LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1472 return errCode;
1473 }
1474 errCode = handle->CleanUploadFinishedFlag(tableName);
1475 if (errCode != E_OK) {
1476 LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1477 return errCode;
1478 }
1479 }
1480 #ifdef USE_DISTRIBUTEDDB_CLOUD
1481 errCode = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTable);
1482 if (errCode != E_OK) {
1483 LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1484 }
1485 #endif
1486 return errCode;
1487 }
1488
CleanWaterMark(const std::set<std::string> clearWaterMarkTables)1489 std::function<int(void)> SQLiteRelationalStore::CleanWaterMark(const std::set<std::string> clearWaterMarkTables)
1490 {
1491 return [this, clearWaterMarkTables]()->int {
1492 int errCode = E_OK;
1493 auto *handle = GetHandle(true, errCode);
1494 if (handle == nullptr) {
1495 LOGE("[SQLiteRelationalStore] get handle failed:%d", errCode);
1496 return errCode;
1497 }
1498 storageEngine_->SetReusedHandle(handle);
1499 errCode = CleanWaterMarkInner(handle, clearWaterMarkTables);
1500 storageEngine_->SetReusedHandle(nullptr);
1501 ReleaseHandle(handle);
1502 if (errCode != E_OK) {
1503 LOGE("[SQLiteRelationalStore] SetReference clear water mark failed, errCode = %d", errCode);
1504 }
1505 LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1506 return errCode;
1507 };
1508 }
1509
SetReferenceInner(const std::vector<TableReferenceProperty> & tableReferenceProperty,std::set<std::string> & clearWaterMarkTables)1510 int SQLiteRelationalStore::SetReferenceInner(const std::vector<TableReferenceProperty> &tableReferenceProperty,
1511 std::set<std::string> &clearWaterMarkTables)
1512 {
1513 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1514 int errCode = GetHandleAndStartTransaction(handle);
1515 if (errCode != E_OK) {
1516 LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1517 return errCode;
1518 }
1519 RelationalSchemaObject schema;
1520 errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1521 if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1522 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1523 (void)handle->Rollback();
1524 ReleaseHandle(handle);
1525 return errCode;
1526 }
1527
1528 int ret = handle->Commit();
1529 ReleaseHandle(handle);
1530 if (ret == E_OK) {
1531 sqliteStorageEngine_->SetSchema(schema);
1532 return errCode;
1533 }
1534 LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1535 return ret;
1536 }
1537
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1538 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1539 {
1540 std::set<std::string> clearWaterMarkTables;
1541 int errCode = SetReferenceInner(tableReferenceProperty, clearWaterMarkTables);
1542 if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1543 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1544 return errCode;
1545 }
1546 std::function<int(void)> clearWaterMarkFunc = [] { return E_OK; };
1547 if (!clearWaterMarkTables.empty()) {
1548 clearWaterMarkFunc = CleanWaterMark(clearWaterMarkTables);
1549 }
1550 if (errCode == -E_TABLE_REFERENCE_CHANGED) {
1551 int ret = E_OK;
1552 #ifdef USE_DISTRIBUTEDDB_CLOUD
1553 ret = cloudSyncer_->StopSyncTask(clearWaterMarkFunc);
1554 #endif
1555 return ret != E_OK ? ret : errCode;
1556 }
1557 return E_OK;
1558 }
1559
InitTrackerSchemaFromMeta()1560 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1561 {
1562 int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1563 return errCode == -E_NOT_FOUND ? E_OK : errCode;
1564 }
1565
CleanTrackerData(const std::string & tableName,int64_t cursor)1566 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1567 {
1568 if (tableName.empty()) {
1569 return -E_INVALID_ARGS;
1570 }
1571 return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1572 }
1573
ExecuteCreateSharedTable(const DataBaseSchema & schema)1574 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1575 {
1576 if (sqliteStorageEngine_ == nullptr) {
1577 LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1578 return -E_INVALID_DB;
1579 }
1580 std::vector<std::string> deleteTableNames;
1581 std::map<std::string, std::vector<Field>> updateTableNames;
1582 std::map<std::string, std::string> alterTableNames;
1583 if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1584 LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1585 return -E_INVALID_ARGS;
1586 }
1587 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1588 // upgrade contains delete, alter, update and create
1589 int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1590 if (errCode != E_OK) {
1591 LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1592 } else {
1593 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1594 }
1595 return errCode;
1596 }
1597
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1598 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1599 CloudSyncer::CloudTaskInfo &info)
1600 {
1601 if (info.priorityTask && actualTable.size() != info.table.size()) {
1602 LOGE("[RelationalStore] Not support regenerate table with priority task");
1603 return -E_NOT_SUPPORT;
1604 }
1605 if (actualTable.size() == info.table.size()) {
1606 return E_OK;
1607 }
1608 LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1609 info.table = actualTable;
1610 info.queryList.clear();
1611 for (const auto &item : info.table) {
1612 QuerySyncObject object(Query::Select());
1613 object.SetTableName(item);
1614 info.queryList.push_back(object);
1615 }
1616 return E_OK;
1617 }
1618
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1619 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1620 {
1621 if (cmd != LOGIC_DELETE_SYNC_DATA) {
1622 return -E_NOT_SUPPORT;
1623 }
1624 if (pragmaData == nullptr) {
1625 return -E_INVALID_ARGS;
1626 }
1627 auto logicDelete = *(static_cast<bool *>(pragmaData));
1628 if (storageEngine_ == nullptr) {
1629 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1630 return -E_INVALID_DB;
1631 }
1632 storageEngine_->SetLogicDelete(logicDelete);
1633 return E_OK;
1634 }
1635
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1636 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1637 const std::vector<VBucket> &records)
1638 {
1639 if (storageEngine_ == nullptr) {
1640 LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1641 return -E_INVALID_DB;
1642 }
1643 int errCode = CheckParamForUpsertData(status, tableName, records);
1644 if (errCode != E_OK) {
1645 return errCode;
1646 }
1647 return storageEngine_->UpsertData(status, tableName, records);
1648 }
1649
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1650 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1651 const std::vector<VBucket> &records)
1652 {
1653 if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1654 LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1655 return -E_INVALID_ARGS;
1656 }
1657 if (records.empty()) {
1658 LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1659 return -E_INVALID_ARGS;
1660 }
1661 size_t recordSize = records.size();
1662 if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1663 LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1664 return -E_MAX_LIMITS;
1665 }
1666 return CheckSchemaForUpsertData(tableName, records);
1667 }
1668
ChkTable(const TableInfo & table)1669 static int ChkTable(const TableInfo &table)
1670 {
1671 if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1672 LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1673 return -E_NOT_SUPPORT;
1674 }
1675 if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1676 return -E_NOT_FOUND;
1677 }
1678 return E_OK;
1679 }
1680
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1681 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1682 {
1683 if (tableName.empty()) {
1684 return -E_INVALID_ARGS;
1685 }
1686 auto schema = storageEngine_->GetSchemaInfo();
1687 auto table = schema.GetTable(tableName);
1688 int errCode = ChkTable(table);
1689 if (errCode != E_OK) {
1690 return errCode;
1691 }
1692 TableSchema cloudTableSchema;
1693 errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1694 if (errCode != E_OK) {
1695 LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1696 return errCode;
1697 }
1698 errCode = ChkSchema(tableName);
1699 if (errCode != E_OK) {
1700 return errCode;
1701 }
1702 std::set<std::string> dbPkFields;
1703 for (auto &field : table.GetIdentifyKey()) {
1704 dbPkFields.insert(field);
1705 }
1706 std::set<std::string> schemaFields;
1707 for (auto &fieldInfo : table.GetFieldInfos()) {
1708 schemaFields.insert(fieldInfo.GetFieldName());
1709 }
1710 for (const auto &record : records) {
1711 std::set<std::string> recordPkFields;
1712 for (const auto &item : record) {
1713 if (schemaFields.find(item.first) == schemaFields.end()) {
1714 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1715 return -E_INVALID_ARGS;
1716 }
1717 if (dbPkFields.find(item.first) == dbPkFields.end()) {
1718 continue;
1719 }
1720 recordPkFields.insert(item.first);
1721 }
1722 if (recordPkFields.size() != dbPkFields.size()) {
1723 LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1724 recordPkFields.size(), dbPkFields.size());
1725 return -E_INVALID_ARGS;
1726 }
1727 }
1728 return errCode;
1729 }
1730
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1731 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1732 {
1733 auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1734 if (engine == nullptr) {
1735 LOGE("[RelationalStore][Open] Create storage engine failed");
1736 return -E_OUT_OF_MEMORY;
1737 }
1738 sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1739 [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1740 RefObject::KillAndDecObjRef(releaseEngine);
1741 });
1742 return E_OK;
1743 }
1744
1745 #ifdef USE_DISTRIBUTEDDB_CLOUD
CheckCloudSchema(const DataBaseSchema & schema)1746 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1747 {
1748 if (storageEngine_ == nullptr) {
1749 LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1750 return -E_INVALID_DB;
1751 }
1752 std::shared_ptr<DataBaseSchema> cloudSchema;
1753 (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1754 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1755 for (const auto &tableSchema : schema.tables) {
1756 TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1757 if (tableInfo.Empty()) {
1758 continue;
1759 }
1760 if (tableInfo.GetSharedTableMark()) {
1761 LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1762 return -E_INVALID_ARGS;
1763 }
1764 }
1765 for (const auto &tableSchema : schema.tables) {
1766 if (cloudSchema == nullptr) {
1767 continue;
1768 }
1769 for (const auto &oldSchema : cloudSchema->tables) {
1770 if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1771 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1772 return -E_INVALID_ARGS;
1773 }
1774 }
1775 }
1776 return E_OK;
1777 }
1778
SetCloudSyncConfig(const CloudSyncConfig & config)1779 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1780 {
1781 if (storageEngine_ == nullptr) {
1782 LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1783 return -E_INVALID_DB;
1784 }
1785 storageEngine_->SetCloudSyncConfig(config);
1786 LOGI("[RelationalStore][SetCloudSyncConfig] value:[%" PRId32 ", %" PRId32 ", %" PRId32 ", %d]",
1787 config.maxUploadCount, config.maxUploadSize, config.maxRetryConflictTimes, config.isSupportEncrypt);
1788 return E_OK;
1789 }
1790
GetCloudTaskStatus(uint64_t taskId)1791 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1792 {
1793 return cloudSyncer_->GetCloudTaskStatus(taskId);
1794 }
1795 #endif
1796
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1797 int SQLiteRelationalStore::SetDistributedSchema(const DistributedSchema &schema, bool isForceUpgrade)
1798 {
1799 if (sqliteStorageEngine_ == nullptr || storageEngine_ == nullptr) {
1800 LOGE("[RelationalStore] engine was not initialized");
1801 return -E_INVALID_DB;
1802 }
1803 auto mode = sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode();
1804 std::string localIdentity; // collaboration mode need local identify
1805 if (mode == DistributedTableMode::COLLABORATION) {
1806 int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
1807 if (errCode != E_OK || localIdentity.empty()) {
1808 LOGW("Get local identity failed: %d", errCode);
1809 }
1810 }
1811 auto [errCode, isSchemaChange] = sqliteStorageEngine_->SetDistributedSchema(schema, localIdentity, isForceUpgrade);
1812 if (errCode != E_OK) {
1813 return errCode;
1814 }
1815 if (isSchemaChange) {
1816 LOGI("[RelationalStore] schema was changed by setting distributed schema");
1817 storageEngine_->NotifySchemaChanged();
1818 }
1819 return E_OK;
1820 }
1821
GetDownloadingAssetsCount(int32_t & count)1822 int SQLiteRelationalStore::GetDownloadingAssetsCount(int32_t &count)
1823 {
1824 std::vector<std::string> tableNameList = GetAllDistributedTableName(TableSyncType::CLOUD_COOPERATION);
1825 if (tableNameList.empty()) {
1826 return E_OK;
1827 }
1828
1829 int errCode = E_OK;
1830 SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(false, errCode);
1831 if (handle == nullptr) {
1832 return errCode;
1833 }
1834 for (const auto &tableName : tableNameList) {
1835 TableSchema tableSchema;
1836 int errCode = storageEngine_->GetCloudTableSchema(tableName, tableSchema);
1837 if (errCode != E_OK) {
1838 LOGE("[RelationalStore] Get schema failed when get download assets count, %d, tableName: %s, length: %zu",
1839 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1840 break;
1841 }
1842 errCode = handle->GetDownloadingAssetsCount(tableSchema, count);
1843 if (errCode != E_OK) {
1844 LOGE("[RelationalStore] Get download assets count failed: %d, tableName: %s, length: %zu",
1845 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1846 break;
1847 }
1848 }
1849 ReleaseHandle(handle);
1850 return errCode;
1851 }
1852
SetTableMode(DistributedTableMode tableMode)1853 int SQLiteRelationalStore::SetTableMode(DistributedTableMode tableMode)
1854 {
1855 if (sqliteStorageEngine_ == nullptr) {
1856 LOGE("[RelationalStore][SetTableMode] sqliteStorageEngine was not initialized");
1857 return -E_INVALID_DB;
1858 }
1859 if (sqliteStorageEngine_->GetRelationalProperties().GetDistributedTableMode() != tableMode) {
1860 auto schema = sqliteStorageEngine_->GetSchema();
1861 for (const auto &tableMap : schema.GetTables()) {
1862 if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1863 LOGW("[RelationalStore][SetTableMode] Can not set table mode for table %s[%zu]",
1864 DBCommon::StringMiddleMasking(tableMap.first).c_str(), tableMap.first.size());
1865 return -E_NOT_SUPPORT;
1866 }
1867 }
1868 }
1869 RelationalDBProperties properties = sqliteStorageEngine_->GetRelationalProperties();
1870 properties.SetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(tableMode));
1871 sqliteStorageEngine_->SetProperties(properties);
1872 LOGI("[RelationalStore][SetTableMode] Set table mode to %d successful", static_cast<int>(tableMode));
1873 return E_OK;
1874 }
1875
OperateDataStatus(uint32_t dataOperator)1876 int SQLiteRelationalStore::OperateDataStatus(uint32_t dataOperator)
1877 {
1878 LOGI("[RelationalStore] OperateDataStatus %" PRIu32, dataOperator);
1879 if ((dataOperator & static_cast<uint32_t>(DataOperator::UPDATE_TIME)) == 0) {
1880 return E_OK;
1881 }
1882 if (sqliteStorageEngine_ == nullptr) {
1883 LOGE("[RelationalStore] sqliteStorageEngine was not initialized when operate data status");
1884 return -E_INVALID_DB;
1885 }
1886 if (syncAbleEngine_ == nullptr) {
1887 LOGE("[RelationalStore] syncAbleEngine was not initialized when operate data status");
1888 return -E_INVALID_DB;
1889 }
1890 auto virtualTime = syncAbleEngine_->GetTimestamp();
1891 auto schema = sqliteStorageEngine_->GetSchema();
1892 std::vector<std::string> operateTables;
1893 for (const auto &tableMap : schema.GetTables()) {
1894 if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1895 operateTables.push_back(tableMap.second.GetTableName());
1896 }
1897 }
1898 return OperateDataStatusInner(operateTables, virtualTime);
1899 }
1900
OperateDataStatusInner(const std::vector<std::string> & tables,uint64_t virtualTime) const1901 int SQLiteRelationalStore::OperateDataStatusInner(const std::vector<std::string> &tables, uint64_t virtualTime) const
1902 {
1903 int errCode = E_OK;
1904 SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(true, errCode);
1905 if (handle == nullptr) {
1906 LOGE("[RelationalStore][OperateDataStatus] Get handle failed %d when operate data status", errCode);
1907 return errCode;
1908 }
1909 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1910 if (errCode != E_OK) {
1911 LOGE("[RelationalStore][OperateDataStatus] Start transaction failed %d when operate data status", errCode);
1912 ReleaseHandle(handle);
1913 return errCode;
1914 }
1915 sqlite3 *db = nullptr;
1916 errCode = handle->GetDbHandle(db);
1917 if (errCode != E_OK) {
1918 LOGE("[RelationalStore][OperateDataStatus] Get db failed %d when operate data status", errCode);
1919 (void)handle->Rollback();
1920 ReleaseHandle(handle);
1921 return errCode;
1922 }
1923 errCode = SQLiteRelationalUtils::OperateDataStatus(db, tables, virtualTime);
1924 if (errCode == E_OK) {
1925 errCode = handle->Commit();
1926 if (errCode != E_OK) {
1927 LOGE("[RelationalStore][OperateDataStatus] Commit failed %d when operate data status", errCode);
1928 }
1929 } else {
1930 int ret = handle->Rollback();
1931 if (ret != E_OK) {
1932 LOGE("[RelationalStore][OperateDataStatus] Rollback failed %d when operate data status", ret);
1933 }
1934 }
1935 ReleaseHandle(handle);
1936 return errCode;
1937 }
1938
GetDeviceSyncTaskCount() const1939 int32_t SQLiteRelationalStore::GetDeviceSyncTaskCount() const
1940 {
1941 if (syncAbleEngine_ == nullptr) {
1942 LOGW("[RelationalStore] syncAbleEngine was not initialized when get device sync task count");
1943 return 0;
1944 }
1945 return syncAbleEngine_->GetDeviceSyncTaskCount();
1946 }
1947
CleanDirtyLogIfNeed(const std::string & tableName) const1948 void SQLiteRelationalStore::CleanDirtyLogIfNeed(const std::string &tableName) const
1949 {
1950 int errCode = E_OK;
1951 SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(true, errCode);
1952 if (handle == nullptr) {
1953 LOGW("[RDBStore][ClearDirtyLog] Get handle failed %d", errCode);
1954 return;
1955 }
1956 ResFinalizer finalizer([this, handle]() {
1957 SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle;
1958 ReleaseHandle(releaseHandle);
1959 });
1960 sqlite3 *db = nullptr;
1961 errCode = handle->GetDbHandle(db);
1962 if (errCode != E_OK) {
1963 LOGW("[RDBStore][ClearDirtyLog] Get db handle failed %d", errCode);
1964 return;
1965 }
1966 bool isExistDirtyLog = false;
1967 std::tie(errCode, isExistDirtyLog) = SQLiteRelationalUtils::CheckExistDirtyLog(db, tableName);
1968 if (errCode != E_OK) {
1969 LOGW("[RDBStore][ClearDirtyLog] Check dirty log failed %d", errCode);
1970 return;
1971 }
1972 if (!isExistDirtyLog) {
1973 return;
1974 }
1975 auto obj = GetSchemaObj();
1976 if (!obj.IsSchemaValid()) {
1977 return;
1978 }
1979 errCode = SQLiteRelationalUtils::CleanDirtyLog(db, tableName, obj);
1980 if (errCode != E_OK) {
1981 LOGW("[RDBStore][ClearDirtyLog] Clean dirty log failed %d", errCode);
1982 }
1983 }
1984
GetSchemaObj() const1985 RelationalSchemaObject SQLiteRelationalStore::GetSchemaObj() const
1986 {
1987 if (storageEngine_ == nullptr) {
1988 return {};
1989 }
1990 return storageEngine_->GetSchemaInfo();
1991 }
1992 } // namespace DistributedDB
1993 #endif
1994