1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "sqlite_log_table_manager.h"
26 #include "sqlite_relational_store_connection.h"
27 #include "storage_engine_manager.h"
28 #include "cloud_sync_utils.h"
29
30 namespace DistributedDB {
namespace {
// Meta-data key under which the distributed table mode is stored; used by
// CheckTableModeFromMeta() to read it and by SaveTableModeToMeta() to write it.
constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
}
34
// Destructor: resets the sqliteStorageEngine_ member to nullptr.
SQLiteRelationalStore::~SQLiteRelationalStore()
{
    sqliteStorageEngine_ = nullptr;
}
39
40 // Called when a new connection created.
IncreaseConnectionCounter()41 void SQLiteRelationalStore::IncreaseConnectionCounter()
42 {
43 connectionCount_.fetch_add(1, std::memory_order_seq_cst);
44 if (connectionCount_.load() > 0) {
45 sqliteStorageEngine_->SetConnectionFlag(true);
46 }
47 }
48
GetDBConnection(int & errCode)49 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
50 {
51 std::lock_guard<std::mutex> lock(connectMutex_);
52 RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
53 if (connection == nullptr) {
54 errCode = -E_OUT_OF_MEMORY;
55 return nullptr;
56 }
57 IncObjRef(this);
58 IncreaseConnectionCounter();
59 return connection;
60 }
61
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)62 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
63 {
64 option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
65 option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
66 if (properties.IsEncrypted()) {
67 option.cipherType = properties.GetCipherType();
68 option.passwd = properties.GetPasswd();
69 option.iterTimes = properties.GetIterTimes();
70 }
71 }
72
InitStorageEngine(const RelationalDBProperties & properties)73 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
74 {
75 OpenDbProperties option;
76 InitDataBaseOption(properties, option);
77 std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
78
79 StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
80 int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
81 if (errCode != E_OK) {
82 LOGE("Init the sqlite storage engine failed:%d", errCode);
83 }
84 return errCode;
85 }
86
// Releases the engine, cloud syncer and syncable storage held by this store.
// Open() calls this on its failure path.
void SQLiteRelationalStore::ReleaseResources()
{
    if (sqliteStorageEngine_ != nullptr) {
        // Ask the engine to clear its password before this store drops its pointer to it.
        sqliteStorageEngine_->ClearEnginePasswd();
        sqliteStorageEngine_ = nullptr;
    }
#ifdef USE_DISTRIBUTEDDB_CLOUD
    if (cloudSyncer_ != nullptr) {
        cloudSyncer_->Close();
        RefObject::KillAndDecObjRef(cloudSyncer_);
        cloudSyncer_ = nullptr;
    }
#endif
    // NOTE(review): storageEngine_ is not reset to nullptr after its reference is dropped here;
    // confirm nothing reads it after a failed Open().
    RefObject::DecObjRef(storageEngine_);
}
102
CheckDBMode()103 int SQLiteRelationalStore::CheckDBMode()
104 {
105 int errCode = E_OK;
106 auto *handle = GetHandle(true, errCode);
107 if (handle == nullptr) {
108 return errCode;
109 }
110 errCode = handle->CheckDBModeForRelational();
111 if (errCode != E_OK) {
112 LOGE("check relational DB mode failed. %d", errCode);
113 }
114
115 ReleaseHandle(handle);
116 return errCode;
117 }
118
GetSchemaFromMeta(RelationalSchemaObject & schema)119 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
120 {
121 Key schemaKey;
122 DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
123 Value schemaVal;
124 int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
125 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
126 LOGE("Get relational schema from meta table failed. %d", errCode);
127 return errCode;
128 } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
129 LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
130 return -E_NOT_FOUND;
131 }
132
133 std::string schemaStr;
134 DBCommon::VectorToString(schemaVal, schemaStr);
135 errCode = schema.ParseFromSchemaString(schemaStr);
136 if (errCode != E_OK) {
137 LOGE("Parse schema string from meta table failed.");
138 return errCode;
139 }
140
141 sqliteStorageEngine_->SetSchema(schema);
142 return E_OK;
143 }
144
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)145 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
146 {
147 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
148 Value modeVal;
149 int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
150 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
151 LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
152 return errCode;
153 }
154
155 DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
156 if (!modeVal.empty()) {
157 std::string value(modeVal.begin(), modeVal.end());
158 orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
159 } else if (isUnSet) {
160 return E_OK; // First set table mode.
161 }
162
163 if (orgMode == DistributedTableMode::COLLABORATION && orgMode != mode) {
164 LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
165 return -E_INVALID_ARGS;
166 }
167 return E_OK;
168 }
169
CheckProperties(RelationalDBProperties properties)170 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
171 {
172 RelationalSchemaObject schema;
173 int errCode = GetSchemaFromMeta(schema);
174 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
175 LOGE("Get relational schema from meta failed. errcode=%d", errCode);
176 return errCode;
177 }
178 int ret = InitTrackerSchemaFromMeta();
179 if (ret != E_OK) {
180 LOGE("Init tracker schema from meta failed. errcode=%d", ret);
181 return ret;
182 }
183 properties.SetSchema(schema);
184
185 // Empty schema means no distributed table has been used, we may set DB to any table mode
186 // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
187 bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
188 auto mode = properties.GetDistributedTableMode();
189 errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
190 if (errCode != E_OK) {
191 LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
192 return errCode;
193 }
194 if (!isSchemaEmpty) {
195 return errCode;
196 }
197
198 errCode = SaveTableModeToMeta(mode);
199 if (errCode != E_OK) {
200 LOGE("Save table mode to meta failed. errCode=%d", errCode);
201 return errCode;
202 }
203
204 return E_OK;
205 }
206
SaveTableModeToMeta(DistributedTableMode mode)207 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
208 {
209 const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
210 Value modeVal;
211 DBCommon::StringToVector(std::to_string(static_cast<int>(mode)), modeVal);
212 int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
213 if (errCode != E_OK) {
214 LOGE("Save relational schema to meta table failed. %d", errCode);
215 }
216 return errCode;
217 }
218
SaveLogTableVersionToMeta()219 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
220 {
221 LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
222 const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY.begin(), DBConstant::LOG_TABLE_VERSION_KEY.end());
223 Value logVersion;
224 int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
225 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
226 LOGE("Get log version from meta table failed. %d", errCode);
227 return errCode;
228 }
229 std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
230 Value logVersionVal(versionStr.begin(), versionStr.end());
231 // log version is same, no need to update
232 if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
233 return errCode;
234 }
235 // If the log version does not exist or is different, update the log version
236 errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
237 if (errCode != E_OK) {
238 LOGE("save log table version to meta table failed. %d", errCode);
239 }
240 return errCode;
241 }
242
CleanDistributedDeviceTable()243 int SQLiteRelationalStore::CleanDistributedDeviceTable()
244 {
245 std::vector<std::string> missingTables;
246 int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
247 if (errCode != E_OK) {
248 LOGE("Clean distributed device table failed. %d", errCode);
249 }
250 for (const auto &deviceTableName : missingTables) {
251 std::string deviceHash;
252 std::string tableName;
253 DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
254 syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
255 if (errCode != E_OK) {
256 LOGE("Erase water mark failed:%d", errCode);
257 return errCode;
258 }
259 }
260 return errCode;
261 }
262
// Opens the relational store described by `properties`.
// Runs under initalMutex_; a store that is already initialized returns E_OK immediately.
// Any failure after InitSQLiteStorageEngine() succeeded leaves the do/while(false) block via
// `break`, so ReleaseResources() runs before the error code is returned.
int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
{
    std::lock_guard<std::mutex> lock(initalMutex_);
    if (isInitialized_) {
        LOGD("[RelationalStore][Open] relational db was already initialized.");
        return E_OK;
    }
    int errCode = InitSQLiteStorageEngine(properties);
    if (errCode != E_OK) {
        return errCode;
    }

    do {
        errCode = InitStorageEngine(properties);
        if (errCode != E_OK) {
            LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
            break;
        }

        storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
        if (storageEngine_ == nullptr) {
            LOGE("[RelationalStore][Open] Create syncable storage failed");
            errCode = -E_OUT_OF_MEMORY;
            break;
        }

        syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
        // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
        // be destructed when close store
        storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
#ifdef USE_DISTRIBUTEDDB_CLOUD
        // NOTE(review): the allocation result is not checked here; the cloud entry points in this
        // file test cloudSyncer_ for nullptr before using it.
        cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_), false);
#endif
        errCode = CheckDBMode();
        if (errCode != E_OK) {
            break;
        }

        // Validate schema / table mode against the meta data and persist the mode if needed.
        errCode = CheckProperties(properties);
        if (errCode != E_OK) {
            break;
        }

        errCode = SaveLogTableVersionToMeta();
        if (errCode != E_OK) {
            break;
        }

        errCode = CleanDistributedDeviceTable();
        if (errCode != E_OK) {
            break;
        }

        isInitialized_ = true;
        return E_OK;
    } while (false);

    // Only reached on failure: undo what was set up above.
    ReleaseResources();
    return errCode;
}
323
OnClose(const std::function<void (void)> & notifier)324 void SQLiteRelationalStore::OnClose(const std::function<void(void)> ¬ifier)
325 {
326 AutoLock lockGuard(this);
327 if (notifier) {
328 closeNotifiers_.push_back(notifier);
329 } else {
330 LOGW("Register 'Close()' notifier failed, notifier is null.");
331 }
332 }
333
GetHandle(bool isWrite,int & errCode) const334 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
335 {
336 if (sqliteStorageEngine_ == nullptr) {
337 errCode = -E_INVALID_DB;
338 return nullptr;
339 }
340
341 return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
342 sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
343 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const344 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
345 {
346 if (handle == nullptr) {
347 return;
348 }
349
350 if (sqliteStorageEngine_ != nullptr) {
351 StorageExecutor *databaseHandle = handle;
352 sqliteStorageEngine_->Recycle(databaseHandle);
353 handle = nullptr;
354 }
355 }
356
// Forwards the sync request to syncAbleEngine_ and returns its result.
// NOTE(review): syncAbleEngine_ is dereferenced without a null check — presumably only called
// after a successful Open(); confirm.
int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId)
{
    return syncAbleEngine_->Sync(syncParam, connectionId);
}
361
362 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)363 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
364 {
365 int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
366 if (count <= 0) {
367 LOGF("Decrease db connection counter failed, count <= 0.");
368 return;
369 }
370 if (storageEngine_ != nullptr) {
371 storageEngine_->EraseDataChangeCallback(connectionId);
372 }
373 if (count != 1) {
374 return;
375 }
376
377 LockObj();
378 auto notifiers = std::move(closeNotifiers_);
379 UnlockObj();
380 for (const auto ¬ifier : notifiers) {
381 if (notifier) {
382 notifier();
383 }
384 }
385
386 // Sync Close
387 syncAbleEngine_->Close();
388
389 #ifdef USE_DISTRIBUTEDDB_CLOUD
390 if (cloudSyncer_ != nullptr) {
391 cloudSyncer_->Close();
392 RefObject::KillAndDecObjRef(cloudSyncer_);
393 cloudSyncer_ = nullptr;
394 }
395 #endif
396
397 if (sqliteStorageEngine_ != nullptr) {
398 sqliteStorageEngine_ = nullptr;
399 }
400 {
401 if (storageEngine_ != nullptr) {
402 storageEngine_->RegisterHeartBeatListener(nullptr);
403 }
404 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
405 StopLifeCycleTimer();
406 lifeCycleNotifier_ = nullptr;
407 }
408 // close will dec sync ref of storageEngine_
409 DecObjRef(storageEngine_);
410 }
411
// Releases one connection of this store.
// When the counter shows exactly one connection, the engine's connection flag is cleared first.
// For a non-null `connection`, the connection object and the store reference taken in
// GetDBConnection() are released and the connection counter is decreased.
void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
{
    // NOTE(review): this check runs before connectMutex_ is taken and even when `connection`
    // is nullptr; sqliteStorageEngine_ is dereferenced without a null check — confirm intended.
    if (connectionCount_.load() == 1) {
        sqliteStorageEngine_->SetConnectionFlag(false);
    }

    connectMutex_.lock();
    if (connection != nullptr) {
        KillAndDecObjRef(connection);
        DecreaseConnectionCounter(connectionId);
        // The mutex is unlocked before the store drops the reference on itself; presumably
        // because that call may destroy this object (and the mutex with it) — confirm.
        connectMutex_.unlock();
        KillAndDecObjRef(this);
    } else {
        connectMutex_.unlock();
    }
}
428
// Forwards to syncAbleEngine_->WakeUpSyncer().
void SQLiteRelationalStore::WakeUpSyncer()
{
    syncAbleEngine_->WakeUpSyncer();
}
433
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)434 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
435 bool trackerSchemaChanged)
436 {
437 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
438 TableInfo tableInfo = localSchema.GetTable(tableName);
439 if (!tableInfo.Empty()) {
440 bool isSharedTable = tableInfo.GetSharedTableMark();
441 if (isSharedTable && !trackerSchemaChanged) {
442 return E_OK; // shared table will create distributed table when use SetCloudDbSchema
443 }
444 }
445
446 bool schemaChanged = false;
447 int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(""),
448 schemaChanged, syncType, trackerSchemaChanged);
449 if (errCode != E_OK) {
450 LOGE("Create distributed table failed. %d", errCode);
451 }
452 if (schemaChanged) {
453 LOGD("Notify schema changed.");
454 storageEngine_->NotifySchemaChanged();
455 }
456 return errCode;
457 }
458
459 #ifdef USE_DISTRIBUTEDDB_CLOUD
GetCloudSyncTaskCount()460 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
461 {
462 if (cloudSyncer_ == nullptr) {
463 LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
464 return -1;
465 }
466 return cloudSyncer_->GetCloudSyncTaskCount();
467 }
468
CleanCloudData(ClearMode mode)469 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
470 {
471 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
472 TableInfoMap tables = localSchema.GetTables();
473 std::vector<std::string> cloudTableNameList;
474 for (const auto &tableInfo : tables) {
475 bool isSharedTable = tableInfo.second.GetSharedTableMark();
476 if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
477 continue;
478 }
479 if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
480 cloudTableNameList.push_back(tableInfo.first);
481 }
482 }
483 if (cloudTableNameList.empty()) {
484 LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
485 return E_OK;
486 }
487 if (cloudSyncer_ == nullptr) {
488 LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
489 return -E_INVALID_DB;
490 }
491 int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
492 if (errCode != E_OK) {
493 LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
494 }
495
496 return errCode;
497 }
498 #endif
499
RemoveDeviceData()500 int SQLiteRelationalStore::RemoveDeviceData()
501 {
502 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
503 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
504 if (mode == DistributedTableMode::COLLABORATION) {
505 LOGE("Not support remove all device data in collaboration mode.");
506 return -E_NOT_SUPPORT;
507 }
508
509 std::vector<std::string> tableNameList = GetAllDistributedTableName();
510 if (tableNameList.empty()) {
511 return E_OK;
512 }
513 // erase watermark first
514 int errCode = EraseAllDeviceWatermark(tableNameList);
515 if (errCode != E_OK) {
516 LOGE("remove watermark failed %d", errCode);
517 return errCode;
518 }
519 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
520 errCode = GetHandleAndStartTransaction(handle);
521 if (handle == nullptr) {
522 return errCode;
523 }
524
525 for (const auto &table : tableNameList) {
526 errCode = handle->DeleteDistributedDeviceTable("", table);
527 if (errCode != E_OK) {
528 LOGE("delete device data failed. %d", errCode);
529 break;
530 }
531
532 errCode = handle->DeleteDistributedAllDeviceTableLog(table);
533 if (errCode != E_OK) {
534 LOGE("delete device data failed. %d", errCode);
535 break;
536 }
537 }
538
539 if (errCode != E_OK) {
540 (void)handle->Rollback();
541 ReleaseHandle(handle);
542 return errCode;
543 }
544
545 errCode = handle->Commit();
546 ReleaseHandle(handle);
547 storageEngine_->NotifySchemaChanged();
548 return errCode;
549 }
550
RemoveDeviceData(const std::string & device,const std::string & tableName)551 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
552 {
553 auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
554 RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(DistributedTableMode::SPLIT_BY_DEVICE)));
555 if (mode == DistributedTableMode::COLLABORATION) {
556 LOGE("Not support remove device data in collaboration mode.");
557 return -E_NOT_SUPPORT;
558 }
559
560 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
561 auto iter = tables.find(tableName);
562 if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
563 LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
564 return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
565 }
566 // cloud mode is not permit
567 if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
568 LOGE("Remove device data with cloud sync table name.");
569 return -E_NOT_SUPPORT;
570 }
571 bool isNeedHash = false;
572 std::string hashDeviceId;
573 int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
574 if (errCode == -E_NOT_SUPPORT) {
575 isNeedHash = true;
576 hashDeviceId = device;
577 errCode = E_OK;
578 }
579 if (errCode != E_OK) {
580 return errCode;
581 }
582 if (isNeedHash) {
583 // check device is uuid in meta
584 std::set<std::string> hashDevices;
585 errCode = GetExistDevices(hashDevices);
586 if (errCode != E_OK) {
587 return errCode;
588 }
589 if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
590 LOGD("[SQLiteRelationalStore] not match device, just return");
591 return E_OK;
592 }
593 }
594 return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
595 }
596
// Forwards the observer registration for `connectionId` to storageEngine_ and returns its result.
int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
    const RelationalObserverAction &action)
{
    return storageEngine_->RegisterObserverAction(connectionId, observer, action);
}
602
// Forwards the observer removal for `connectionId` to storageEngine_ and returns its result.
int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
{
    return storageEngine_->UnRegisterObserverAction(connectionId, observer);
}
607
StopLifeCycleTimer()608 int SQLiteRelationalStore::StopLifeCycleTimer()
609 {
610 auto runtimeCxt = RuntimeContext::GetInstance();
611 if (runtimeCxt == nullptr) {
612 return -E_INVALID_ARGS;
613 }
614 if (lifeTimerId_ != 0) {
615 TimerId timerId = lifeTimerId_;
616 lifeTimerId_ = 0;
617 runtimeCxt->RemoveTimer(timerId, false);
618 }
619 return E_OK;
620 }
621
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)622 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier)
623 {
624 auto runtimeCxt = RuntimeContext::GetInstance();
625 if (runtimeCxt == nullptr) {
626 return -E_INVALID_ARGS;
627 }
628 RefObject::IncObjRef(this);
629 TimerId timerId = 0;
630 int errCode = runtimeCxt->SetTimer(
631 DBConstant::DEF_LIFE_CYCLE_TIME,
632 [this](TimerId id) -> int {
633 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
634 if (lifeCycleNotifier_) {
635 // normal identifier mode
636 std::string identifier;
637 if (sqliteStorageEngine_->GetProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
638 identifier = sqliteStorageEngine_->GetProperties().GetStringProp(
639 DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
640 } else {
641 identifier = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
642 }
643 auto userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
644 lifeCycleNotifier_(identifier, userId);
645 }
646 return 0;
647 },
648 [this]() {
649 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
650 if (ret != E_OK) {
651 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
652 }
653 },
654 timerId);
655 if (errCode != E_OK) {
656 lifeTimerId_ = 0;
657 LOGE("SetTimer failed:%d", errCode);
658 RefObject::DecObjRef(this);
659 return errCode;
660 }
661
662 lifeCycleNotifier_ = notifier;
663 lifeTimerId_ = timerId;
664 return E_OK;
665 }
666
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)667 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier)
668 {
669 int errCode;
670 {
671 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
672 if (lifeTimerId_ != 0) {
673 errCode = StopLifeCycleTimer();
674 if (errCode != E_OK) {
675 LOGE("Stop the life cycle timer failed:%d", errCode);
676 return errCode;
677 }
678 }
679
680 if (!notifier) {
681 return E_OK;
682 }
683 errCode = StartLifeCycleTimer(notifier);
684 if (errCode != E_OK) {
685 LOGE("Register life cycle timer failed:%d", errCode);
686 return errCode;
687 }
688 }
689 auto listener = [this] { HeartBeat(); };
690 storageEngine_->RegisterHeartBeatListener(listener);
691 return errCode;
692 }
693
HeartBeat()694 void SQLiteRelationalStore::HeartBeat()
695 {
696 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
697 int errCode = ResetLifeCycleTimer();
698 if (errCode != E_OK) {
699 LOGE("Heart beat for life cycle failed:%d", errCode);
700 }
701 }
702
ResetLifeCycleTimer()703 int SQLiteRelationalStore::ResetLifeCycleTimer()
704 {
705 if (lifeTimerId_ == 0) {
706 return E_OK;
707 }
708 auto lifeNotifier = lifeCycleNotifier_;
709 lifeCycleNotifier_ = nullptr;
710 int errCode = StopLifeCycleTimer();
711 if (errCode != E_OK) {
712 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
713 }
714 return StartLifeCycleTimer(lifeNotifier);
715 }
716
// Returns the DATA_DIR property of the sqlite storage engine ("" when the property is absent).
// NOTE(review): sqliteStorageEngine_ is dereferenced without a null check.
std::string SQLiteRelationalStore::GetStorePath() const
{
    return sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
}
721
// Returns a copy of the properties held by the sqlite storage engine.
// NOTE(review): sqliteStorageEngine_ is dereferenced without a null check.
RelationalDBProperties SQLiteRelationalStore::GetProperties() const
{
    return sqliteStorageEngine_->GetProperties();
}
726
// Forwards to syncAbleEngine_->StopSync() for the given connection.
void SQLiteRelationalStore::StopSync(uint64_t connectionId)
{
    return syncAbleEngine_->StopSync(connectionId);
}
731
Dump(int fd)732 void SQLiteRelationalStore::Dump(int fd)
733 {
734 std::string userId = "";
735 std::string appId = "";
736 std::string storeId = "";
737 std::string label = "";
738 if (sqliteStorageEngine_ != nullptr) {
739 userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
740 appId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, "");
741 storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
742 label = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
743 }
744 label = DBCommon::TransferStringToHex(label);
745 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
746 storeId.c_str(), label.c_str());
747 if (syncAbleEngine_ != nullptr) {
748 syncAbleEngine_->Dump(fd);
749 }
750 }
751
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)752 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
753 uint64_t connectionId, std::shared_ptr<ResultSet> &result)
754 {
755 if (sqliteStorageEngine_ == nullptr) {
756 return -E_INVALID_DB;
757 }
758 if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
759 LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
760 return -E_MAX_LIMITS;
761 }
762
763 if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
764 LOGW("not a distributed relational store.");
765 return -E_NOT_SUPPORT;
766 }
767
768 // Check whether to be able to operate the db.
769 int errCode = E_OK;
770 auto *handle = GetHandle(false, errCode);
771 if (handle == nullptr) {
772 return errCode;
773 }
774 errCode = handle->CheckEncryptedOrCorrupted();
775 ReleaseHandle(handle);
776 if (errCode != E_OK) {
777 return errCode;
778 }
779
780 return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
781 }
782
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)783 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
784 {
785 std::set<std::string> devices;
786 int errCode = GetExistDevices(devices);
787 if (errCode != E_OK) {
788 return errCode;
789 }
790 for (const auto &tableName : tableNameList) {
791 for (const auto &device : devices) {
792 errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
793 if (errCode != E_OK) {
794 return errCode;
795 }
796 }
797 }
798 return E_OK;
799 }
800
GetDevTableName(const std::string & device,const std::string & hashDev) const801 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
802 {
803 std::string devTableName;
804 StoreInfo info = { sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
805 sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
806 sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "") };
807 if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
808 devTableName = hashDev;
809 }
810 return devTableName;
811 }
812
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const813 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
814 {
815 int errCode = E_OK;
816 handle = GetHandle(true, errCode);
817 if (handle == nullptr) {
818 LOGE("get handle failed %d", errCode);
819 return errCode;
820 }
821
822 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
823 if (errCode != E_OK) {
824 LOGE("start transaction failed %d", errCode);
825 ReleaseHandle(handle);
826 }
827 return errCode;
828 }
829
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)830 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
831 const std::string &tableName, bool isNeedHash)
832 {
833 std::string hashHexDev;
834 std::string hashDev;
835 std::string devTableName;
836 if (!isNeedHash) {
837 // if is not need hash mappingDev mean hash(uuid) device is param device
838 hashHexDev = DBCommon::TransferStringToHex(mappingDev);
839 hashDev = mappingDev;
840 devTableName = device;
841 } else {
842 // if is need hash mappingDev mean uuid
843 hashDev = DBCommon::TransferHashString(mappingDev);
844 hashHexDev = DBCommon::TransferStringToHex(hashDev);
845 devTableName = GetDevTableName(mappingDev, hashHexDev);
846 }
847 // erase watermark first
848 int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
849 if (errCode != E_OK) {
850 LOGE("erase watermark failed %d", errCode);
851 return errCode;
852 }
853 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
854 errCode = GetHandleAndStartTransaction(handle);
855 if (handle == nullptr) {
856 return errCode;
857 }
858
859 errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
860 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
861 if (errCode != E_OK) {
862 LOGE("delete device data failed. %d", errCode);
863 tables.clear();
864 }
865
866 for (const auto &it : tables) {
867 if (tableName.empty() || it.second.GetTableName() == tableName) {
868 errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
869 if (errCode != E_OK) {
870 LOGE("delete device data failed. %d", errCode);
871 break;
872 }
873 }
874 }
875
876 if (errCode != E_OK) {
877 (void)handle->Rollback();
878 ReleaseHandle(handle);
879 return errCode;
880 }
881 errCode = handle->Commit();
882 ReleaseHandle(handle);
883 storageEngine_->NotifySchemaChanged();
884 return errCode;
885 }
886
GetExistDevices(std::set<std::string> & hashDevices) const887 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
888 {
889 int errCode = E_OK;
890 auto *handle = GetHandle(true, errCode);
891 if (handle == nullptr) {
892 LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
893 return errCode;
894 }
895 errCode = handle->GetExistsDeviceList(hashDevices);
896 if (errCode != E_OK) {
897 LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
898 }
899 ReleaseHandle(handle);
900 return errCode;
901 }
902
GetAllDistributedTableName(TableSyncType tableSyncType)903 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName(TableSyncType tableSyncType)
904 {
905 TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
906 std::vector<std::string> tableNames;
907 for (const auto &table : tables) {
908 if (table.second.GetTableSyncType() != tableSyncType) {
909 continue;
910 }
911 tableNames.push_back(table.second.GetTableName());
912 }
913 return tableNames;
914 }
915
916 #ifdef USE_DISTRIBUTEDDB_CLOUD
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)917 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
918 {
919 if (cloudSyncer_ == nullptr) {
920 LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
921 return -E_INVALID_DB;
922 }
923 cloudSyncer_->SetCloudDB(cloudDb);
924 return E_OK;
925 }
926 #endif
927
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)928 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
929 std::vector<Field> &addFields)
930 {
931 for (const auto &newField : newFields) {
932 if (equalFields.find(newField.colName) == equalFields.end()) {
933 addFields.push_back(newField);
934 }
935 }
936 }
937
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)938 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
939 std::vector<Field> &addFields)
940 {
941 std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
942 if (newFields.size() < oldFields.size()) {
943 return false;
944 }
945 std::set<std::string> equalFields;
946 for (const auto &oldField : oldFields) {
947 bool isFieldExist = false;
948 for (const auto &newField : newFields) {
949 if (newField.colName != oldField.GetFieldName()) {
950 continue;
951 }
952 isFieldExist = true;
953 int32_t type = newField.type;
954 // Field type need to match storage type
955 // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
956 // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
957 if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
958 type++; // storage type - field type = 1
959 } else if (type == TYPE_INDEX<bool>) {
960 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
961 } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
962 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
963 }
964 auto primaryKeyMap = tableInfo.GetPrimaryKey();
965 auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
966 [&newField](const std::map<int, std::string>::value_type &pair) {
967 return pair.second == newField.colName;
968 });
969 if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
970 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
971 return false;
972 }
973 equalFields.insert(newField.colName);
974 }
975 if (!isFieldExist) {
976 return false;
977 }
978 }
979 AddFields(newFields, equalFields, addFields);
980 return true;
981 }
982
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)983 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
984 std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
985 {
986 std::set<std::string> tableNames;
987 std::map<std::string, std::string> sharedTableNamesMap;
988 std::map<std::string, std::vector<Field>> fieldsMap;
989 for (const auto &table : schema.tables) {
990 tableNames.insert(table.name);
991 sharedTableNamesMap[table.name] = table.sharedTableName;
992 std::vector<Field> fields = table.fields;
993 bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
994 Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
995 Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
996 fields.insert(fields.begin(), privilegeField);
997 fields.insert(fields.begin(), ownerField);
998 fieldsMap[table.name] = fields;
999 }
1000
1001 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1002 TableInfoMap tableList = localSchema.GetTables();
1003 for (const auto &tableInfo : tableList) {
1004 if (!tableInfo.second.GetSharedTableMark()) {
1005 continue;
1006 }
1007 std::string oldSharedTableName = tableInfo.second.GetTableName();
1008 std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1009 std::vector<Field> addFields;
1010 if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1011 deleteTableNames.push_back(oldSharedTableName);
1012 } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1013 deleteTableNames.push_back(oldSharedTableName);
1014 } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1015 if (!addFields.empty()) {
1016 updateTableNames[oldSharedTableName] = addFields;
1017 }
1018 if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1019 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1020 }
1021 } else {
1022 return false;
1023 }
1024 }
1025 return true;
1026 }
1027
1028 #ifdef USE_DISTRIBUTEDDB_CLOUD
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1029 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1030 {
1031 if (storageEngine_ == nullptr) {
1032 LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1033 return -E_INVALID_DB;
1034 }
1035 int errCode = CheckCloudSchema(schema);
1036 if (errCode != E_OK) {
1037 return errCode;
1038 }
1039 // delete, update and create shared table and its distributed table
1040 errCode = ExecuteCreateSharedTable(schema);
1041 if (errCode != E_OK) {
1042 LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1043 return errCode;
1044 }
1045 return storageEngine_->SetCloudDbSchema(schema);
1046 }
1047
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1048 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1049 {
1050 if (cloudSyncer_ == nullptr) {
1051 LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1052 return -E_INVALID_DB;
1053 }
1054 cloudSyncer_->SetIAssetLoader(loader);
1055 return E_OK;
1056 }
1057 #endif
1058
ChkSchema(const TableName & tableName)1059 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1060 {
1061 // check schema first then compare columns to avoid change the origin return error code
1062 if (storageEngine_ == nullptr) {
1063 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1064 return -E_INVALID_DB;
1065 }
1066 int errCode = storageEngine_->ChkSchema(tableName);
1067 if (errCode != E_OK) {
1068 LOGE("[SQLiteRelationalStore][ChkSchema] ChkSchema failed %d.", errCode);
1069 return errCode;
1070 }
1071 auto *handle = GetHandle(false, errCode);
1072 if (handle == nullptr) {
1073 LOGE("[SQLiteRelationalStore][ChkSchema] handle is nullptr");
1074 return errCode;
1075 }
1076 RelationalSchemaObject localSchema = storageEngine_->GetSchemaInfo();
1077 handle->SetLocalSchema(localSchema);
1078 errCode = handle->CompareSchemaTableColumns(tableName);
1079 if (errCode != E_OK) {
1080 LOGE("[SQLiteRelationalStore][ChkSchema] local schema info incompatible %d.", errCode);
1081 }
1082 ReleaseHandle(handle);
1083 return errCode;
1084 }
1085
1086 #ifdef USE_DISTRIBUTEDDB_CLOUD
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1087 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1088 {
1089 if (storageEngine_ == nullptr) {
1090 LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1091 return -E_INVALID_DB;
1092 }
1093 int errCode = CheckBeforeSync(option);
1094 if (errCode != E_OK) {
1095 return errCode;
1096 }
1097 LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1098 if (option.compensatedSyncOnly) {
1099 CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1100 info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1101 cloudSyncer_->GenerateCompensatedSync(info);
1102 return E_OK;
1103 }
1104 CloudSyncer::CloudTaskInfo info;
1105 FillSyncInfo(option, onProcess, info);
1106 auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1107 if (ret != E_OK) {
1108 return ret;
1109 }
1110 ret = ReFillSyncInfoTable(table, info);
1111 if (ret != E_OK) {
1112 return ret;
1113 }
1114 info.taskId = taskId;
1115 errCode = cloudSyncer_->Sync(info);
1116 return errCode;
1117 }
1118
CheckBeforeSync(const CloudSyncOption & option)1119 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1120 {
1121 if (cloudSyncer_ == nullptr) {
1122 LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1123 return -E_INVALID_DB;
1124 }
1125 if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1126 return -E_INVALID_ARGS;
1127 }
1128 if (option.priorityLevel < CloudDbConstant::PRIORITY_TASK_DEFALUT_LEVEL ||
1129 option.priorityLevel > CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1130 LOGE("[RelationalStore] priority level is invalid value:%d", option.priorityLevel);
1131 return -E_INVALID_ARGS;
1132 }
1133 if (option.compensatedSyncOnly && option.asyncDownloadAssets) {
1134 return -E_NOT_SUPPORT;
1135 }
1136 int errCode = CheckQueryValid(option);
1137 if (errCode != E_OK) {
1138 return errCode;
1139 }
1140 SecurityOption securityOption;
1141 errCode = storageEngine_->GetSecurityOption(securityOption);
1142 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1143 return -E_SECURITY_OPTION_CHECK_ERROR;
1144 }
1145 if (errCode == E_OK && securityOption.securityLabel == S4) {
1146 return -E_SECURITY_OPTION_CHECK_ERROR;
1147 }
1148 return E_OK;
1149 }
1150
CheckAssetsOnlyValid(const QuerySyncObject & querySyncObject,const CloudSyncOption & option)1151 int SQLiteRelationalStore::CheckAssetsOnlyValid(const QuerySyncObject &querySyncObject, const CloudSyncOption &option)
1152 {
1153 if (!querySyncObject.IsAssetsOnly()) {
1154 return E_OK;
1155 }
1156 if (option.mode != SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1157 LOGE("[RelationalStore] not support mode %d when sync with assets only", option.mode);
1158 return -E_NOT_SUPPORT;
1159 }
1160 if (option.priorityLevel != CloudDbConstant::PRIORITY_TASK_MAX_LEVEL) {
1161 LOGE("[RelationalStore] priorityLevel must be 2 when sync with assets only, now is %d",
1162 option.priorityLevel);
1163 return -E_INVALID_ARGS;
1164 }
1165 if (querySyncObject.AssetsOnlyErrFlag() == -E_INVALID_ARGS) {
1166 LOGE("[RelationalStore] the query statement of assets only is incorrect.");
1167 return -E_INVALID_ARGS;
1168 }
1169 return E_OK;
1170 }
1171
CheckQueryValid(const CloudSyncOption & option)1172 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1173 {
1174 if (option.compensatedSyncOnly) {
1175 return E_OK;
1176 }
1177 QuerySyncObject syncObject(option.query);
1178 int errCode = syncObject.GetValidStatus();
1179 if (errCode != E_OK) {
1180 LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1181 return errCode;
1182 }
1183 std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1184 bool isFromTable = object.empty();
1185 if (!option.priorityTask && !isFromTable) {
1186 LOGE("[RelationalStore] not support normal sync with query");
1187 return -E_NOT_SUPPORT;
1188 }
1189 const auto tableNames = syncObject.GetRelationTableNames();
1190 for (const auto &tableName : tableNames) {
1191 QuerySyncObject querySyncObject;
1192 querySyncObject.SetTableName(tableName);
1193 object.push_back(querySyncObject);
1194 }
1195 std::vector<std::string> syncTableNames;
1196 for (const auto &item : object) {
1197 std::string tableName = item.GetRelationTableName();
1198 syncTableNames.emplace_back(tableName);
1199 if (item.IsContainQueryNodes() && option.asyncDownloadAssets) {
1200 LOGE("[RelationalStore] not support async download assets with query table %s length %zu",
1201 DBCommon::StringMiddleMasking(tableName).c_str(), tableName.length());
1202 return -E_NOT_SUPPORT;
1203 }
1204 errCode = CheckAssetsOnlyValid(item, option);
1205 if (errCode != E_OK) {
1206 return errCode;
1207 }
1208 }
1209 errCode = CheckTableName(syncTableNames);
1210 if (errCode != E_OK) {
1211 return errCode;
1212 }
1213 return CheckObjectValid(option.priorityTask, object, isFromTable);
1214 }
1215
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1216 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1217 bool isFromTable)
1218 {
1219 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1220 for (const auto &item : object) {
1221 if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1222 LOGE("[RelationalStore] not support priority sync with full table");
1223 return -E_INVALID_ARGS;
1224 }
1225 int errCode = storageEngine_->CheckQueryValid(item);
1226 if (errCode != E_OK) {
1227 return errCode;
1228 }
1229 if (!priorityTask || isFromTable) {
1230 continue;
1231 }
1232 if (!item.IsInValueOutOfLimit()) {
1233 LOGE("[RelationalStore] not support priority sync in count out of limit");
1234 return -E_MAX_LIMITS;
1235 }
1236 std::string tableName = item.GetRelationTableName();
1237 TableInfo tableInfo = localSchema.GetTable(tableName);
1238 if (!tableInfo.Empty()) {
1239 const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1240 errCode = item.CheckPrimaryKey(primaryKeyMap);
1241 if (errCode != E_OK) {
1242 return errCode;
1243 }
1244 }
1245 }
1246 return E_OK;
1247 }
1248
CheckTableName(const std::vector<std::string> & tableNames)1249 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1250 {
1251 if (tableNames.empty()) {
1252 LOGE("[RelationalStore] sync with empty table");
1253 return -E_INVALID_ARGS;
1254 }
1255 for (const auto &table : tableNames) {
1256 int errCode = ChkSchema(table);
1257 if (errCode != E_OK) {
1258 LOGE("[RelationalStore] schema check failed when sync");
1259 return errCode;
1260 }
1261 }
1262 return E_OK;
1263 }
1264
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1265 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1266 CloudSyncer::CloudTaskInfo &info)
1267 {
1268 auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1269 if (syncObject.empty()) {
1270 QuerySyncObject querySyncObject(option.query);
1271 info.table = querySyncObject.GetRelationTableNames();
1272 for (const auto &item : info.table) {
1273 QuerySyncObject object(Query::Select());
1274 object.SetTableName(item);
1275 info.queryList.push_back(object);
1276 }
1277 } else {
1278 for (auto &item : syncObject) {
1279 info.table.push_back(item.GetRelationTableName());
1280 info.queryList.push_back(std::move(item));
1281 }
1282 }
1283 info.devices = option.devices;
1284 info.mode = option.mode;
1285 info.callback = onProcess;
1286 info.timeout = option.waitTime;
1287 info.priorityTask = option.priorityTask;
1288 info.compensatedTask = option.compensatedSyncOnly;
1289 info.priorityLevel = option.priorityLevel;
1290 info.users.emplace_back("");
1291 info.lockAction = option.lockAction;
1292 info.merge = option.merge;
1293 info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1294 info.prepareTraceId = option.prepareTraceId;
1295 info.asyncDownloadAssets = option.asyncDownloadAssets;
1296 }
1297 #endif
1298
SetTrackerTable(const TrackerSchema & trackerSchema)1299 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1300 {
1301 TableInfo tableInfo;
1302 bool isFirstCreate = false;
1303 bool isNoTableInSchema = false;
1304 int errCode = CheckTrackerTable(trackerSchema, tableInfo, isNoTableInSchema, isFirstCreate);
1305 if (errCode != E_OK) {
1306 if (errCode != -E_IGNORE_DATA) {
1307 return errCode;
1308 }
1309 auto *handle = GetHandle(true, errCode);
1310 if (handle != nullptr) {
1311 handle->CheckAndCreateTrigger(tableInfo);
1312 ReleaseHandle(handle);
1313 }
1314 return E_OK;
1315 }
1316 errCode = sqliteStorageEngine_->UpdateExtendField(trackerSchema);
1317 if (errCode != E_OK) {
1318 LOGE("[RelationalStore] update [%s [%zu]] extend_field failed: %d",
1319 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1320 return errCode;
1321 }
1322 if (isNoTableInSchema) {
1323 return sqliteStorageEngine_->SetTrackerTable(trackerSchema, tableInfo, isFirstCreate);
1324 }
1325 sqliteStorageEngine_->CacheTrackerSchema(trackerSchema);
1326 errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1327 if (errCode != E_OK) {
1328 LOGE("[RelationalStore] create distributed table of [%s [%zu]] failed: %d",
1329 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size(), errCode);
1330 return errCode;
1331 }
1332 return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1333 }
1334
CheckTrackerTable(const TrackerSchema & trackerSchema,TableInfo & table,bool & isNoTableInSchema,bool & isFirstCreate)1335 int SQLiteRelationalStore::CheckTrackerTable(const TrackerSchema &trackerSchema, TableInfo &table,
1336 bool &isNoTableInSchema, bool &isFirstCreate)
1337 {
1338 const RelationalSchemaObject &tracker = sqliteStorageEngine_->GetTrackerSchema();
1339 isFirstCreate = tracker.GetTrackerTable(trackerSchema.tableName).GetTableName().empty();
1340 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1341 table = localSchema.GetTable(trackerSchema.tableName);
1342 TrackerTable trackerTable;
1343 trackerTable.Init(trackerSchema);
1344 int errCode = E_OK;
1345 if (table.Empty()) {
1346 isNoTableInSchema = true;
1347 table.SetTableSyncType(TableSyncType::CLOUD_COOPERATION);
1348 auto *handle = GetHandle(true, errCode);
1349 if (handle == nullptr) {
1350 return errCode;
1351 }
1352 errCode = handle->AnalysisTrackerTable(trackerTable, table);
1353 ReleaseHandle(handle);
1354 if (errCode != E_OK) {
1355 LOGE("[CheckTrackerTable] analysis table schema failed %d.", errCode);
1356 return errCode;
1357 }
1358 } else {
1359 table.SetTrackerTable(trackerTable);
1360 errCode = table.CheckTrackerTable();
1361 if (errCode != E_OK) {
1362 LOGE("[CheckTrackerTable] check tracker table schema failed. %d", errCode);
1363 return errCode;
1364 }
1365 }
1366 if (!trackerSchema.isForceUpgrade && !tracker.GetTrackerTable(trackerSchema.tableName).IsChanging(trackerSchema)) {
1367 LOGW("[CheckTrackerTable] tracker schema is no change, table[%s [%zu]]",
1368 DBCommon::StringMiddleMasking(trackerSchema.tableName).c_str(), trackerSchema.tableName.size());
1369 return -E_IGNORE_DATA;
1370 }
1371 return E_OK;
1372 }
1373
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1374 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1375 {
1376 if (condition.sql.empty()) {
1377 LOGE("[RelationalStore] execute sql is empty.");
1378 return -E_INVALID_ARGS;
1379 }
1380 return sqliteStorageEngine_->ExecuteSql(condition, records);
1381 }
1382
CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor * & handle,std::set<std::string> & clearWaterMarkTable)1383 int SQLiteRelationalStore::CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle,
1384 std::set<std::string> &clearWaterMarkTable)
1385 {
1386 int errCode = E_OK;
1387 for (const auto &tableName : clearWaterMarkTable) {
1388 std::string cloudWaterMark;
1389 Value blobMetaVal;
1390 errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1391 if (errCode != E_OK) {
1392 LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1393 return errCode;
1394 }
1395 errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1396 if (errCode != E_OK) {
1397 LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1398 return errCode;
1399 }
1400 errCode = handle->CleanUploadFinishedFlag(tableName);
1401 if (errCode != E_OK) {
1402 LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1403 return errCode;
1404 }
1405 }
1406 return errCode;
1407 }
1408
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1409 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1410 {
1411 SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1412 int errCode = GetHandleAndStartTransaction(handle);
1413 if (errCode != E_OK) {
1414 LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1415 return errCode;
1416 }
1417 std::set<std::string> clearWaterMarkTables;
1418 RelationalSchemaObject schema;
1419 errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1420 if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1421 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1422 (void)handle->Rollback();
1423 ReleaseHandle(handle);
1424 return errCode;
1425 }
1426
1427 if (!clearWaterMarkTables.empty()) {
1428 storageEngine_->SetReusedHandle(handle);
1429 int ret = CleanWaterMark(handle, clearWaterMarkTables);
1430 if (ret != E_OK) {
1431 LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", ret);
1432 storageEngine_->SetReusedHandle(nullptr);
1433 (void)handle->Rollback();
1434 ReleaseHandle(handle);
1435 return ret;
1436 }
1437 storageEngine_->SetReusedHandle(nullptr);
1438 LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1439 }
1440
1441 int ret = handle->Commit();
1442 ReleaseHandle(handle);
1443 if (ret != E_OK) {
1444 LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1445 return ret;
1446 }
1447 sqliteStorageEngine_->SetSchema(schema);
1448 #ifdef USE_DISTRIBUTEDDB_CLOUD
1449 if (!clearWaterMarkTables.empty()) {
1450 ret = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTables);
1451 if (ret != E_OK) {
1452 LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1453 return ret;
1454 }
1455 }
1456 #endif
1457 return errCode;
1458 }
1459
InitTrackerSchemaFromMeta()1460 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1461 {
1462 int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1463 return errCode == -E_NOT_FOUND ? E_OK : errCode;
1464 }
1465
CleanTrackerData(const std::string & tableName,int64_t cursor)1466 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1467 {
1468 if (tableName.empty()) {
1469 return -E_INVALID_ARGS;
1470 }
1471 return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1472 }
1473
ExecuteCreateSharedTable(const DataBaseSchema & schema)1474 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1475 {
1476 if (sqliteStorageEngine_ == nullptr) {
1477 LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1478 return -E_INVALID_DB;
1479 }
1480 std::vector<std::string> deleteTableNames;
1481 std::map<std::string, std::vector<Field>> updateTableNames;
1482 std::map<std::string, std::string> alterTableNames;
1483 if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1484 LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1485 return -E_INVALID_ARGS;
1486 }
1487 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1488 // upgrade contains delete, alter, update and create
1489 int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1490 if (errCode != E_OK) {
1491 LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1492 } else {
1493 LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1494 }
1495 return errCode;
1496 }
1497
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1498 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1499 CloudSyncer::CloudTaskInfo &info)
1500 {
1501 if (info.priorityTask && actualTable.size() != info.table.size()) {
1502 LOGE("[RelationalStore] Not support regenerate table with priority task");
1503 return -E_NOT_SUPPORT;
1504 }
1505 if (actualTable.size() == info.table.size()) {
1506 return E_OK;
1507 }
1508 LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1509 info.table = actualTable;
1510 info.queryList.clear();
1511 for (const auto &item : info.table) {
1512 QuerySyncObject object(Query::Select());
1513 object.SetTableName(item);
1514 info.queryList.push_back(object);
1515 }
1516 return E_OK;
1517 }
1518
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1519 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1520 {
1521 if (cmd != LOGIC_DELETE_SYNC_DATA) {
1522 return -E_NOT_SUPPORT;
1523 }
1524 if (pragmaData == nullptr) {
1525 return -E_INVALID_ARGS;
1526 }
1527 auto logicDelete = *(static_cast<bool *>(pragmaData));
1528 if (storageEngine_ == nullptr) {
1529 LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1530 return -E_INVALID_DB;
1531 }
1532 storageEngine_->SetLogicDelete(logicDelete);
1533 return E_OK;
1534 }
1535
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1536 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1537 const std::vector<VBucket> &records)
1538 {
1539 if (storageEngine_ == nullptr) {
1540 LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1541 return -E_INVALID_DB;
1542 }
1543 int errCode = CheckParamForUpsertData(status, tableName, records);
1544 if (errCode != E_OK) {
1545 return errCode;
1546 }
1547 return storageEngine_->UpsertData(status, tableName, records);
1548 }
1549
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1550 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1551 const std::vector<VBucket> &records)
1552 {
1553 if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1554 LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1555 return -E_INVALID_ARGS;
1556 }
1557 if (records.empty()) {
1558 LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1559 return -E_INVALID_ARGS;
1560 }
1561 size_t recordSize = records.size();
1562 if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1563 LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1564 return -E_MAX_LIMITS;
1565 }
1566 return CheckSchemaForUpsertData(tableName, records);
1567 }
1568
ChkTable(const TableInfo & table)1569 static int ChkTable(const TableInfo &table)
1570 {
1571 if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1572 LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1573 return -E_NOT_SUPPORT;
1574 }
1575 if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1576 return -E_NOT_FOUND;
1577 }
1578 return E_OK;
1579 }
1580
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1581 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1582 {
1583 if (tableName.empty()) {
1584 return -E_INVALID_ARGS;
1585 }
1586 auto schema = storageEngine_->GetSchemaInfo();
1587 auto table = schema.GetTable(tableName);
1588 int errCode = ChkTable(table);
1589 if (errCode != E_OK) {
1590 return errCode;
1591 }
1592 TableSchema cloudTableSchema;
1593 errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1594 if (errCode != E_OK) {
1595 LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1596 return errCode;
1597 }
1598 errCode = ChkSchema(tableName);
1599 if (errCode != E_OK) {
1600 return errCode;
1601 }
1602 std::set<std::string> dbPkFields;
1603 for (auto &field : table.GetIdentifyKey()) {
1604 dbPkFields.insert(field);
1605 }
1606 std::set<std::string> schemaFields;
1607 for (auto &fieldInfo : table.GetFieldInfos()) {
1608 schemaFields.insert(fieldInfo.GetFieldName());
1609 }
1610 for (const auto &record : records) {
1611 std::set<std::string> recordPkFields;
1612 for (const auto &item : record) {
1613 if (schemaFields.find(item.first) == schemaFields.end()) {
1614 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1615 return -E_INVALID_ARGS;
1616 }
1617 if (dbPkFields.find(item.first) == dbPkFields.end()) {
1618 continue;
1619 }
1620 recordPkFields.insert(item.first);
1621 }
1622 if (recordPkFields.size() != dbPkFields.size()) {
1623 LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1624 recordPkFields.size(), dbPkFields.size());
1625 return -E_INVALID_ARGS;
1626 }
1627 }
1628 return errCode;
1629 }
1630
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1631 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1632 {
1633 auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1634 if (engine == nullptr) {
1635 LOGE("[RelationalStore][Open] Create storage engine failed");
1636 return -E_OUT_OF_MEMORY;
1637 }
1638 sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1639 [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1640 RefObject::KillAndDecObjRef(releaseEngine);
1641 });
1642 return E_OK;
1643 }
1644
1645 #ifdef USE_DISTRIBUTEDDB_CLOUD
CheckCloudSchema(const DataBaseSchema & schema)1646 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1647 {
1648 if (storageEngine_ == nullptr) {
1649 LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1650 return -E_INVALID_DB;
1651 }
1652 std::shared_ptr<DataBaseSchema> cloudSchema;
1653 (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1654 RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1655 for (const auto &tableSchema : schema.tables) {
1656 TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1657 if (tableInfo.Empty()) {
1658 continue;
1659 }
1660 if (tableInfo.GetSharedTableMark()) {
1661 LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1662 return -E_INVALID_ARGS;
1663 }
1664 }
1665 for (const auto &tableSchema : schema.tables) {
1666 if (cloudSchema == nullptr) {
1667 continue;
1668 }
1669 for (const auto &oldSchema : cloudSchema->tables) {
1670 if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1671 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1672 return -E_INVALID_ARGS;
1673 }
1674 }
1675 }
1676 return E_OK;
1677 }
1678
SetCloudSyncConfig(const CloudSyncConfig & config)1679 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1680 {
1681 if (storageEngine_ == nullptr) {
1682 LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1683 return -E_INVALID_DB;
1684 }
1685 storageEngine_->SetCloudSyncConfig(config);
1686 return E_OK;
1687 }
1688
GetCloudTaskStatus(uint64_t taskId)1689 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1690 {
1691 return cloudSyncer_->GetCloudTaskStatus(taskId);
1692 }
1693 #endif
1694
SetDistributedSchema(const DistributedSchema & schema,bool isForceUpgrade)1695 int SQLiteRelationalStore::SetDistributedSchema(const DistributedSchema &schema, bool isForceUpgrade)
1696 {
1697 if (sqliteStorageEngine_ == nullptr || storageEngine_ == nullptr) {
1698 LOGE("[RelationalStore] engine was not initialized");
1699 return -E_INVALID_DB;
1700 }
1701 auto [errCode, isSchemaChange] = sqliteStorageEngine_->SetDistributedSchema(schema, isForceUpgrade);
1702 if (errCode != E_OK) {
1703 return errCode;
1704 }
1705 if (isSchemaChange) {
1706 LOGI("[RelationalStore] schema was changed by setting distributed schema");
1707 storageEngine_->NotifySchemaChanged();
1708 }
1709 return E_OK;
1710 }
1711
GetDownloadingAssetsCount(int32_t & count)1712 int SQLiteRelationalStore::GetDownloadingAssetsCount(int32_t &count)
1713 {
1714 std::vector<std::string> tableNameList = GetAllDistributedTableName(TableSyncType::CLOUD_COOPERATION);
1715 if (tableNameList.empty()) {
1716 return E_OK;
1717 }
1718
1719 int errCode = E_OK;
1720 SQLiteSingleVerRelationalStorageExecutor *handle = GetHandle(false, errCode);
1721 if (handle == nullptr) {
1722 return errCode;
1723 }
1724 for (const auto &tableName : tableNameList) {
1725 TableSchema tableSchema;
1726 int errCode = storageEngine_->GetCloudTableSchema(tableName, tableSchema);
1727 if (errCode != E_OK) {
1728 LOGE("[RelationalStore] Get schema failed when get download assets count, %d, tableName: %s, length: %zu",
1729 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1730 break;
1731 }
1732 errCode = handle->GetDownloadingAssetsCount(tableSchema, count);
1733 if (errCode != E_OK) {
1734 LOGE("[RelationalStore] Get download assets count failed: %d, tableName: %s, length: %zu",
1735 errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
1736 break;
1737 }
1738 }
1739 ReleaseHandle(handle);
1740 return errCode;
1741 }
1742
SetTableMode(DistributedTableMode tableMode)1743 int SQLiteRelationalStore::SetTableMode(DistributedTableMode tableMode)
1744 {
1745 if (sqliteStorageEngine_ == nullptr) {
1746 LOGE("[RelationalStore][SetTableMode] sqliteStorageEngine was not initialized");
1747 return -E_INVALID_DB;
1748 }
1749 if (sqliteStorageEngine_->GetProperties().GetDistributedTableMode() == DistributedTableMode::SPLIT_BY_DEVICE &&
1750 tableMode == DistributedTableMode::COLLABORATION) {
1751 auto schema = sqliteStorageEngine_->GetSchema();
1752 for (const auto &tableMap : schema.GetTables()) {
1753 if (tableMap.second.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
1754 LOGW("[RelationalStore][SetTableMode] Can not set table mode for table %s[%zu]",
1755 DBCommon::StringMiddleMasking(tableMap.first).c_str(), tableMap.first.size());
1756 return -E_NOT_SUPPORT;
1757 }
1758 }
1759 }
1760 RelationalDBProperties properties = sqliteStorageEngine_->GetProperties();
1761 properties.SetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, static_cast<int>(tableMode));
1762 sqliteStorageEngine_->SetProperties(properties);
1763 LOGI("[RelationalStore][SetTableMode] Set table mode to %d successful", static_cast<int>(tableMode));
1764 return E_OK;
1765 }
1766 } // namespace DistributedDB
1767 #endif
1768