• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_relational_store.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "db_common.h"
20 #include "db_constant.h"
21 #include "db_dump_helper.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "db_types.h"
25 #include "sqlite_log_table_manager.h"
26 #include "sqlite_relational_store_connection.h"
27 #include "storage_engine_manager.h"
28 #include "cloud_sync_utils.h"
29 
30 namespace DistributedDB {
31 namespace {
32 constexpr const char *DISTRIBUTED_TABLE_MODE = "distributed_table_mode";
33 }
34 
~SQLiteRelationalStore()35 SQLiteRelationalStore::~SQLiteRelationalStore()
36 {
37     sqliteStorageEngine_ = nullptr;
38 }
39 
40 // Called when a new connection created.
IncreaseConnectionCounter()41 void SQLiteRelationalStore::IncreaseConnectionCounter()
42 {
43     connectionCount_.fetch_add(1, std::memory_order_seq_cst);
44     if (connectionCount_.load() > 0) {
45         sqliteStorageEngine_->SetConnectionFlag(true);
46     }
47 }
48 
GetDBConnection(int & errCode)49 RelationalStoreConnection *SQLiteRelationalStore::GetDBConnection(int &errCode)
50 {
51     std::lock_guard<std::mutex> lock(connectMutex_);
52     RelationalStoreConnection *connection = new (std::nothrow) SQLiteRelationalStoreConnection(this);
53     if (connection == nullptr) {
54         errCode = -E_OUT_OF_MEMORY;
55         return nullptr;
56     }
57     IncObjRef(this);
58     IncreaseConnectionCounter();
59     return connection;
60 }
61 
InitDataBaseOption(const RelationalDBProperties & properties,OpenDbProperties & option)62 static void InitDataBaseOption(const RelationalDBProperties &properties, OpenDbProperties &option)
63 {
64     option.uri = properties.GetStringProp(DBProperties::DATA_DIR, "");
65     option.createIfNecessary = properties.GetBoolProp(DBProperties::CREATE_IF_NECESSARY, false);
66     if (properties.IsEncrypted()) {
67         option.cipherType = properties.GetCipherType();
68         option.passwd = properties.GetPasswd();
69         option.iterTimes = properties.GetIterTimes();
70     }
71 }
72 
InitStorageEngine(const RelationalDBProperties & properties)73 int SQLiteRelationalStore::InitStorageEngine(const RelationalDBProperties &properties)
74 {
75     OpenDbProperties option;
76     InitDataBaseOption(properties, option);
77     std::string identifier = properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "");
78 
79     StorageEngineAttr poolSize = { 1, 1, 0, 16 }; // at most 1 write 16 read.
80     int errCode = sqliteStorageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
81     if (errCode != E_OK) {
82         LOGE("Init the sqlite storage engine failed:%d", errCode);
83     }
84     return errCode;
85 }
86 
ReleaseResources()87 void SQLiteRelationalStore::ReleaseResources()
88 {
89     if (sqliteStorageEngine_ != nullptr) {
90         sqliteStorageEngine_->ClearEnginePasswd();
91         sqliteStorageEngine_ = nullptr;
92     }
93     if (cloudSyncer_ != nullptr) {
94         cloudSyncer_->Close();
95         RefObject::KillAndDecObjRef(cloudSyncer_);
96         cloudSyncer_ = nullptr;
97     }
98     RefObject::DecObjRef(storageEngine_);
99 }
100 
CheckDBMode()101 int SQLiteRelationalStore::CheckDBMode()
102 {
103     int errCode = E_OK;
104     auto *handle = GetHandle(true, errCode);
105     if (handle == nullptr) {
106         return errCode;
107     }
108     errCode = handle->CheckDBModeForRelational();
109     if (errCode != E_OK) {
110         LOGE("check relational DB mode failed. %d", errCode);
111     }
112 
113     ReleaseHandle(handle);
114     return errCode;
115 }
116 
GetSchemaFromMeta(RelationalSchemaObject & schema)117 int SQLiteRelationalStore::GetSchemaFromMeta(RelationalSchemaObject &schema)
118 {
119     Key schemaKey;
120     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
121     Value schemaVal;
122     int errCode = storageEngine_->GetMetaData(schemaKey, schemaVal);
123     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
124         LOGE("Get relational schema from meta table failed. %d", errCode);
125         return errCode;
126     } else if (errCode == -E_NOT_FOUND || schemaVal.empty()) {
127         LOGW("No relational schema info was found. error %d size %zu", errCode, schemaVal.size());
128         return -E_NOT_FOUND;
129     }
130 
131     std::string schemaStr;
132     DBCommon::VectorToString(schemaVal, schemaStr);
133     errCode = schema.ParseFromSchemaString(schemaStr);
134     if (errCode != E_OK) {
135         LOGE("Parse schema string from meta table failed.");
136         return errCode;
137     }
138 
139     sqliteStorageEngine_->SetSchema(schema);
140     return E_OK;
141 }
142 
CheckTableModeFromMeta(DistributedTableMode mode,bool isUnSet)143 int SQLiteRelationalStore::CheckTableModeFromMeta(DistributedTableMode mode, bool isUnSet)
144 {
145     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
146     Value modeVal;
147     int errCode = storageEngine_->GetMetaData(modeKey, modeVal);
148     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
149         LOGE("Get distributed table mode from meta table failed. errCode=%d", errCode);
150         return errCode;
151     }
152 
153     DistributedTableMode orgMode = DistributedTableMode::SPLIT_BY_DEVICE;
154     if (!modeVal.empty()) {
155         std::string value(modeVal.begin(), modeVal.end());
156         orgMode = static_cast<DistributedTableMode>(strtoll(value.c_str(), nullptr, 10)); // 10: decimal
157     } else if (isUnSet) {
158         return E_OK; // First set table mode.
159     }
160 
161     if (orgMode != mode) {
162         LOGE("Check distributed table mode mismatch, orgMode=%d, openMode=%d", orgMode, mode);
163         return -E_INVALID_ARGS;
164     }
165     return E_OK;
166 }
167 
CheckProperties(RelationalDBProperties properties)168 int SQLiteRelationalStore::CheckProperties(RelationalDBProperties properties)
169 {
170     RelationalSchemaObject schema;
171     int errCode = GetSchemaFromMeta(schema);
172     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
173         LOGE("Get relational schema from meta failed. errcode=%d", errCode);
174         return errCode;
175     }
176     int ret = InitTrackerSchemaFromMeta();
177     if (ret != E_OK) {
178         LOGE("Init tracker schema from meta failed. errcode=%d", ret);
179         return ret;
180     }
181     properties.SetSchema(schema);
182 
183     // Empty schema means no distributed table has been used, we may set DB to any table mode
184     // If there is a schema but no table mode, it is the 'SPLIT_BY_DEVICE' mode of old version
185     bool isSchemaEmpty = (errCode == -E_NOT_FOUND);
186     auto mode = static_cast<DistributedTableMode>(
187         properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
188     errCode = CheckTableModeFromMeta(mode, isSchemaEmpty);
189     if (errCode != E_OK) {
190         LOGE("Get distributed table mode from meta failed. errcode=%d", errCode);
191         return errCode;
192     }
193     if (!isSchemaEmpty) {
194         return errCode;
195     }
196 
197     errCode = SaveTableModeToMeta(mode);
198     if (errCode != E_OK) {
199         LOGE("Save table mode to meta failed. errCode=%d", errCode);
200         return errCode;
201     }
202 
203     return E_OK;
204 }
205 
SaveSchemaToMeta()206 int SQLiteRelationalStore::SaveSchemaToMeta()
207 {
208     Key schemaKey;
209     DBCommon::StringToVector(DBConstant::RELATIONAL_SCHEMA_KEY, schemaKey);
210     Value schemaVal;
211     DBCommon::StringToVector(sqliteStorageEngine_->GetSchema().ToSchemaString(), schemaVal);
212     int errCode = storageEngine_->PutMetaData(schemaKey, schemaVal);
213     if (errCode != E_OK) {
214         LOGE("Save relational schema to meta table failed. %d", errCode);
215     }
216     return errCode;
217 }
218 
SaveTableModeToMeta(DistributedTableMode mode)219 int SQLiteRelationalStore::SaveTableModeToMeta(DistributedTableMode mode)
220 {
221     const Key modeKey(DISTRIBUTED_TABLE_MODE, DISTRIBUTED_TABLE_MODE + strlen(DISTRIBUTED_TABLE_MODE));
222     Value modeVal;
223     DBCommon::StringToVector(std::to_string(mode), modeVal);
224     int errCode = storageEngine_->PutMetaData(modeKey, modeVal);
225     if (errCode != E_OK) {
226         LOGE("Save relational schema to meta table failed. %d", errCode);
227     }
228     return errCode;
229 }
230 
SaveLogTableVersionToMeta()231 int SQLiteRelationalStore::SaveLogTableVersionToMeta()
232 {
233     LOGD("save log table version to meta table, version: %s", DBConstant::LOG_TABLE_VERSION_CURRENT);
234     const Key logVersionKey(DBConstant::LOG_TABLE_VERSION_KEY.begin(), DBConstant::LOG_TABLE_VERSION_KEY.end());
235     Value logVersion;
236     int errCode = storageEngine_->GetMetaData(logVersionKey, logVersion);
237     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
238         LOGE("Get log version from meta table failed. %d", errCode);
239         return errCode;
240     }
241     std::string versionStr(DBConstant::LOG_TABLE_VERSION_CURRENT);
242     Value logVersionVal(versionStr.begin(), versionStr.end());
243     // log version is same, no need to update
244     if (errCode == E_OK && !logVersion.empty() && logVersionVal == logVersion) {
245         return errCode;
246     }
247     // If the log version does not exist or is different, update the log version
248     errCode = storageEngine_->PutMetaData(logVersionKey, logVersionVal);
249     if (errCode != E_OK) {
250         LOGE("save log table version to meta table failed. %d", errCode);
251     }
252     return errCode;
253 }
254 
CleanDistributedDeviceTable()255 int SQLiteRelationalStore::CleanDistributedDeviceTable()
256 {
257     std::vector<std::string> missingTables;
258     int errCode = sqliteStorageEngine_->CleanDistributedDeviceTable(missingTables);
259     if (errCode != E_OK) {
260         LOGE("Clean distributed device table failed. %d", errCode);
261     }
262     for (const auto &deviceTableName : missingTables) {
263         std::string deviceHash;
264         std::string tableName;
265         DBCommon::GetDeviceFromName(deviceTableName, deviceHash, tableName);
266         syncAbleEngine_->EraseDeviceWaterMark(deviceHash, false, tableName);
267         if (errCode != E_OK) {
268             LOGE("Erase water mark failed:%d", errCode);
269             return errCode;
270         }
271     }
272     return errCode;
273 }
274 
Open(const RelationalDBProperties & properties)275 int SQLiteRelationalStore::Open(const RelationalDBProperties &properties)
276 {
277     std::lock_guard<std::mutex> lock(initalMutex_);
278     if (isInitialized_) {
279         LOGD("[RelationalStore][Open] relational db was already initialized.");
280         return E_OK;
281     }
282     int errCode = InitSQLiteStorageEngine(properties);
283     if (errCode != E_OK) {
284         return errCode;
285     }
286 
287     do {
288         errCode = InitStorageEngine(properties);
289         if (errCode != E_OK) {
290             LOGE("[RelationalStore][Open] Init database context fail! errCode = [%d]", errCode);
291             break;
292         }
293 
294         storageEngine_ = new (std::nothrow) RelationalSyncAbleStorage(sqliteStorageEngine_);
295         if (storageEngine_ == nullptr) {
296             LOGE("[RelationalStore][Open] Create syncable storage failed");
297             errCode = -E_OUT_OF_MEMORY;
298             break;
299         }
300 
301         syncAbleEngine_ = std::make_shared<SyncAbleEngine>(storageEngine_);
302         // to guarantee the life cycle of sync module and syncAbleEngine_ are the same, then the sync module will not
303         // be destructed when close store
304         storageEngine_->SetSyncAbleEngine(syncAbleEngine_);
305         cloudSyncer_ = new (std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_));
306 
307         errCode = CheckDBMode();
308         if (errCode != E_OK) {
309             break;
310         }
311 
312         errCode = CheckProperties(properties);
313         if (errCode != E_OK) {
314             break;
315         }
316 
317         errCode = SaveLogTableVersionToMeta();
318         if (errCode != E_OK) {
319             break;
320         }
321 
322         errCode = CleanDistributedDeviceTable();
323         if (errCode != E_OK) {
324             break;
325         }
326 
327         isInitialized_ = true;
328         return E_OK;
329     } while (false);
330 
331     ReleaseResources();
332     return errCode;
333 }
334 
OnClose(const std::function<void (void)> & notifier)335 void SQLiteRelationalStore::OnClose(const std::function<void(void)> &notifier)
336 {
337     AutoLock lockGuard(this);
338     if (notifier) {
339         closeNotifiers_.push_back(notifier);
340     } else {
341         LOGW("Register 'Close()' notifier failed, notifier is null.");
342     }
343 }
344 
GetHandle(bool isWrite,int & errCode) const345 SQLiteSingleVerRelationalStorageExecutor *SQLiteRelationalStore::GetHandle(bool isWrite, int &errCode) const
346 {
347     if (sqliteStorageEngine_ == nullptr) {
348         errCode = -E_INVALID_DB;
349         return nullptr;
350     }
351 
352     return static_cast<SQLiteSingleVerRelationalStorageExecutor *>(
353         sqliteStorageEngine_->FindExecutor(isWrite, OperatePerm::NORMAL_PERM, errCode));
354 }
ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor * & handle) const355 void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const
356 {
357     if (handle == nullptr) {
358         return;
359     }
360 
361     if (sqliteStorageEngine_ != nullptr) {
362         StorageExecutor *databaseHandle = handle;
363         sqliteStorageEngine_->Recycle(databaseHandle);
364         handle = nullptr;
365     }
366 }
367 
Sync(const ISyncer::SyncParma & syncParam,uint64_t connectionId)368 int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId)
369 {
370     return syncAbleEngine_->Sync(syncParam, connectionId);
371 }
372 
373 // Called when a connection released.
DecreaseConnectionCounter(uint64_t connectionId)374 void SQLiteRelationalStore::DecreaseConnectionCounter(uint64_t connectionId)
375 {
376     int count = connectionCount_.fetch_sub(1, std::memory_order_seq_cst);
377     if (count <= 0) {
378         LOGF("Decrease db connection counter failed, count <= 0.");
379         return;
380     }
381     if (storageEngine_ != nullptr) {
382         storageEngine_->EraseDataChangeCallback(connectionId);
383     }
384     if (count != 1) {
385         return;
386     }
387 
388     LockObj();
389     auto notifiers = std::move(closeNotifiers_);
390     UnlockObj();
391     for (const auto &notifier : notifiers) {
392         if (notifier) {
393             notifier();
394         }
395     }
396 
397     // Sync Close
398     syncAbleEngine_->Close();
399 
400     if (cloudSyncer_ != nullptr) {
401         cloudSyncer_->Close();
402         RefObject::KillAndDecObjRef(cloudSyncer_);
403         cloudSyncer_ = nullptr;
404     }
405 
406     if (sqliteStorageEngine_ != nullptr) {
407         sqliteStorageEngine_ = nullptr;
408     }
409     {
410         if (storageEngine_ != nullptr) {
411             storageEngine_->RegisterHeartBeatListener(nullptr);
412         }
413         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
414         StopLifeCycleTimer();
415         lifeCycleNotifier_ = nullptr;
416     }
417     // close will dec sync ref of storageEngine_
418     DecObjRef(storageEngine_);
419 }
420 
ReleaseDBConnection(uint64_t connectionId,RelationalStoreConnection * connection)421 void SQLiteRelationalStore::ReleaseDBConnection(uint64_t connectionId, RelationalStoreConnection *connection)
422 {
423     if (connectionCount_.load() == 1) {
424         sqliteStorageEngine_->SetConnectionFlag(false);
425     }
426 
427     connectMutex_.lock();
428     if (connection != nullptr) {
429         KillAndDecObjRef(connection);
430         DecreaseConnectionCounter(connectionId);
431         connectMutex_.unlock();
432         KillAndDecObjRef(this);
433     } else {
434         connectMutex_.unlock();
435     }
436 }
437 
WakeUpSyncer()438 void SQLiteRelationalStore::WakeUpSyncer()
439 {
440     syncAbleEngine_->WakeUpSyncer();
441 }
442 
CreateDistributedTable(const std::string & tableName,TableSyncType syncType,bool trackerSchemaChanged)443 int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType,
444     bool trackerSchemaChanged)
445 {
446     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
447     TableInfo tableInfo = localSchema.GetTable(tableName);
448     if (!tableInfo.Empty()) {
449         bool isSharedTable = tableInfo.GetSharedTableMark();
450         if (isSharedTable && !trackerSchemaChanged) {
451             return E_OK; // shared table will create distributed table when use SetCloudDbSchema
452         }
453     }
454 
455     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
456         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
457 
458     std::string localIdentity; // collaboration mode need local identify
459     if (mode == DistributedTableMode::COLLABORATION) {
460         int errCode = syncAbleEngine_->GetLocalIdentity(localIdentity);
461         if (errCode != E_OK || localIdentity.empty()) {
462             LOGD("Get local identity failed, can not create.");
463             return -E_NOT_SUPPORT;
464         }
465     }
466 
467     bool schemaChanged = false;
468     int errCode = sqliteStorageEngine_->CreateDistributedTable(tableName, DBCommon::TransferStringToHex(localIdentity),
469         schemaChanged, syncType, trackerSchemaChanged);
470     if (errCode != E_OK) {
471         LOGE("Create distributed table failed. %d", errCode);
472     }
473     if (schemaChanged) {
474         LOGD("Notify schema changed.");
475         storageEngine_->NotifySchemaChanged();
476     }
477     return errCode;
478 }
479 
GetCloudSyncTaskCount()480 int32_t SQLiteRelationalStore::GetCloudSyncTaskCount()
481 {
482     if (cloudSyncer_ == nullptr) {
483         LOGE("[RelationalStore] cloudSyncer was not initialized when get cloud sync task count.");
484         return -1;
485     }
486     return cloudSyncer_->GetCloudSyncTaskCount();
487 }
488 
CleanCloudData(ClearMode mode)489 int SQLiteRelationalStore::CleanCloudData(ClearMode mode)
490 {
491     auto tableMode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
492         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
493     if (tableMode == DistributedTableMode::COLLABORATION) {
494         LOGE("Not support remove device data in collaboration mode.");
495         return -E_NOT_SUPPORT;
496     }
497     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
498     TableInfoMap tables = localSchema.GetTables();
499     std::vector<std::string> cloudTableNameList;
500     for (const auto &tableInfo : tables) {
501         bool isSharedTable = tableInfo.second.GetSharedTableMark();
502         if ((mode == CLEAR_SHARED_TABLE && !isSharedTable) || (mode != CLEAR_SHARED_TABLE && isSharedTable)) {
503             continue;
504         }
505         if (tableInfo.second.GetTableSyncType() == CLOUD_COOPERATION) {
506             cloudTableNameList.push_back(tableInfo.first);
507         }
508     }
509     if (cloudTableNameList.empty()) {
510         LOGI("[RelationalStore] device doesn't has cloud table, clean cloud data finished.");
511         return E_OK;
512     }
513     if (cloudSyncer_ == nullptr) {
514         LOGE("[RelationalStore] cloudSyncer was not initialized when clean cloud data");
515         return -E_INVALID_DB;
516     }
517     int errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema);
518     if (errCode != E_OK) {
519         LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode);
520     }
521 
522     return errCode;
523 }
524 
RemoveDeviceData()525 int SQLiteRelationalStore::RemoveDeviceData()
526 {
527     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
528         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
529     if (mode == DistributedTableMode::COLLABORATION) {
530         LOGE("Not support remove device data in collaboration mode.");
531         return -E_NOT_SUPPORT;
532     }
533 
534     std::vector<std::string> tableNameList = GetAllDistributedTableName();
535     if (tableNameList.empty()) {
536         return E_OK;
537     }
538     // erase watermark first
539     int errCode = EraseAllDeviceWatermark(tableNameList);
540     if (errCode != E_OK) {
541         LOGE("remove watermark failed %d", errCode);
542         return errCode;
543     }
544     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
545     errCode = GetHandleAndStartTransaction(handle);
546     if (handle == nullptr) {
547         return errCode;
548     }
549 
550     for (const auto &table : tableNameList) {
551         errCode = handle->DeleteDistributedDeviceTable("", table);
552         if (errCode != E_OK) {
553             LOGE("delete device data failed. %d", errCode);
554             break;
555         }
556 
557         errCode = handle->DeleteDistributedAllDeviceTableLog(table);
558         if (errCode != E_OK) {
559             LOGE("delete device data failed. %d", errCode);
560             break;
561         }
562     }
563 
564     if (errCode != E_OK) {
565         (void)handle->Rollback();
566         ReleaseHandle(handle);
567         return errCode;
568     }
569 
570     errCode = handle->Commit();
571     ReleaseHandle(handle);
572     storageEngine_->NotifySchemaChanged();
573     return errCode;
574 }
575 
RemoveDeviceData(const std::string & device,const std::string & tableName)576 int SQLiteRelationalStore::RemoveDeviceData(const std::string &device, const std::string &tableName)
577 {
578     auto mode = static_cast<DistributedTableMode>(sqliteStorageEngine_->GetProperties().GetIntProp(
579         RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE));
580     if (mode == DistributedTableMode::COLLABORATION) {
581         LOGE("Not support remove device data in collaboration mode.");
582         return -E_NOT_SUPPORT;
583     }
584 
585     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
586     auto iter = tables.find(tableName);
587     if (tables.empty() || (!tableName.empty() && iter == tables.end())) {
588         LOGE("Remove device data with table name which is not a distributed table or no distributed table found.");
589         return -E_DISTRIBUTED_SCHEMA_NOT_FOUND;
590     }
591     // cloud mode is not permit
592     if (iter != tables.end() && iter->second.GetTableSyncType() == CLOUD_COOPERATION) {
593         LOGE("Remove device data with cloud sync table name.");
594         return -E_NOT_SUPPORT;
595     }
596     bool isNeedHash = false;
597     std::string hashDeviceId;
598     int errCode = syncAbleEngine_->GetHashDeviceId(device, hashDeviceId);
599     if (errCode == -E_NOT_SUPPORT) {
600         isNeedHash = true;
601         hashDeviceId = device;
602         errCode = E_OK;
603     }
604     if (errCode != E_OK) {
605         return errCode;
606     }
607     if (isNeedHash) {
608         // check device is uuid in meta
609         std::set<std::string> hashDevices;
610         errCode = GetExistDevices(hashDevices);
611         if (errCode != E_OK) {
612             return errCode;
613         }
614         if (hashDevices.find(DBCommon::TransferHashString(device)) == hashDevices.end()) {
615             LOGD("[SQLiteRelationalStore] not match device, just return");
616             return E_OK;
617         }
618     }
619     return RemoveDeviceDataInner(hashDeviceId, device, tableName, isNeedHash);
620 }
621 
RegisterObserverAction(uint64_t connectionId,const StoreObserver * observer,const RelationalObserverAction & action)622 int SQLiteRelationalStore::RegisterObserverAction(uint64_t connectionId, const StoreObserver *observer,
623     const RelationalObserverAction &action)
624 {
625     return storageEngine_->RegisterObserverAction(connectionId, observer, action);
626 }
627 
UnRegisterObserverAction(uint64_t connectionId,const StoreObserver * observer)628 int SQLiteRelationalStore::UnRegisterObserverAction(uint64_t connectionId, const StoreObserver *observer)
629 {
630     return storageEngine_->UnRegisterObserverAction(connectionId, observer);
631 }
632 
StopLifeCycleTimer()633 int SQLiteRelationalStore::StopLifeCycleTimer()
634 {
635     auto runtimeCxt = RuntimeContext::GetInstance();
636     if (runtimeCxt == nullptr) {
637         return -E_INVALID_ARGS;
638     }
639     if (lifeTimerId_ != 0) {
640         TimerId timerId = lifeTimerId_;
641         lifeTimerId_ = 0;
642         runtimeCxt->RemoveTimer(timerId, false);
643     }
644     return E_OK;
645 }
646 
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier)647 int SQLiteRelationalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier)
648 {
649     auto runtimeCxt = RuntimeContext::GetInstance();
650     if (runtimeCxt == nullptr) {
651         return -E_INVALID_ARGS;
652     }
653     RefObject::IncObjRef(this);
654     TimerId timerId = 0;
655     int errCode = runtimeCxt->SetTimer(
656         DBConstant::DEF_LIFE_CYCLE_TIME,
657         [this](TimerId id) -> int {
658             std::lock_guard<std::mutex> lock(lifeCycleMutex_);
659             if (lifeCycleNotifier_) {
660                 // normal identifier mode
661                 std::string identifier;
662                 if (sqliteStorageEngine_->GetProperties().GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
663                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(
664                         DBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
665                 } else {
666                     identifier = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
667                 }
668                 auto userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
669                 lifeCycleNotifier_(identifier, userId);
670             }
671             return 0;
672         },
673         [this]() {
674             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() { RefObject::DecObjRef(this); });
675             if (ret != E_OK) {
676                 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
677             }
678         },
679         timerId);
680     if (errCode != E_OK) {
681         lifeTimerId_ = 0;
682         LOGE("SetTimer failed:%d", errCode);
683         RefObject::DecObjRef(this);
684         return errCode;
685     }
686 
687     lifeCycleNotifier_ = notifier;
688     lifeTimerId_ = timerId;
689     return E_OK;
690 }
691 
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)692 int SQLiteRelationalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier)
693 {
694     int errCode;
695     {
696         std::lock_guard<std::mutex> lock(lifeCycleMutex_);
697         if (lifeTimerId_ != 0) {
698             errCode = StopLifeCycleTimer();
699             if (errCode != E_OK) {
700                 LOGE("Stop the life cycle timer failed:%d", errCode);
701                 return errCode;
702             }
703         }
704 
705         if (!notifier) {
706             return E_OK;
707         }
708         errCode = StartLifeCycleTimer(notifier);
709         if (errCode != E_OK) {
710             LOGE("Register life cycle timer failed:%d", errCode);
711             return errCode;
712         }
713     }
714     auto listener = [this] { HeartBeat(); };
715     storageEngine_->RegisterHeartBeatListener(listener);
716     return errCode;
717 }
718 
HeartBeat()719 void SQLiteRelationalStore::HeartBeat()
720 {
721     std::lock_guard<std::mutex> lock(lifeCycleMutex_);
722     int errCode = ResetLifeCycleTimer();
723     if (errCode != E_OK) {
724         LOGE("Heart beat for life cycle failed:%d", errCode);
725     }
726 }
727 
ResetLifeCycleTimer()728 int SQLiteRelationalStore::ResetLifeCycleTimer()
729 {
730     if (lifeTimerId_ == 0) {
731         return E_OK;
732     }
733     auto lifeNotifier = lifeCycleNotifier_;
734     lifeCycleNotifier_ = nullptr;
735     int errCode = StopLifeCycleTimer();
736     if (errCode != E_OK) {
737         LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
738     }
739     return StartLifeCycleTimer(lifeNotifier);
740 }
741 
GetStorePath() const742 std::string SQLiteRelationalStore::GetStorePath() const
743 {
744     return sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::DATA_DIR, "");
745 }
746 
GetProperties() const747 RelationalDBProperties SQLiteRelationalStore::GetProperties() const
748 {
749     return sqliteStorageEngine_->GetProperties();
750 }
751 
StopSync(uint64_t connectionId)752 void SQLiteRelationalStore::StopSync(uint64_t connectionId)
753 {
754     return syncAbleEngine_->StopSync(connectionId);
755 }
756 
Dump(int fd)757 void SQLiteRelationalStore::Dump(int fd)
758 {
759     std::string userId = "";
760     std::string appId = "";
761     std::string storeId = "";
762     std::string label = "";
763     if (sqliteStorageEngine_ != nullptr) {
764         userId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, "");
765         appId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, "");
766         storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
767         label = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
768     }
769     label = DBCommon::TransferStringToHex(label);
770     DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n", userId.c_str(), appId.c_str(),
771         storeId.c_str(), label.c_str());
772     if (syncAbleEngine_ != nullptr) {
773         syncAbleEngine_->Dump(fd);
774     }
775 }
776 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)777 int SQLiteRelationalStore::RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout,
778     uint64_t connectionId, std::shared_ptr<ResultSet> &result)
779 {
780     if (sqliteStorageEngine_ == nullptr) {
781         return -E_INVALID_DB;
782     }
783     if (condition.sql.size() > DBConstant::REMOTE_QUERY_MAX_SQL_LEN) {
784         LOGE("remote query sql len is larger than %" PRIu32, DBConstant::REMOTE_QUERY_MAX_SQL_LEN);
785         return -E_MAX_LIMITS;
786     }
787 
788     if (!sqliteStorageEngine_->GetSchema().IsSchemaValid()) {
789         LOGW("not a distributed relational store.");
790         return -E_NOT_SUPPORT;
791     }
792     const auto &properties = sqliteStorageEngine_->GetProperties();
793     int tableMode =
794         properties.GetIntProp(RelationalDBProperties::DISTRIBUTED_TABLE_MODE, DistributedTableMode::SPLIT_BY_DEVICE);
795     if (tableMode != DistributedTableMode::SPLIT_BY_DEVICE) {
796         LOGW("only support split mode.");
797         return -E_NOT_SUPPORT;
798     }
799 
800     // Check whether to be able to operate the db.
801     int errCode = E_OK;
802     auto *handle = GetHandle(false, errCode);
803     if (handle == nullptr) {
804         return errCode;
805     }
806     errCode = handle->CheckEncryptedOrCorrupted();
807     ReleaseHandle(handle);
808     if (errCode != E_OK) {
809         return errCode;
810     }
811 
812     return syncAbleEngine_->RemoteQuery(device, condition, timeout, connectionId, result);
813 }
814 
EraseAllDeviceWatermark(const std::vector<std::string> & tableNameList)815 int SQLiteRelationalStore::EraseAllDeviceWatermark(const std::vector<std::string> &tableNameList)
816 {
817     std::set<std::string> devices;
818     int errCode = GetExistDevices(devices);
819     if (errCode != E_OK) {
820         return errCode;
821     }
822     for (const auto &tableName : tableNameList) {
823         for (const auto &device : devices) {
824             errCode = syncAbleEngine_->EraseDeviceWaterMark(device, false, tableName);
825             if (errCode != E_OK) {
826                 return errCode;
827             }
828         }
829     }
830     return E_OK;
831 }
832 
GetDevTableName(const std::string & device,const std::string & hashDev) const833 std::string SQLiteRelationalStore::GetDevTableName(const std::string &device, const std::string &hashDev) const
834 {
835     std::string devTableName;
836     StoreInfo info = { sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::USER_ID, ""),
837         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::APP_ID, ""),
838         sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "") };
839     if (RuntimeContext::GetInstance()->TranslateDeviceId(device, info, devTableName) != E_OK) {
840         devTableName = hashDev;
841     }
842     return devTableName;
843 }
844 
GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor * & handle) const845 int SQLiteRelationalStore::GetHandleAndStartTransaction(SQLiteSingleVerRelationalStorageExecutor *&handle) const
846 {
847     int errCode = E_OK;
848     handle = GetHandle(true, errCode);
849     if (handle == nullptr) {
850         LOGE("get handle failed %d", errCode);
851         return errCode;
852     }
853 
854     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
855     if (errCode != E_OK) {
856         LOGE("start transaction failed %d", errCode);
857         ReleaseHandle(handle);
858     }
859     return errCode;
860 }
861 
RemoveDeviceDataInner(const std::string & mappingDev,const std::string & device,const std::string & tableName,bool isNeedHash)862 int SQLiteRelationalStore::RemoveDeviceDataInner(const std::string &mappingDev, const std::string &device,
863     const std::string &tableName, bool isNeedHash)
864 {
865     std::string hashHexDev;
866     std::string hashDev;
867     std::string devTableName;
868     if (!isNeedHash) {
869         // if is not need hash mappingDev mean hash(uuid) device is param device
870         hashHexDev = DBCommon::TransferStringToHex(mappingDev);
871         hashDev = mappingDev;
872         devTableName = device;
873     } else {
874         // if is need hash mappingDev mean uuid
875         hashDev = DBCommon::TransferHashString(mappingDev);
876         hashHexDev = DBCommon::TransferStringToHex(hashDev);
877         devTableName = GetDevTableName(mappingDev, hashHexDev);
878     }
879     // erase watermark first
880     int errCode = syncAbleEngine_->EraseDeviceWaterMark(hashDev, false, tableName);
881     if (errCode != E_OK) {
882         LOGE("erase watermark failed %d", errCode);
883         return errCode;
884     }
885     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
886     errCode = GetHandleAndStartTransaction(handle);
887     if (handle == nullptr) {
888         return errCode;
889     }
890 
891     errCode = handle->DeleteDistributedDeviceTable(devTableName, tableName);
892     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
893     if (errCode != E_OK) {
894         LOGE("delete device data failed. %d", errCode);
895         tables.clear();
896     }
897 
898     for (const auto &it : tables) {
899         if (tableName.empty() || it.second.GetTableName() == tableName) {
900             errCode = handle->DeleteDistributedDeviceTableLog(hashHexDev, it.second.GetTableName());
901             if (errCode != E_OK) {
902                 LOGE("delete device data failed. %d", errCode);
903                 break;
904             }
905         }
906     }
907 
908     if (errCode != E_OK) {
909         (void)handle->Rollback();
910         ReleaseHandle(handle);
911         return errCode;
912     }
913     errCode = handle->Commit();
914     ReleaseHandle(handle);
915     storageEngine_->NotifySchemaChanged();
916     return errCode;
917 }
918 
GetExistDevices(std::set<std::string> & hashDevices) const919 int SQLiteRelationalStore::GetExistDevices(std::set<std::string> &hashDevices) const
920 {
921     int errCode = E_OK;
922     auto *handle = GetHandle(true, errCode);
923     if (handle == nullptr) {
924         LOGE("[SingleVerRDBStore] GetExistsDeviceList get handle failed:%d", errCode);
925         return errCode;
926     }
927     errCode = handle->GetExistsDeviceList(hashDevices);
928     if (errCode != E_OK) {
929         LOGE("[SingleVerRDBStore] Get remove device list from meta failed. err=%d", errCode);
930     }
931     ReleaseHandle(handle);
932     return errCode;
933 }
934 
GetAllDistributedTableName()935 std::vector<std::string> SQLiteRelationalStore::GetAllDistributedTableName()
936 {
937     TableInfoMap tables = sqliteStorageEngine_->GetSchema().GetTables(); // TableInfoMap
938     std::vector<std::string> tableNames;
939     for (const auto &table : tables) {
940         if (table.second.GetTableSyncType() == TableSyncType::CLOUD_COOPERATION) {
941             continue;
942         }
943         tableNames.push_back(table.second.GetTableName());
944     }
945     return tableNames;
946 }
947 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)948 int SQLiteRelationalStore::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
949 {
950     if (cloudSyncer_ == nullptr) {
951         LOGE("[RelationalStore][SetCloudDB] cloudSyncer was not initialized");
952         return -E_INVALID_DB;
953     }
954     cloudSyncer_->SetCloudDB(cloudDb);
955     return E_OK;
956 }
957 
AddFields(const std::vector<Field> & newFields,const std::set<std::string> & equalFields,std::vector<Field> & addFields)958 void SQLiteRelationalStore::AddFields(const std::vector<Field> &newFields, const std::set<std::string> &equalFields,
959     std::vector<Field> &addFields)
960 {
961     for (const auto &newField : newFields) {
962         if (equalFields.find(newField.colName) == equalFields.end()) {
963             addFields.push_back(newField);
964         }
965     }
966 }
967 
CheckFields(const std::vector<Field> & newFields,const TableInfo & tableInfo,std::vector<Field> & addFields)968 bool SQLiteRelationalStore::CheckFields(const std::vector<Field> &newFields, const TableInfo &tableInfo,
969     std::vector<Field> &addFields)
970 {
971     std::vector<FieldInfo> oldFields = tableInfo.GetFieldInfos();
972     if (newFields.size() < oldFields.size()) {
973         return false;
974     }
975     std::set<std::string> equalFields;
976     for (const auto &oldField : oldFields) {
977         bool isFieldExist = false;
978         for (const auto &newField : newFields) {
979             if (newField.colName != oldField.GetFieldName()) {
980                 continue;
981             }
982             isFieldExist = true;
983             int32_t type = newField.type;
984             // Field type need to match storage type
985             // Field type : Nil, int64_t, double, std::string, bool, Bytes, Asset, Assets
986             // Storage type : NONE, NULL, INTEGER, REAL, TEXT, BLOB
987             if (type >= TYPE_INDEX<Nil> && type <= TYPE_INDEX<std::string>) {
988                 type++; // storage type - field type = 1
989             } else if (type == TYPE_INDEX<bool>) {
990                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_NULL);
991             } else if (type >= TYPE_INDEX<Asset> && type <= TYPE_INDEX<Assets>) {
992                 type = static_cast<int32_t>(StorageType::STORAGE_TYPE_BLOB);
993             }
994             auto primaryKeyMap = tableInfo.GetPrimaryKey();
995             auto it = std::find_if(primaryKeyMap.begin(), primaryKeyMap.end(),
996                 [&newField](const std::map<int, std::string>::value_type &pair) {
997                     return pair.second == newField.colName;
998                 });
999             if (type != static_cast<int32_t>(oldField.GetStorageType()) ||
1000                 newField.primary != (it != primaryKeyMap.end()) || newField.nullable == oldField.IsNotNull()) {
1001                 return false;
1002             }
1003             equalFields.insert(newField.colName);
1004         }
1005         if (!isFieldExist) {
1006             return false;
1007         }
1008     }
1009     AddFields(newFields, equalFields, addFields);
1010     return true;
1011 }
1012 
PrepareSharedTable(const DataBaseSchema & schema,std::vector<std::string> & deleteTableNames,std::map<std::string,std::vector<Field>> & updateTableNames,std::map<std::string,std::string> & alterTableNames)1013 bool SQLiteRelationalStore::PrepareSharedTable(const DataBaseSchema &schema, std::vector<std::string> &deleteTableNames,
1014     std::map<std::string, std::vector<Field>> &updateTableNames, std::map<std::string, std::string> &alterTableNames)
1015 {
1016     std::set<std::string> tableNames;
1017     std::map<std::string, std::string> sharedTableNamesMap;
1018     std::map<std::string, std::vector<Field>> fieldsMap;
1019     for (const auto &table : schema.tables) {
1020         tableNames.insert(table.name);
1021         sharedTableNamesMap[table.name] = table.sharedTableName;
1022         std::vector<Field> fields = table.fields;
1023         bool hasPrimaryKey = DBCommon::HasPrimaryKey(fields);
1024         Field ownerField = { CloudDbConstant::CLOUD_OWNER, TYPE_INDEX<std::string>, hasPrimaryKey };
1025         Field privilegeField = { CloudDbConstant::CLOUD_PRIVILEGE, TYPE_INDEX<std::string> };
1026         fields.insert(fields.begin(), privilegeField);
1027         fields.insert(fields.begin(), ownerField);
1028         fieldsMap[table.name] = fields;
1029     }
1030 
1031     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1032     TableInfoMap tableList = localSchema.GetTables();
1033     for (const auto &tableInfo : tableList) {
1034         if (!tableInfo.second.GetSharedTableMark()) {
1035             continue;
1036         }
1037         std::string oldSharedTableName = tableInfo.second.GetTableName();
1038         std::string oldOriginTableName = tableInfo.second.GetOriginTableName();
1039         std::vector<Field> addFields;
1040         if (tableNames.find(oldOriginTableName) == tableNames.end()) {
1041             deleteTableNames.push_back(oldSharedTableName);
1042         } else if (sharedTableNamesMap[oldOriginTableName].empty()) {
1043             deleteTableNames.push_back(oldSharedTableName);
1044         } else if (CheckFields(fieldsMap[oldOriginTableName], tableInfo.second, addFields)) {
1045             if (!addFields.empty()) {
1046                 updateTableNames[oldSharedTableName] = addFields;
1047             }
1048             if (oldSharedTableName != sharedTableNamesMap[oldOriginTableName]) {
1049                 alterTableNames[oldSharedTableName] = sharedTableNamesMap[oldOriginTableName];
1050             }
1051         } else {
1052             return false;
1053         }
1054     }
1055     return true;
1056 }
1057 
PrepareAndSetCloudDbSchema(const DataBaseSchema & schema)1058 int SQLiteRelationalStore::PrepareAndSetCloudDbSchema(const DataBaseSchema &schema)
1059 {
1060     if (storageEngine_ == nullptr) {
1061         LOGE("[RelationalStore][PrepareAndSetCloudDbSchema] storageEngine was not initialized");
1062         return -E_INVALID_DB;
1063     }
1064     int errCode = CheckCloudSchema(schema);
1065     if (errCode != E_OK) {
1066         return errCode;
1067     }
1068     // delete, update and create shared table and its distributed table
1069     errCode = ExecuteCreateSharedTable(schema);
1070     if (errCode != E_OK) {
1071         LOGE("[RelationalStore] prepare shared table failed:%d", errCode);
1072         return errCode;
1073     }
1074     return storageEngine_->SetCloudDbSchema(schema);
1075 }
1076 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)1077 int SQLiteRelationalStore::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
1078 {
1079     if (cloudSyncer_ == nullptr) {
1080         LOGE("[RelationalStore][SetIAssetLoader] cloudSyncer was not initialized");
1081         return -E_INVALID_DB;
1082     }
1083     cloudSyncer_->SetIAssetLoader(loader);
1084     return E_OK;
1085 }
1086 
ChkSchema(const TableName & tableName)1087 int SQLiteRelationalStore::ChkSchema(const TableName &tableName)
1088 {
1089     if (storageEngine_ == nullptr) {
1090         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1091         return -E_INVALID_DB;
1092     }
1093     return storageEngine_->ChkSchema(tableName);
1094 }
1095 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)1096 int SQLiteRelationalStore::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess, uint64_t taskId)
1097 {
1098     if (storageEngine_ == nullptr) {
1099         LOGE("[RelationalStore][Sync] storageEngine was not initialized");
1100         return -E_INVALID_DB;
1101     }
1102     int errCode = CheckBeforeSync(option);
1103     if (errCode != E_OK) {
1104         return errCode;
1105     }
1106     LOGI("sync mode:%d, pri:%d, comp:%d", option.mode, option.priorityTask, option.compensatedSyncOnly);
1107     if (option.compensatedSyncOnly) {
1108         CloudSyncer::CloudTaskInfo info = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
1109         info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1110         cloudSyncer_->GenerateCompensatedSync(info);
1111         return E_OK;
1112     }
1113     CloudSyncer::CloudTaskInfo info;
1114     FillSyncInfo(option, onProcess, info);
1115     auto [table, ret] = sqliteStorageEngine_->CalTableRef(info.table, storageEngine_->GetSharedTableOriginNames());
1116     if (ret != E_OK) {
1117         return ret;
1118     }
1119     ret = ReFillSyncInfoTable(table, info);
1120     if (ret != E_OK) {
1121         return ret;
1122     }
1123     info.taskId = taskId;
1124     errCode = cloudSyncer_->Sync(info);
1125     return errCode;
1126 }
1127 
CheckBeforeSync(const CloudSyncOption & option)1128 int SQLiteRelationalStore::CheckBeforeSync(const CloudSyncOption &option)
1129 {
1130     if (cloudSyncer_ == nullptr) {
1131         LOGE("[RelationalStore] cloudSyncer was not initialized when sync");
1132         return -E_INVALID_DB;
1133     }
1134     if (option.waitTime > DBConstant::MAX_SYNC_TIMEOUT || option.waitTime < DBConstant::INFINITE_WAIT) {
1135         return -E_INVALID_ARGS;
1136     }
1137     int errCode = CheckQueryValid(option);
1138     if (errCode != E_OK) {
1139         return errCode;
1140     }
1141     SecurityOption securityOption;
1142     errCode = storageEngine_->GetSecurityOption(securityOption);
1143     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
1144         return -E_SECURITY_OPTION_CHECK_ERROR;
1145     }
1146     if (errCode == E_OK && securityOption.securityLabel == S4) {
1147         return -E_SECURITY_OPTION_CHECK_ERROR;
1148     }
1149     return E_OK;
1150 }
1151 
CheckQueryValid(const CloudSyncOption & option)1152 int SQLiteRelationalStore::CheckQueryValid(const CloudSyncOption &option)
1153 {
1154     if (option.compensatedSyncOnly) {
1155         return E_OK;
1156     }
1157     QuerySyncObject syncObject(option.query);
1158     int errCode = syncObject.GetValidStatus();
1159     if (errCode != E_OK) {
1160         LOGE("[RelationalStore] query is invalid or not support %d", errCode);
1161         return errCode;
1162     }
1163     std::vector<QuerySyncObject> object = QuerySyncObject::GetQuerySyncObject(option.query);
1164     bool isFromTable = object.empty();
1165     if (!option.priorityTask && !isFromTable) {
1166         LOGE("[RelationalStore] not support normal sync with query");
1167         return -E_NOT_SUPPORT;
1168     }
1169     const auto tableNames = syncObject.GetRelationTableNames();
1170     for (const auto &tableName : tableNames) {
1171         QuerySyncObject querySyncObject;
1172         querySyncObject.SetTableName(tableName);
1173         object.push_back(querySyncObject);
1174     }
1175     std::vector<std::string> syncTableNames;
1176     for (const auto &item : object) {
1177         std::string tableName = item.GetRelationTableName();
1178         syncTableNames.emplace_back(tableName);
1179     }
1180     errCode = CheckTableName(syncTableNames);
1181     if (errCode != E_OK) {
1182         return errCode;
1183     }
1184     return CheckObjectValid(option.priorityTask, object, isFromTable);
1185 }
1186 
CheckObjectValid(bool priorityTask,const std::vector<QuerySyncObject> & object,bool isFromTable)1187 int SQLiteRelationalStore::CheckObjectValid(bool priorityTask, const std::vector<QuerySyncObject> &object,
1188     bool isFromTable)
1189 {
1190     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1191     for (const auto &item : object) {
1192         if (priorityTask && !item.IsContainQueryNodes() && !isFromTable) {
1193             LOGE("[RelationalStore] not support priority sync with full table");
1194             return -E_INVALID_ARGS;
1195         }
1196         int errCode = storageEngine_->CheckQueryValid(item);
1197         if (errCode != E_OK) {
1198             return errCode;
1199         }
1200         if (!priorityTask || isFromTable) {
1201             continue;
1202         }
1203         if (!item.IsInValueOutOfLimit()) {
1204             LOGE("[RelationalStore] not support priority sync in count out of limit");
1205             return -E_MAX_LIMITS;
1206         }
1207         std::string tableName = item.GetRelationTableName();
1208         TableInfo tableInfo = localSchema.GetTable(tableName);
1209         if (!tableInfo.Empty()) {
1210             const std::map<int, FieldName> &primaryKeyMap = tableInfo.GetPrimaryKey();
1211             errCode = item.CheckPrimaryKey(primaryKeyMap);
1212             if (errCode != E_OK) {
1213                 return errCode;
1214             }
1215         }
1216     }
1217     return E_OK;
1218 }
1219 
CheckTableName(const std::vector<std::string> & tableNames)1220 int SQLiteRelationalStore::CheckTableName(const std::vector<std::string> &tableNames)
1221 {
1222     if (tableNames.empty()) {
1223         LOGE("[RelationalStore] sync with empty table");
1224         return -E_INVALID_ARGS;
1225     }
1226     for (const auto &table : tableNames) {
1227         int errCode = ChkSchema(table);
1228         if (errCode != E_OK) {
1229             LOGE("[RelationalStore] schema check failed when sync");
1230             return errCode;
1231         }
1232     }
1233     return E_OK;
1234 }
1235 
FillSyncInfo(const CloudSyncOption & option,const SyncProcessCallback & onProcess,CloudSyncer::CloudTaskInfo & info)1236 void SQLiteRelationalStore::FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
1237     CloudSyncer::CloudTaskInfo &info)
1238 {
1239     auto syncObject = QuerySyncObject::GetQuerySyncObject(option.query);
1240     if (syncObject.empty()) {
1241         QuerySyncObject querySyncObject(option.query);
1242         info.table = querySyncObject.GetRelationTableNames();
1243         for (const auto &item : info.table) {
1244             QuerySyncObject object(Query::Select());
1245             object.SetTableName(item);
1246             info.queryList.push_back(object);
1247         }
1248     } else {
1249         for (auto &item : syncObject) {
1250             info.table.push_back(item.GetRelationTableName());
1251             info.queryList.push_back(std::move(item));
1252         }
1253     }
1254     info.devices = option.devices;
1255     info.mode = option.mode;
1256     info.callback = onProcess;
1257     info.timeout = option.waitTime;
1258     info.priorityTask = option.priorityTask;
1259     info.compensatedTask = option.compensatedSyncOnly;
1260     info.users.push_back("");
1261     info.lockAction = option.lockAction;
1262     info.merge = option.merge;
1263     info.storeId = sqliteStorageEngine_->GetProperties().GetStringProp(DBProperties::STORE_ID, "");
1264     info.prepareTraceId = option.prepareTraceId;
1265 }
1266 
SetTrackerTable(const TrackerSchema & trackerSchema)1267 int SQLiteRelationalStore::SetTrackerTable(const TrackerSchema &trackerSchema)
1268 {
1269     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1270     TableInfo tableInfo = localSchema.GetTable(trackerSchema.tableName);
1271     if (tableInfo.Empty()) {
1272         return sqliteStorageEngine_->SetTrackerTable(trackerSchema);
1273     }
1274     bool isFirstCreate = false;
1275     int errCode = sqliteStorageEngine_->CheckAndCacheTrackerSchema(trackerSchema, tableInfo, isFirstCreate);
1276     if (errCode != E_OK) {
1277         return errCode == -E_IGNORE_DATA ? E_OK : errCode;
1278     }
1279     errCode = CreateDistributedTable(trackerSchema.tableName, tableInfo.GetTableSyncType(), true);
1280     if (errCode != E_OK) {
1281         return errCode;
1282     }
1283     return sqliteStorageEngine_->SaveTrackerSchema(trackerSchema.tableName, isFirstCreate);
1284 }
1285 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)1286 int SQLiteRelationalStore::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
1287 {
1288     if (condition.sql.empty()) {
1289         LOGE("[RelationalStore] execute sql is empty.");
1290         return -E_INVALID_ARGS;
1291     }
1292     return sqliteStorageEngine_->ExecuteSql(condition, records);
1293 }
1294 
CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor * & handle,std::set<std::string> & clearWaterMarkTable)1295 int SQLiteRelationalStore::CleanWaterMark(SQLiteSingleVerRelationalStorageExecutor *&handle,
1296     std::set<std::string> &clearWaterMarkTable)
1297 {
1298     int errCode = E_OK;
1299     for (const auto &tableName : clearWaterMarkTable) {
1300         std::string cloudWaterMark;
1301         Value blobMetaVal;
1302         errCode = DBCommon::SerializeWaterMark(0, cloudWaterMark, blobMetaVal);
1303         if (errCode != E_OK) {
1304             LOGE("[SQLiteRelationalStore] SerializeWaterMark failed, errCode = %d", errCode);
1305             return errCode;
1306         }
1307         errCode = storageEngine_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal, true);
1308         if (errCode != E_OK) {
1309             LOGE("[SQLiteRelationalStore] put meta data failed, errCode = %d", errCode);
1310             return errCode;
1311         }
1312         errCode = handle->CleanUploadFinishedFlag(tableName);
1313         if (errCode != E_OK) {
1314             LOGE("[SQLiteRelationalStore] clean upload finished flag failed, errCode = %d", errCode);
1315             return errCode;
1316         }
1317     }
1318     errCode = cloudSyncer_->CleanWaterMarkInMemory(clearWaterMarkTable);
1319     if (errCode != E_OK) {
1320         LOGE("[SQLiteRelationalStore] CleanWaterMarkInMemory failed, errCode = %d", errCode);
1321     }
1322     return errCode;
1323 }
1324 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)1325 int SQLiteRelationalStore::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
1326 {
1327     SQLiteSingleVerRelationalStorageExecutor *handle = nullptr;
1328     int errCode = GetHandleAndStartTransaction(handle);
1329     if (errCode != E_OK) {
1330         LOGE("[SQLiteRelationalStore] SetReference start transaction failed, errCode = %d", errCode);
1331         return errCode;
1332     }
1333     std::set<std::string> clearWaterMarkTables;
1334     RelationalSchemaObject schema;
1335     errCode = sqliteStorageEngine_->SetReference(tableReferenceProperty, handle, clearWaterMarkTables, schema);
1336     if (errCode != E_OK && errCode != -E_TABLE_REFERENCE_CHANGED) {
1337         LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", errCode);
1338         (void)handle->Rollback();
1339         ReleaseHandle(handle);
1340         return errCode;
1341     }
1342 
1343     if (!clearWaterMarkTables.empty()) {
1344         storageEngine_->SetReusedHandle(handle);
1345         int ret = CleanWaterMark(handle, clearWaterMarkTables);
1346         if (ret != E_OK) {
1347             LOGE("[SQLiteRelationalStore] SetReference failed, errCode = %d", ret);
1348             storageEngine_->SetReusedHandle(nullptr);
1349             (void)handle->Rollback();
1350             ReleaseHandle(handle);
1351             return ret;
1352         }
1353         storageEngine_->SetReusedHandle(nullptr);
1354         LOGI("[SQLiteRelationalStore] SetReference clear water mark success");
1355     }
1356 
1357     int ret = handle->Commit();
1358     ReleaseHandle(handle);
1359     if (ret == E_OK) {
1360         sqliteStorageEngine_->SetSchema(schema);
1361         return errCode;
1362     }
1363     LOGE("[SQLiteRelationalStore] SetReference commit transaction failed, errCode = %d", ret);
1364     return ret;
1365 }
1366 
InitTrackerSchemaFromMeta()1367 int SQLiteRelationalStore::InitTrackerSchemaFromMeta()
1368 {
1369     int errCode = sqliteStorageEngine_->GetOrInitTrackerSchemaFromMeta();
1370     return errCode == -E_NOT_FOUND ? E_OK : errCode;
1371 }
1372 
CleanTrackerData(const std::string & tableName,int64_t cursor)1373 int SQLiteRelationalStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
1374 {
1375     if (tableName.empty()) {
1376         return -E_INVALID_ARGS;
1377     }
1378     return sqliteStorageEngine_->CleanTrackerData(tableName, cursor);
1379 }
1380 
ExecuteCreateSharedTable(const DataBaseSchema & schema)1381 int SQLiteRelationalStore::ExecuteCreateSharedTable(const DataBaseSchema &schema)
1382 {
1383     if (sqliteStorageEngine_ == nullptr) {
1384         LOGE("[RelationalStore][ExecuteCreateSharedTable] sqliteStorageEngine was not initialized");
1385         return -E_INVALID_DB;
1386     }
1387     std::vector<std::string> deleteTableNames;
1388     std::map<std::string, std::vector<Field>> updateTableNames;
1389     std::map<std::string, std::string> alterTableNames;
1390     if (!PrepareSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames)) {
1391         LOGE("[RelationalStore][ExecuteCreateSharedTable] table fields are invalid.");
1392         return -E_INVALID_ARGS;
1393     }
1394     LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table start");
1395     // upgrade contains delete, alter, update and create
1396     int errCode = sqliteStorageEngine_->UpgradeSharedTable(schema, deleteTableNames, updateTableNames, alterTableNames);
1397     if (errCode != E_OK) {
1398         LOGE("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table failed. %d", errCode);
1399     } else {
1400         LOGI("[RelationalStore][ExecuteCreateSharedTable] upgrade shared table end");
1401     }
1402     return errCode;
1403 }
1404 
ReFillSyncInfoTable(const std::vector<std::string> & actualTable,CloudSyncer::CloudTaskInfo & info)1405 int SQLiteRelationalStore::ReFillSyncInfoTable(const std::vector<std::string> &actualTable,
1406     CloudSyncer::CloudTaskInfo &info)
1407 {
1408     if (info.priorityTask && actualTable.size() != info.table.size()) {
1409         LOGE("[RelationalStore] Not support regenerate table with priority task");
1410         return -E_NOT_SUPPORT;
1411     }
1412     if (actualTable.size() == info.table.size()) {
1413         return E_OK;
1414     }
1415     LOGD("[RelationalStore] Fill tables from %zu to %zu", info.table.size(), actualTable.size());
1416     info.table = actualTable;
1417     info.queryList.clear();
1418     for (const auto &item : info.table) {
1419         QuerySyncObject object(Query::Select());
1420         object.SetTableName(item);
1421         info.queryList.push_back(object);
1422     }
1423     return E_OK;
1424 }
1425 
Pragma(PragmaCmd cmd,PragmaData & pragmaData)1426 int SQLiteRelationalStore::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
1427 {
1428     if (cmd != LOGIC_DELETE_SYNC_DATA) {
1429         return -E_NOT_SUPPORT;
1430     }
1431     if (pragmaData == nullptr) {
1432         return -E_INVALID_ARGS;
1433     }
1434     auto logicDelete = *(static_cast<bool *>(pragmaData));
1435     if (storageEngine_ == nullptr) {
1436         LOGE("[RelationalStore][ChkSchema] storageEngine was not initialized");
1437         return -E_INVALID_DB;
1438     }
1439     storageEngine_->SetLogicDelete(logicDelete);
1440     return E_OK;
1441 }
1442 
UpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1443 int SQLiteRelationalStore::UpsertData(RecordStatus status, const std::string &tableName,
1444     const std::vector<VBucket> &records)
1445 {
1446     if (storageEngine_ == nullptr) {
1447         LOGE("[RelationalStore][UpsertData] sqliteStorageEngine was not initialized");
1448         return -E_INVALID_DB;
1449     }
1450     int errCode = CheckParamForUpsertData(status, tableName, records);
1451     if (errCode != E_OK) {
1452         return errCode;
1453     }
1454     return storageEngine_->UpsertData(status, tableName, records);
1455 }
1456 
CheckParamForUpsertData(RecordStatus status,const std::string & tableName,const std::vector<VBucket> & records)1457 int SQLiteRelationalStore::CheckParamForUpsertData(RecordStatus status, const std::string &tableName,
1458     const std::vector<VBucket> &records)
1459 {
1460     if (status != RecordStatus::WAIT_COMPENSATED_SYNC) {
1461         LOGE("[RelationalStore][CheckParamForUpsertData] invalid status %" PRId64, static_cast<int64_t>(status));
1462         return -E_INVALID_ARGS;
1463     }
1464     if (records.empty()) {
1465         LOGE("[RelationalStore][CheckParamForUpsertData] records is empty");
1466         return -E_INVALID_ARGS;
1467     }
1468     size_t recordSize = records.size();
1469     if (recordSize > DBConstant::MAX_BATCH_SIZE) {
1470         LOGE("[RelationalStore][CheckParamForUpsertData] records size over limit, size %zu", recordSize);
1471         return -E_MAX_LIMITS;
1472     }
1473     return CheckSchemaForUpsertData(tableName, records);
1474 }
1475 
ChkTable(const TableInfo & table)1476 static int ChkTable(const TableInfo &table)
1477 {
1478     if (table.IsNoPkTable() || table.GetSharedTableMark()) {
1479         LOGE("[RelationalStore][ChkTable] not support table without pk or with tablemark");
1480         return -E_NOT_SUPPORT;
1481     }
1482     if (table.GetTableName().empty() || (table.GetTableSyncType() != TableSyncType::CLOUD_COOPERATION)) {
1483         return -E_NOT_FOUND;
1484     }
1485     return E_OK;
1486 }
1487 
CheckSchemaForUpsertData(const std::string & tableName,const std::vector<VBucket> & records)1488 int SQLiteRelationalStore::CheckSchemaForUpsertData(const std::string &tableName, const std::vector<VBucket> &records)
1489 {
1490     if (tableName.empty()) {
1491         return -E_INVALID_ARGS;
1492     }
1493     auto schema = storageEngine_->GetSchemaInfo();
1494     auto table = schema.GetTable(tableName);
1495     int errCode = ChkTable(table);
1496     if (errCode != E_OK) {
1497         return errCode;
1498     }
1499     TableSchema cloudTableSchema;
1500     errCode = storageEngine_->GetCloudTableSchema(tableName, cloudTableSchema);
1501     if (errCode != E_OK) {
1502         LOGE("Get cloud schema failed when check upsert data, %d", errCode);
1503         return errCode;
1504     }
1505     errCode = ChkSchema(tableName);
1506     if (errCode != E_OK) {
1507         return errCode;
1508     }
1509     std::set<std::string> dbPkFields;
1510     for (auto &field : table.GetIdentifyKey()) {
1511         dbPkFields.insert(field);
1512     }
1513     std::set<std::string> schemaFields;
1514     for (auto &fieldInfo : table.GetFieldInfos()) {
1515         schemaFields.insert(fieldInfo.GetFieldName());
1516     }
1517     for (const auto &record : records) {
1518         std::set<std::string> recordPkFields;
1519         for (const auto &item : record) {
1520             if (schemaFields.find(item.first) == schemaFields.end()) {
1521                 LOGE("[RelationalStore][CheckSchemaForUpsertData] invalid field not exist in schema");
1522                 return -E_INVALID_ARGS;
1523             }
1524             if (dbPkFields.find(item.first) == dbPkFields.end()) {
1525                 continue;
1526             }
1527             recordPkFields.insert(item.first);
1528         }
1529         if (recordPkFields.size() != dbPkFields.size()) {
1530             LOGE("[RelationalStore][CheckSchemaForUpsertData] pk size not equal param %zu schema %zu",
1531                 recordPkFields.size(), dbPkFields.size());
1532             return -E_INVALID_ARGS;
1533         }
1534     }
1535     return errCode;
1536 }
1537 
InitSQLiteStorageEngine(const RelationalDBProperties & properties)1538 int SQLiteRelationalStore::InitSQLiteStorageEngine(const RelationalDBProperties &properties)
1539 {
1540     auto engine = new(std::nothrow) SQLiteSingleRelationalStorageEngine(properties);
1541     if (engine == nullptr) {
1542         LOGE("[RelationalStore][Open] Create storage engine failed");
1543         return -E_OUT_OF_MEMORY;
1544     }
1545     sqliteStorageEngine_ = std::shared_ptr<SQLiteSingleRelationalStorageEngine>(engine,
1546         [](SQLiteSingleRelationalStorageEngine *releaseEngine) {
1547         RefObject::KillAndDecObjRef(releaseEngine);
1548     });
1549     return E_OK;
1550 }
1551 
CheckCloudSchema(const DataBaseSchema & schema)1552 int SQLiteRelationalStore::CheckCloudSchema(const DataBaseSchema &schema)
1553 {
1554     if (storageEngine_ == nullptr) {
1555         LOGE("[RelationalStore][CheckCloudSchema] storageEngine was not initialized");
1556         return -E_INVALID_DB;
1557     }
1558     std::shared_ptr<DataBaseSchema> cloudSchema;
1559     (void) storageEngine_->GetCloudDbSchema(cloudSchema);
1560     RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema();
1561     for (const auto &tableSchema : schema.tables) {
1562         TableInfo tableInfo = localSchema.GetTable(tableSchema.name);
1563         if (tableInfo.Empty()) {
1564             continue;
1565         }
1566         if (tableInfo.GetSharedTableMark()) {
1567             LOGE("[RelationalStore][CheckCloudSchema] Table name is existent shared table's name.");
1568             return -E_INVALID_ARGS;
1569         }
1570     }
1571     for (const auto &tableSchema : schema.tables) {
1572         if (cloudSchema == nullptr) {
1573             continue;
1574         }
1575         for (const auto &oldSchema : cloudSchema->tables) {
1576             if (!CloudStorageUtils::CheckCloudSchemaFields(tableSchema, oldSchema)) {
1577                 LOGE("[RelationalStore][CheckCloudSchema] Schema fields are invalid.");
1578                 return -E_INVALID_ARGS;
1579             }
1580         }
1581     }
1582     return E_OK;
1583 }
1584 
SetCloudSyncConfig(const CloudSyncConfig & config)1585 int SQLiteRelationalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
1586 {
1587     if (storageEngine_ == nullptr) {
1588         LOGE("[RelationalStore][SetCloudSyncConfig] sqliteStorageEngine was not initialized");
1589         return -E_INVALID_DB;
1590     }
1591     storageEngine_->SetCloudSyncConfig(config);
1592     return E_OK;
1593 }
1594 
GetCloudTaskStatus(uint64_t taskId)1595 SyncProcess SQLiteRelationalStore::GetCloudTaskStatus(uint64_t taskId)
1596 {
1597     return cloudSyncer_->GetCloudTaskStatus(taskId);
1598 }
1599 } //namespace DistributedDB
1600 #endif
1601