1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_natural_store.h"
17
18 #include <algorithm>
19 #include <thread>
20 #include <chrono>
21
22 #include "data_compression.h"
23 #include "db_common.h"
24 #include "db_constant.h"
25 #include "db_dump_helper.h"
26 #include "db_dfx_adapter.h"
27 #include "db_errno.h"
28 #include "generic_single_ver_kv_entry.h"
29 #include "intercepted_data_impl.h"
30 #include "kvdb_utils.h"
31 #include "log_print.h"
32 #include "platform_specific.h"
33 #include "schema_object.h"
34 #include "single_ver_database_oper.h"
35 #include "storage_engine_manager.h"
36 #include "sqlite_single_ver_natural_store_connection.h"
37 #include "value_hash_calc.h"
38
39 namespace DistributedDB {
40 namespace {
41 constexpr int WAIT_DELEGATE_CALLBACK_TIME = 100;
42
43 constexpr int DEVICE_ID_LEN = 32;
44 const std::string CREATE_DB_TIME = "createDBTime";
45
46 // Called when get multiple dev data.
47 // deviceID is the device which currently being getting. When getting one dev data, deviceID is "".
48 // dataItems is the DataItems which already be get from DB sorted by timestamp.
49 // token must not be null.
ProcessContinueToken(const DeviceID & deviceID,const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerContinueToken * & token)50 void ProcessContinueToken(const DeviceID &deviceID, const std::vector<DataItem> &dataItems, int &errCode,
51 SQLiteSingleVerContinueToken *&token)
52 {
53 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
54 delete token;
55 token = nullptr;
56 return;
57 }
58
59 if (dataItems.empty()) {
60 errCode = -E_INTERNAL_ERROR;
61 LOGE("Get data unfinished but dataitems is empty.");
62 delete token;
63 token = nullptr;
64 return;
65 }
66
67 Timestamp nextBeginTime = dataItems.back().timestamp + 1;
68 if (nextBeginTime > INT64_MAX) {
69 nextBeginTime = INT64_MAX;
70 }
71 token->SetNextBeginTime(deviceID, nextBeginTime);
72 return;
73 }
74
75 // Called when get one dev data.
ProcessContinueToken(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerContinueToken * & token)76 void ProcessContinueToken(const std::vector<DataItem> &dataItems, int &errCode,
77 SQLiteSingleVerContinueToken *&token)
78 {
79 ProcessContinueToken("", dataItems, errCode, token);
80 }
81
82 // Called when get query sync data.
83 // dataItems is the DataItems which already be get from DB sorted by timestamp.
84 // token must not be null.
ProcessContinueTokenForQuerySync(const std::vector<DataItem> & dataItems,int & errCode,SQLiteSingleVerContinueToken * & token)85 void ProcessContinueTokenForQuerySync(const std::vector<DataItem> &dataItems, int &errCode,
86 SQLiteSingleVerContinueToken *&token)
87 {
88 if (errCode != -E_UNFINISHED) { // Error happened or get data finished. Token should be cleared.
89 delete token;
90 token = nullptr;
91 return;
92 }
93
94 if (dataItems.empty()) {
95 errCode = -E_INTERNAL_ERROR;
96 LOGE("Get data unfinished but dataitems is empty.");
97 delete token;
98 token = nullptr;
99 return;
100 }
101
102 Timestamp nextBeginTime = dataItems.back().timestamp + 1;
103 if (nextBeginTime > INT64_MAX) {
104 nextBeginTime = INT64_MAX;
105 }
106 bool getDeleteData = ((dataItems.back().flag & DataItem::DELETE_FLAG) != 0);
107 if (getDeleteData) {
108 token->FinishGetQueryData();
109 token->SetDeletedNextBeginTime("", nextBeginTime);
110 } else {
111 token->SetNextBeginTime("", nextBeginTime);
112 }
113 return;
114 }
115
UpdateSecProperties(KvDBProperties & properties,bool isReadOnly,const SchemaObject & savedSchemaObj,const SQLiteSingleVerStorageEngine * engine)116 void UpdateSecProperties(KvDBProperties &properties, bool isReadOnly, const SchemaObject &savedSchemaObj,
117 const SQLiteSingleVerStorageEngine *engine)
118 {
119 if (isReadOnly) {
120 properties.SetSchema(savedSchemaObj);
121 properties.SetBoolProp(KvDBProperties::FIRST_OPEN_IS_READ_ONLY, true);
122 }
123 // Update the security option from the storage engine for that
124 // we will not update the security label and flag for the existed database.
125 // So the security label and flag are from the existed database.
126 if (engine == nullptr) {
127 return;
128 }
129 properties.SetIntProp(KvDBProperties::SECURITY_LABEL, engine->GetSecurityOption().securityLabel);
130 properties.SetIntProp(KvDBProperties::SECURITY_FLAG, engine->GetSecurityOption().securityFlag);
131 }
132
GetKvEntriesByDataItems(std::vector<SingleVerKvEntry * > & entries,std::vector<DataItem> & dataItems)133 int GetKvEntriesByDataItems(std::vector<SingleVerKvEntry *> &entries, std::vector<DataItem> &dataItems)
134 {
135 int errCode = E_OK;
136 for (auto &item : dataItems) {
137 auto entry = new (std::nothrow) GenericSingleVerKvEntry();
138 if (entry == nullptr) {
139 errCode = -E_OUT_OF_MEMORY;
140 LOGE("GetKvEntries failed, errCode:%d", errCode);
141 SingleVerKvEntry::Release(entries);
142 break;
143 }
144 entry->SetEntryData(std::move(item));
145 entries.push_back(entry);
146 }
147 return errCode;
148 }
149
CanHoldDeletedData(const std::vector<DataItem> & dataItems,const DataSizeSpecInfo & dataSizeInfo,size_t appendLen)150 bool CanHoldDeletedData(const std::vector<DataItem> &dataItems, const DataSizeSpecInfo &dataSizeInfo,
151 size_t appendLen)
152 {
153 bool reachThreshold = false;
154 size_t blockSize = 0;
155 for (size_t i = 0; !reachThreshold && i < dataItems.size(); i++) {
156 blockSize += SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(dataItems[i], appendLen);
157 reachThreshold = (blockSize >= dataSizeInfo.blockSize * DBConstant::QUERY_SYNC_THRESHOLD);
158 }
159 return !reachThreshold;
160 }
161 }
162
SQLiteSingleVerNaturalStore()163 SQLiteSingleVerNaturalStore::SQLiteSingleVerNaturalStore()
164 : currentMaxTimestamp_(0),
165 storageEngine_(nullptr),
166 notificationEventsRegistered_(false),
167 notificationConflictEventsRegistered_(false),
168 isInitialized_(false),
169 isReadOnly_(false),
170 lifeCycleNotifier_(nullptr),
171 lifeTimerId_(0),
172 autoLifeTime_(DBConstant::DEF_LIFE_CYCLE_TIME),
173 createDBTime_(0),
174 dataInterceptor_(nullptr),
175 maxLogSize_(DBConstant::MAX_LOG_SIZE_DEFAULT)
176 {}
177
~SQLiteSingleVerNaturalStore()178 SQLiteSingleVerNaturalStore::~SQLiteSingleVerNaturalStore()
179 {
180 ReleaseResources();
181 }
182
GetDatabasePath(const KvDBProperties & kvDBProp)183 std::string SQLiteSingleVerNaturalStore::GetDatabasePath(const KvDBProperties &kvDBProp)
184 {
185 std::string filePath = GetSubDirPath(kvDBProp) + "/" +
186 DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
187 return filePath;
188 }
189
GetSubDirPath(const KvDBProperties & kvDBProp)190 std::string SQLiteSingleVerNaturalStore::GetSubDirPath(const KvDBProperties &kvDBProp)
191 {
192 std::string dataDir = kvDBProp.GetStringProp(KvDBProperties::DATA_DIR, "");
193 std::string identifierDir = kvDBProp.GetStringProp(KvDBProperties::IDENTIFIER_DIR, "");
194 std::string dirPath = dataDir + "/" + identifierDir + "/" + DBConstant::SINGLE_SUB_DIR;
195 return dirPath;
196 }
197
SetUserVer(const KvDBProperties & kvDBProp,int version)198 int SQLiteSingleVerNaturalStore::SetUserVer(const KvDBProperties &kvDBProp, int version)
199 {
200 OpenDbProperties properties;
201 properties.uri = GetDatabasePath(kvDBProp);
202 bool isEncryptedDb = kvDBProp.GetBoolProp(KvDBProperties::ENCRYPTED_MODE, false);
203 if (isEncryptedDb) {
204 kvDBProp.GetPassword(properties.cipherType, properties.passwd);
205 }
206
207 int errCode = SQLiteUtils::SetUserVer(properties, version);
208 if (errCode != E_OK) {
209 LOGE("Recover for open db failed in single version:%d", errCode);
210 }
211 return errCode;
212 }
213
InitDatabaseContext(const KvDBProperties & kvDBProp,bool isNeedUpdateSecOpt)214 int SQLiteSingleVerNaturalStore::InitDatabaseContext(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt)
215 {
216 int errCode = InitStorageEngine(kvDBProp, isNeedUpdateSecOpt);
217 if (errCode != E_OK) {
218 return errCode;
219 }
220 InitCurrentMaxStamp();
221 return errCode;
222 }
223
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)224 int SQLiteSingleVerNaturalStore::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier)
225 {
226 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
227 int errCode;
228 if (!notifier) {
229 if (lifeTimerId_ == 0) {
230 return E_OK;
231 }
232 errCode = StopLifeCycleTimer();
233 if (errCode != E_OK) {
234 LOGE("Stop the life cycle timer failed:%d", errCode);
235 }
236 return E_OK;
237 }
238
239 if (lifeTimerId_ != 0) {
240 errCode = StopLifeCycleTimer();
241 if (errCode != E_OK) {
242 LOGE("Stop the life cycle timer failed:%d", errCode);
243 }
244 }
245 errCode = StartLifeCycleTimer(notifier);
246 if (errCode != E_OK) {
247 LOGE("Register life cycle timer failed:%d", errCode);
248 }
249 return errCode;
250 }
251
SetAutoLifeCycleTime(uint32_t time)252 int SQLiteSingleVerNaturalStore::SetAutoLifeCycleTime(uint32_t time)
253 {
254 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
255 if (lifeTimerId_ == 0) {
256 autoLifeTime_ = time;
257 } else {
258 auto runtimeCxt = RuntimeContext::GetInstance();
259 if (runtimeCxt == nullptr) {
260 return -E_INVALID_ARGS;
261 }
262 LOGI("[SingleVer] Set life cycle to %u", time);
263 int errCode = runtimeCxt->ModifyTimer(lifeTimerId_, time);
264 if (errCode != E_OK) {
265 return errCode;
266 }
267 autoLifeTime_ = time;
268 }
269 return E_OK;
270 }
271
GetSecurityOption(SecurityOption & option) const272 int SQLiteSingleVerNaturalStore::GetSecurityOption(SecurityOption &option) const
273 {
274 bool isMemDb = GetDbProperties().GetBoolProp(KvDBProperties::MEMORY_MODE, false);
275 if (isMemDb) {
276 LOGI("[GetSecurityOption] MemDb, no need to get security option");
277 option = SecurityOption();
278 return E_OK;
279 }
280
281 option.securityLabel = GetDbProperties().GetSecLabel();
282 option.securityFlag = GetDbProperties().GetSecFlag();
283
284 return E_OK;
285 }
286
287 namespace {
OriValueCanBeUse(int errCode)288 inline bool OriValueCanBeUse(int errCode)
289 {
290 return (errCode == -E_VALUE_MATCH);
291 }
292
AmendValueShouldBeUse(int errCode)293 inline bool AmendValueShouldBeUse(int errCode)
294 {
295 return (errCode == -E_VALUE_MATCH_AMENDED);
296 }
297
IsValueMismatched(int errCode)298 inline bool IsValueMismatched(int errCode)
299 {
300 return (errCode == -E_VALUE_MISMATCH_FEILD_COUNT ||
301 errCode == -E_VALUE_MISMATCH_FEILD_TYPE ||
302 errCode == -E_VALUE_MISMATCH_CONSTRAINT);
303 }
304 }
305
CheckValueAndAmendIfNeed(ValueSource sourceType,const Value & oriValue,Value & amendValue,bool & useAmendValue) const306 int SQLiteSingleVerNaturalStore::CheckValueAndAmendIfNeed(ValueSource sourceType, const Value &oriValue,
307 Value &amendValue, bool &useAmendValue) const
308 {
309 // oriValue size may already be checked previously, but check here const little
310 if (oriValue.size() > DBConstant::MAX_VALUE_SIZE) {
311 return -E_INVALID_ARGS;
312 }
313 const SchemaObject &schemaObjRef = MyProp().GetSchemaConstRef();
314 if (!schemaObjRef.IsSchemaValid()) {
315 // Not a schema database, do not need to check more
316 return E_OK;
317 }
318 if (schemaObjRef.GetSchemaType() == SchemaType::JSON) {
319 ValueObject valueObj;
320 int errCode = valueObj.Parse(oriValue.data(), oriValue.data() + oriValue.size(), schemaObjRef.GetSkipSize());
321 if (errCode != E_OK) {
322 return -E_INVALID_FORMAT;
323 }
324 errCode = schemaObjRef.CheckValueAndAmendIfNeed(sourceType, valueObj);
325 if (OriValueCanBeUse(errCode)) {
326 useAmendValue = false;
327 return E_OK;
328 }
329 if (AmendValueShouldBeUse(errCode)) {
330 std::string amended = valueObj.ToString();
331 if (amended.size() > DBConstant::MAX_VALUE_SIZE) {
332 LOGE("[SqlSinStore][CheckAmendValue] ValueSize=%zu exceed limit after amend.", amended.size());
333 return -E_INVALID_FORMAT;
334 }
335 amendValue.clear();
336 amendValue.assign(amended.begin(), amended.end());
337 useAmendValue = true;
338 return E_OK;
339 }
340 if (IsValueMismatched(errCode)) {
341 return errCode;
342 }
343 } else {
344 int errCode = schemaObjRef.VerifyValue(sourceType, oriValue);
345 if (errCode == E_OK) {
346 useAmendValue = false;
347 return E_OK;
348 }
349 }
350 // Any unexpected wrong
351 return -E_INVALID_FORMAT;
352 }
353
ClearIncompleteDatabase(const KvDBProperties & kvDBPro) const354 int SQLiteSingleVerNaturalStore::ClearIncompleteDatabase(const KvDBProperties &kvDBPro) const
355 {
356 std::string dbSubDir = SQLiteSingleVerNaturalStore::GetSubDirPath(kvDBPro);
357 if (OS::CheckPathExistence(dbSubDir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE)) {
358 int errCode = DBCommon::RemoveAllFilesOfDirectory(dbSubDir);
359 if (errCode != E_OK) {
360 LOGE("Remove the incomplete database dir failed!");
361 return -E_REMOVE_FILE;
362 }
363 }
364 return E_OK;
365 }
366
CheckDatabaseRecovery(const KvDBProperties & kvDBProp)367 int SQLiteSingleVerNaturalStore::CheckDatabaseRecovery(const KvDBProperties &kvDBProp)
368 {
369 if (kvDBProp.GetBoolProp(KvDBProperties::MEMORY_MODE, false)) { // memory status not need recovery
370 return E_OK;
371 }
372 std::unique_ptr<SingleVerDatabaseOper> operation = std::make_unique<SingleVerDatabaseOper>(this, nullptr);
373 (void)operation->ClearExportedTempFiles(kvDBProp);
374 int errCode = operation->RekeyRecover(kvDBProp);
375 if (errCode != E_OK) {
376 LOGE("Recover from rekey failed in single version:%d", errCode);
377 return errCode;
378 }
379
380 errCode = operation->ClearImportTempFile(kvDBProp);
381 if (errCode != E_OK) {
382 LOGE("Clear imported temp db failed in single version:%d", errCode);
383 return errCode;
384 }
385
386 // Currently, Design for the consistency of directory and file setting secOption
387 errCode = ClearIncompleteDatabase(kvDBProp);
388 if (errCode != E_OK) {
389 LOGE("Clear incomplete database failed in single version:%d", errCode);
390 return errCode;
391 }
392 const std::string dataDir = kvDBProp.GetStringProp(KvDBProperties::DATA_DIR, "");
393 const std::string identifierDir = kvDBProp.GetStringProp(KvDBProperties::IDENTIFIER_DIR, "");
394 bool isCreate = kvDBProp.GetBoolProp(KvDBProperties::CREATE_IF_NECESSARY, true);
395 bool isMemoryDb = kvDBProp.GetBoolProp(KvDBProperties::MEMORY_MODE, false);
396 if (!isMemoryDb) {
397 errCode = DBCommon::CreateStoreDirectory(dataDir, identifierDir, DBConstant::SINGLE_SUB_DIR, isCreate);
398 if (errCode != E_OK) {
399 LOGE("Create single version natural store directory failed:%d", errCode);
400 }
401 }
402 return errCode;
403 }
404
GetAndInitStorageEngine(const KvDBProperties & kvDBProp)405 int SQLiteSingleVerNaturalStore::GetAndInitStorageEngine(const KvDBProperties &kvDBProp)
406 {
407 int errCode = E_OK;
408 {
409 std::unique_lock<std::shared_mutex> lock(engineMutex_);
410 storageEngine_ =
411 static_cast<SQLiteSingleVerStorageEngine *>(StorageEngineManager::GetStorageEngine(kvDBProp, errCode));
412 if (storageEngine_ == nullptr) {
413 return errCode;
414 }
415 }
416
417 if (storageEngine_->IsEngineCorrupted()) {
418 LOGE("[SqlSinStore][GetAndInitStorageEngine] database engine is corrupted, not need continue to open!");
419 return -E_INVALID_PASSWD_OR_CORRUPTED_DB;
420 }
421
422 errCode = InitDatabaseContext(kvDBProp);
423 if (errCode != E_OK) {
424 LOGE("[SqlSinStore][Open] Init database context fail! errCode = [%d]", errCode);
425 }
426 return errCode;
427 }
428
Open(const KvDBProperties & kvDBProp)429 int SQLiteSingleVerNaturalStore::Open(const KvDBProperties &kvDBProp)
430 {
431 std::lock_guard<std::mutex> lock(initialMutex_);
432 if (isInitialized_) {
433 return E_OK; // avoid the reopen operation.
434 }
435
436 int errCode = CheckDatabaseRecovery(kvDBProp);
437 if (errCode != E_OK) {
438 return errCode;
439 }
440
441 bool isReadOnly = false;
442 SchemaObject savedSchemaObj;
443
444 errCode = GetAndInitStorageEngine(kvDBProp);
445 if (errCode != E_OK) {
446 goto ERROR;
447 }
448
449 errCode = RegisterNotification();
450 if (errCode != E_OK) {
451 LOGE("Register notification failed:%d", errCode);
452 goto ERROR;
453 }
454
455 errCode = RemoveAllSubscribe();
456 if (errCode != E_OK) {
457 LOGE("[SqlSinStore][Open] remove subscribe fail! errCode = [%d]", errCode);
458 goto ERROR;
459 }
460
461 // Here, the dbfile is created or opened, and upgrade of table structure has done.
462 // More, Upgrade of schema is also done in upgrader call in InitDatabaseContext, schema in dbfile updated if need.
463 // If inputSchema is empty, upgrader do nothing of schema, isReadOnly will be true if dbfile contain schema before.
464 // In this case, we should load the savedSchema for checking value from sync which not restricted by readOnly.
465 // If inputSchema not empty, isReadOnly will not be true, we should do nothing more.
466 errCode = DecideReadOnlyBaseOnSchema(kvDBProp, isReadOnly, savedSchemaObj);
467 if (errCode != E_OK) {
468 LOGE("[SqlSinStore][Open] DecideReadOnlyBaseOnSchema failed=%d", errCode);
469 goto ERROR;
470 }
471 // Set KvDBProperties and set Schema
472 MyProp() = kvDBProp;
473 UpdateSecProperties(MyProp(), isReadOnly, savedSchemaObj, storageEngine_);
474
475 StartSyncer();
476 OnKill([this]() { ReleaseResources(); });
477
478 errCode = SaveCreateDBTimeIfNotExisted();
479 if (errCode != E_OK) {
480 goto ERROR;
481 }
482
483 InitialLocalDataTimestamp();
484 isInitialized_ = true;
485 isReadOnly_ = isReadOnly;
486 return E_OK;
487 ERROR:
488 ReleaseResources();
489 return errCode;
490 }
491
Close()492 void SQLiteSingleVerNaturalStore::Close()
493 {
494 ReleaseResources();
495 }
496
NewConnection(int & errCode)497 GenericKvDBConnection *SQLiteSingleVerNaturalStore::NewConnection(int &errCode)
498 {
499 SQLiteSingleVerNaturalStoreConnection *connection = new (std::nothrow) SQLiteSingleVerNaturalStoreConnection(this);
500 if (connection == nullptr) {
501 errCode = -E_OUT_OF_MEMORY;
502 return nullptr;
503 }
504 errCode = E_OK;
505 return connection;
506 }
507
508 // Get interface type of this kvdb.
GetInterfaceType() const509 int SQLiteSingleVerNaturalStore::GetInterfaceType() const
510 {
511 return SYNC_SVD;
512 }
513
514 // Get the interface ref-count, in order to access asynchronously.
IncRefCount()515 void SQLiteSingleVerNaturalStore::IncRefCount()
516 {
517 IncObjRef(this);
518 }
519
520 // Drop the interface ref-count.
DecRefCount()521 void SQLiteSingleVerNaturalStore::DecRefCount()
522 {
523 DecObjRef(this);
524 }
525
526 // Get the identifier of this kvdb.
GetIdentifier() const527 std::vector<uint8_t> SQLiteSingleVerNaturalStore::GetIdentifier() const
528 {
529 std::string identifier = MyProp().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
530 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
531 return identifierVect;
532 }
533
GetDualTupleIdentifier() const534 std::vector<uint8_t> SQLiteSingleVerNaturalStore::GetDualTupleIdentifier() const
535 {
536 std::string identifier = MyProp().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
537 std::vector<uint8_t> identifierVect(identifier.begin(), identifier.end());
538 return identifierVect;
539 }
540
541 // Get interface for syncer.
GetSyncInterface()542 IKvDBSyncInterface *SQLiteSingleVerNaturalStore::GetSyncInterface()
543 {
544 return this;
545 }
546
GetMetaData(const Key & key,Value & value) const547 int SQLiteSingleVerNaturalStore::GetMetaData(const Key &key, Value &value) const
548 {
549 if (storageEngine_ == nullptr) {
550 return -E_INVALID_DB;
551 }
552 if (key.size() > DBConstant::MAX_KEY_SIZE) {
553 return -E_INVALID_ARGS;
554 }
555
556 int errCode = E_OK;
557 auto handle = GetHandle(true, errCode);
558 if (handle == nullptr) {
559 return errCode;
560 }
561
562 Timestamp timestamp;
563 errCode = handle->GetKvData(SingleVerDataType::META_TYPE, key, value, timestamp);
564 ReleaseHandle(handle);
565 HeartBeatForLifeCycle();
566 return errCode;
567 }
568
PutMetaData(const Key & key,const Value & value)569 int SQLiteSingleVerNaturalStore::PutMetaData(const Key &key, const Value &value)
570 {
571 int errCode = SQLiteSingleVerNaturalStore::CheckDataStatus(key, value, false);
572 if (errCode != E_OK) {
573 return errCode;
574 }
575
576 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
577 if (handle == nullptr) {
578 return errCode;
579 }
580
581 errCode = handle->PutKvData(SingleVerDataType::META_TYPE, key, value, 0, nullptr); // meta doesn't need time.
582 if (errCode != E_OK) {
583 LOGE("Put kv data err:%d", errCode);
584 }
585
586 HeartBeatForLifeCycle();
587 ReleaseHandle(handle);
588 return errCode;
589 }
590
591 // Delete multiple meta data records in a transaction.
DeleteMetaData(const std::vector<Key> & keys)592 int SQLiteSingleVerNaturalStore::DeleteMetaData(const std::vector<Key> &keys)
593 {
594 for (const auto &key : keys) {
595 if (key.empty() || key.size() > DBConstant::MAX_KEY_SIZE) {
596 return -E_INVALID_ARGS;
597 }
598 }
599 int errCode = E_OK;
600 auto handle = GetHandle(true, errCode);
601 if (handle == nullptr) {
602 return errCode;
603 }
604
605 handle->StartTransaction(TransactType::IMMEDIATE);
606 errCode = handle->DeleteMetaData(keys);
607 if (errCode != E_OK) {
608 handle->Rollback();
609 LOGE("[SinStore] DeleteMetaData failed, errCode = %d", errCode);
610 } else {
611 handle->Commit();
612 }
613
614 ReleaseHandle(handle);
615 HeartBeatForLifeCycle();
616 return errCode;
617 }
618
GetAllMetaKeys(std::vector<Key> & keys) const619 int SQLiteSingleVerNaturalStore::GetAllMetaKeys(std::vector<Key> &keys) const
620 {
621 if (storageEngine_ == nullptr) {
622 return -E_INVALID_DB;
623 }
624 int errCode = E_OK;
625 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
626 if (handle == nullptr) {
627 return errCode;
628 }
629
630 errCode = handle->GetAllMetaKeys(keys);
631 ReleaseHandle(handle);
632 return errCode;
633 }
634
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,bool isNeedCommit,int eventType)635 void SQLiteSingleVerNaturalStore::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
636 bool isNeedCommit, int eventType)
637 {
638 if (isNeedCommit) {
639 if (committedData != nullptr) {
640 if (!committedData->IsChangedDataEmpty()) {
641 CommitNotify(eventType, committedData);
642 }
643 if (!committedData->IsConflictedDataEmpty()) {
644 CommitNotify(SQLITE_GENERAL_CONFLICT_EVENT, committedData);
645 }
646 }
647 }
648
649 if (committedData != nullptr) {
650 committedData->DecObjRef(committedData);
651 committedData = nullptr;
652 }
653 }
654
GetSyncData(Timestamp begin,Timestamp end,std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const655 int SQLiteSingleVerNaturalStore::GetSyncData(Timestamp begin, Timestamp end, std::vector<SingleVerKvEntry *> &entries,
656 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
657 {
658 int errCode = CheckReadDataControlled();
659 if (errCode != E_OK) {
660 LOGE("[GetSyncData] Existed cache database can not read data, errCode = [%d]!", errCode);
661 return errCode;
662 }
663
664 std::vector<DataItem> dataItems;
665 errCode = GetSyncData(begin, end, dataItems, continueStmtToken, dataSizeInfo);
666 if (errCode != E_OK && errCode != -E_UNFINISHED) {
667 LOGE("GetSyncData errCode:%d", errCode);
668 goto ERROR;
669 }
670
671 for (auto &item : dataItems) {
672 GenericSingleVerKvEntry *entry = new (std::nothrow) GenericSingleVerKvEntry();
673 if (entry == nullptr) {
674 errCode = -E_OUT_OF_MEMORY;
675 LOGE("GetSyncData errCode:%d", errCode);
676 goto ERROR;
677 }
678 entry->SetEntryData(std::move(item));
679 entries.push_back(entry);
680 }
681
682 ERROR:
683 if (errCode != E_OK && errCode != -E_UNFINISHED) {
684 SingleVerKvEntry::Release(entries);
685 }
686 HeartBeatForLifeCycle();
687 return errCode;
688 }
689
GetSyncData(Timestamp begin,Timestamp end,std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const690 int SQLiteSingleVerNaturalStore::GetSyncData(Timestamp begin, Timestamp end, std::vector<DataItem> &dataItems,
691 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
692 {
693 if (begin >= end || dataSizeInfo.blockSize > DBConstant::MAX_SYNC_BLOCK_SIZE) {
694 return -E_INVALID_ARGS;
695 }
696
697 auto token = new (std::nothrow) SQLiteSingleVerContinueToken(begin, end);
698 if (token == nullptr) {
699 LOGE("[SQLiteSingleVerNaturalStore][NewToken] Bad alloc.");
700 return -E_OUT_OF_MEMORY;
701 }
702
703 int errCode = E_OK;
704 SQLiteSingleVerStorageExecutor *handle = GetHandle(false, errCode);
705 if (handle == nullptr) {
706 goto ERROR;
707 }
708
709 errCode = handle->GetSyncDataByTimestamp(dataItems, GetAppendedLen(), begin, end, dataSizeInfo);
710 if (errCode == -E_FINISHED) {
711 errCode = E_OK;
712 }
713
714 ERROR:
715 if (errCode != -E_UNFINISHED && errCode != E_OK) {
716 dataItems.clear();
717 }
718 ProcessContinueToken(dataItems, errCode, token);
719 continueStmtToken = static_cast<ContinueToken>(token);
720
721 ReleaseHandle(handle);
722 return errCode;
723 }
724
GetSyncData(QueryObject & query,const SyncTimeRange & timeRange,const DataSizeSpecInfo & dataSizeInfo,ContinueToken & continueStmtToken,std::vector<SingleVerKvEntry * > & entries) const725 int SQLiteSingleVerNaturalStore::GetSyncData(QueryObject &query, const SyncTimeRange &timeRange,
726 const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken,
727 std::vector<SingleVerKvEntry *> &entries) const
728 {
729 if (!timeRange.IsValid()) {
730 return -E_INVALID_ARGS;
731 }
732 int errCode = CheckReadDataControlled();
733 if (errCode != E_OK) {
734 LOGE("[GetEntries] Existed cache prevents the reading from query sync[%d]!", errCode);
735 return errCode;
736 }
737
738 query.SetSchema(GetSchemaObject());
739 auto token = new (std::nothrow) SQLiteSingleVerContinueToken(timeRange, query);
740 if (token == nullptr) {
741 LOGE("[SingleVerNStore] Allocate continue token failed.");
742 return -E_OUT_OF_MEMORY;
743 }
744
745 int innerCode;
746 std::vector<DataItem> dataItems;
747 errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
748 if (errCode != E_OK && errCode != -E_UNFINISHED) { // The code need be sent to outside except new error happened.
749 goto ERROR;
750 }
751
752 innerCode = GetKvEntriesByDataItems(entries, dataItems);
753 if (innerCode != E_OK) {
754 errCode = innerCode;
755 delete token;
756 token = nullptr;
757 }
758
759 ERROR:
760 continueStmtToken = static_cast<ContinueToken>(token);
761 return errCode;
762 }
763
764 /**
765 * Caller must ensure that parameter continueStmtToken is valid.
766 * If error happened, token will be deleted here.
767 */
GetSyncDataForQuerySync(std::vector<DataItem> & dataItems,SQLiteSingleVerContinueToken * & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const768 int SQLiteSingleVerNaturalStore::GetSyncDataForQuerySync(std::vector<DataItem> &dataItems,
769 SQLiteSingleVerContinueToken *&continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
770 {
771 int errCode = E_OK;
772 SQLiteSingleVerStorageExecutor *handle = GetHandle(false, errCode);
773 if (handle == nullptr) {
774 goto ERROR;
775 }
776
777 errCode = handle->StartTransaction(TransactType::DEFERRED);
778 if (errCode != E_OK) {
779 LOGE("[SingleVerNStore] Start transaction for get sync data failed. err=%d", errCode);
780 goto ERROR;
781 }
782
783 // Get query data.
784 if (!continueStmtToken->IsGetQueryDataFinished()) {
785 LOGD("[SingleVerNStore] Get query data between %" PRIu64 " and %" PRIu64 ".",
786 continueStmtToken->GetQueryBeginTime(), continueStmtToken->GetQueryEndTime());
787 errCode = handle->GetSyncDataWithQuery(continueStmtToken->GetQuery(), GetAppendedLen(), dataSizeInfo,
788 std::make_pair(continueStmtToken->GetQueryBeginTime(), continueStmtToken->GetQueryEndTime()), dataItems);
789 }
790
791 // Get query data finished.
792 if (errCode == E_OK || errCode == -E_FINISHED) {
793 // Clear query timeRange.
794 continueStmtToken->FinishGetQueryData();
795 if (!continueStmtToken->IsGetDeletedDataFinished()) {
796 errCode = -E_UNFINISHED;
797 // Get delete time next.
798 if (CanHoldDeletedData(dataItems, dataSizeInfo, GetAppendedLen())) {
799 LOGD("[SingleVerNStore] Get deleted data between %" PRIu64 " and %" PRIu64 ".",
800 continueStmtToken->GetDeletedBeginTime(), continueStmtToken->GetDeletedEndTime());
801 errCode = handle->GetDeletedSyncDataByTimestamp(dataItems, GetAppendedLen(),
802 continueStmtToken->GetDeletedBeginTime(), continueStmtToken->GetDeletedEndTime(), dataSizeInfo);
803 }
804 }
805 }
806
807 (void)handle->Rollback(); // roll back query statement
808 if (errCode == -E_FINISHED) {
809 errCode = E_OK;
810 }
811
812 ERROR:
813 if (errCode != -E_UNFINISHED && errCode != E_OK) { // Error happened.
814 dataItems.clear();
815 }
816 ProcessContinueTokenForQuerySync(dataItems, errCode, continueStmtToken);
817 ReleaseHandle(handle);
818 return errCode;
819 }
820
GetSyncDataNext(std::vector<SingleVerKvEntry * > & entries,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const821 int SQLiteSingleVerNaturalStore::GetSyncDataNext(std::vector<SingleVerKvEntry *> &entries,
822 ContinueToken &continueStmtToken, const DataSizeSpecInfo &dataSizeInfo) const
823 {
824 int errCode = CheckReadDataControlled();
825 if (errCode != E_OK) {
826 LOGE("[GetSyncDataNext] Existed cache database can not read data, errCode = [%d]!", errCode);
827 return errCode;
828 }
829
830 std::vector<DataItem> dataItems;
831 auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
832 if (token->IsQuerySync()) {
833 errCode = GetSyncDataForQuerySync(dataItems, token, dataSizeInfo);
834 continueStmtToken = static_cast<ContinueToken>(token);
835 } else {
836 errCode = GetSyncDataNext(dataItems, continueStmtToken, dataSizeInfo);
837 }
838
839 if (errCode != E_OK && errCode != -E_UNFINISHED) {
840 LOGE("GetSyncDataNext errCode:%d", errCode);
841 return errCode;
842 }
843
844 int innerErrCode = GetKvEntriesByDataItems(entries, dataItems);
845 if (innerErrCode != E_OK) {
846 errCode = innerErrCode;
847 ReleaseContinueToken(continueStmtToken);
848 }
849 return errCode;
850 }
851
GetSyncDataNext(std::vector<DataItem> & dataItems,ContinueToken & continueStmtToken,const DataSizeSpecInfo & dataSizeInfo) const852 int SQLiteSingleVerNaturalStore::GetSyncDataNext(std::vector<DataItem> &dataItems, ContinueToken &continueStmtToken,
853 const DataSizeSpecInfo &dataSizeInfo) const
854 {
855 if (dataSizeInfo.blockSize > DBConstant::MAX_SYNC_BLOCK_SIZE) {
856 return -E_INVALID_ARGS;
857 }
858
859 auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
860 if (token == nullptr || !(token->CheckValid())) {
861 LOGE("[SingleVerNaturalStore][GetSyncDataNext] invalid continue token.");
862 return -E_INVALID_ARGS;
863 }
864
865 int errCode = E_OK;
866 SQLiteSingleVerStorageExecutor *handle = GetHandle(false, errCode);
867 if (handle == nullptr) {
868 ReleaseContinueToken(continueStmtToken);
869 return errCode;
870 }
871
872 errCode = handle->GetSyncDataByTimestamp(dataItems, GetAppendedLen(), token->GetQueryBeginTime(),
873 token->GetQueryEndTime(), dataSizeInfo);
874 if (errCode == -E_FINISHED) {
875 errCode = E_OK;
876 }
877
878 ProcessContinueToken(dataItems, errCode, token);
879 continueStmtToken = static_cast<ContinueToken>(token);
880
881 ReleaseHandle(handle);
882 return errCode;
883 }
884
ReleaseContinueToken(ContinueToken & continueStmtToken) const885 void SQLiteSingleVerNaturalStore::ReleaseContinueToken(ContinueToken &continueStmtToken) const
886 {
887 auto token = static_cast<SQLiteSingleVerContinueToken *>(continueStmtToken);
888 if (token == nullptr || !(token->CheckValid())) {
889 LOGE("[SQLiteSingleVerNaturalStore][ReleaseContinueToken] Input is not a continue token.");
890 return;
891 }
892 delete token;
893 continueStmtToken = nullptr;
894 }
895
PutSyncDataWithQuery(const QueryObject & query,const std::vector<SingleVerKvEntry * > & entries,const std::string & deviceName)896 int SQLiteSingleVerNaturalStore::PutSyncDataWithQuery(const QueryObject &query,
897 const std::vector<SingleVerKvEntry *> &entries, const std::string &deviceName)
898 {
899 if (deviceName.length() > DBConstant::MAX_DEV_LENGTH) {
900 LOGW("Device length is invalid for sync put");
901 return -E_INVALID_ARGS;
902 }
903 HeartBeatForLifeCycle();
904 DeviceInfo deviceInfo = {false, deviceName};
905 if (deviceName.empty()) {
906 deviceInfo.deviceName = "Unknown";
907 }
908
909 std::vector<DataItem> dataItems;
910 for (const auto itemEntry : entries) {
911 auto *entry = static_cast<GenericSingleVerKvEntry *>(itemEntry);
912 if (entry != nullptr) {
913 DataItem item;
914 item.origDev = entry->GetOrigDevice();
915 item.flag = entry->GetFlag();
916 item.timestamp = entry->GetTimestamp();
917 item.writeTimestamp = entry->GetWriteTimestamp();
918 entry->GetKey(item.key);
919 entry->GetValue(item.value);
920 dataItems.push_back(item);
921 }
922 }
923
924 int errCode = SaveSyncDataItems(query, dataItems, deviceInfo, true); // Current is true to check value content
925 if (errCode != E_OK) {
926 LOGE("PutSyncData failed:%d", errCode);
927 }
928
929 return errCode;
930 }
931
GetMaxTimestamp(Timestamp & stamp) const932 void SQLiteSingleVerNaturalStore::GetMaxTimestamp(Timestamp &stamp) const
933 {
934 std::lock_guard<std::mutex> lock(maxTimestampMutex_);
935 stamp = currentMaxTimestamp_;
936 }
937
SetMaxTimestamp(Timestamp timestamp)938 int SQLiteSingleVerNaturalStore::SetMaxTimestamp(Timestamp timestamp)
939 {
940 std::lock_guard<std::mutex> lock(maxTimestampMutex_);
941 if (timestamp > currentMaxTimestamp_) {
942 currentMaxTimestamp_ = timestamp;
943 }
944 return E_OK;
945 }
946
947 // In sync procedure, call this function
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify)948 int SQLiteSingleVerNaturalStore::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify)
949 {
950 LOGI("[RemoveDeviceData] %s{private} rebuild, clear historydata", deviceName.c_str());
951 return RemoveDeviceData(deviceName, isNeedNotify, true);
952 }
953
954 // In local procedure, call this function
RemoveDeviceData(const std::string & deviceName,bool isNeedNotify,bool isInSync)955 int SQLiteSingleVerNaturalStore::RemoveDeviceData(const std::string &deviceName, bool isNeedNotify, bool isInSync)
956 {
957 if (!isInSync && !CheckWritePermission()) {
958 return -E_NOT_PERMIT;
959 }
960 std::string hashDeviceId;
961 bool hash = false;
962 do {
963 if (!deviceName.empty() && !isInSync) {
964 int errCode = GetHashDeviceId(deviceName, hashDeviceId);
965 if (errCode == -E_NOT_SUPPORT) {
966 break;
967 }
968 if (errCode != E_OK) {
969 return errCode;
970 }
971 hash = true;
972 }
973 } while (false);
974 if (!hash) {
975 hashDeviceId = DBCommon::TransferHashString(deviceName);
976 }
977 return RemoveDeviceDataInner(hashDeviceId, isNeedNotify, isInSync);
978 }
979
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify)980 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInCacheMode(const std::string &hashDev, bool isNeedNotify)
981 {
982 int errCode = E_OK;
983 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
984 if (handle == nullptr) {
985 LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode);
986 return errCode;
987 }
988 uint64_t recordVersion = GetAndIncreaseCacheRecordVersion();
989 LOGI("Remove device data in cache mode isNeedNotify:%d, recordVersion:%" PRIu64, isNeedNotify, recordVersion);
990 errCode = handle->RemoveDeviceDataInCacheMode(hashDev, isNeedNotify, recordVersion);
991 if (errCode != E_OK) {
992 LOGE("[SingleVerNStore] RemoveDeviceDataInCacheMode failed:%d", errCode);
993 }
994 ReleaseHandle(handle);
995 return errCode;
996 }
997
RemoveDeviceDataNormally(const std::string & hashDev,bool isNeedNotify)998 int SQLiteSingleVerNaturalStore::RemoveDeviceDataNormally(const std::string &hashDev, bool isNeedNotify)
999 {
1000 int errCode = E_OK;
1001 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
1002 if (handle == nullptr) {
1003 LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode);
1004 return errCode;
1005 }
1006
1007 std::vector<Entry> entries;
1008 if (isNeedNotify) {
1009 handle->GetAllSyncedEntries(hashDev, entries);
1010 }
1011
1012 LOGI("Remove device data:%d", isNeedNotify);
1013 errCode = handle->RemoveDeviceData(hashDev);
1014 if (errCode == E_OK && isNeedNotify) {
1015 NotifyRemovedData(entries);
1016 }
1017 ReleaseHandle(handle);
1018 return errCode;
1019 }
1020
NotifyRemovedData(std::vector<Entry> & entries)1021 void SQLiteSingleVerNaturalStore::NotifyRemovedData(std::vector<Entry> &entries)
1022 {
1023 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1024 return;
1025 }
1026
1027 size_t index = 0;
1028 size_t totalSize = 0;
1029 SingleVerNaturalStoreCommitNotifyData *notifyData = nullptr;
1030 while (index < entries.size()) {
1031 if (notifyData == nullptr) {
1032 notifyData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData;
1033 if (notifyData == nullptr) {
1034 LOGE("Failed to do commit sync removing because of OOM");
1035 break;
1036 }
1037 }
1038
1039 // ignore the invalid key.
1040 if (entries[index].key.size() > DBConstant::MAX_KEY_SIZE ||
1041 entries[index].value.size() > DBConstant::MAX_VALUE_SIZE) {
1042 index++;
1043 continue;
1044 }
1045
1046 if ((entries[index].key.size() + entries[index].value.size() + totalSize) > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1047 CommitAndReleaseNotifyData(notifyData, true, SQLITE_GENERAL_NS_SYNC_EVENT);
1048 totalSize = 0;
1049 notifyData = nullptr;
1050 continue;
1051 }
1052
1053 totalSize += (entries[index].key.size() + entries[index].value.size());
1054 notifyData->InsertCommittedData(std::move(entries[index]), DataType::DELETE, false);
1055 index++;
1056 }
1057 if (notifyData != nullptr) {
1058 CommitAndReleaseNotifyData(notifyData, true, SQLITE_GENERAL_NS_SYNC_EVENT);
1059 }
1060 }
1061
GetHandle(bool isWrite,int & errCode,OperatePerm perm) const1062 SQLiteSingleVerStorageExecutor *SQLiteSingleVerNaturalStore::GetHandle(bool isWrite, int &errCode,
1063 OperatePerm perm) const
1064 {
1065 engineMutex_.lock_shared();
1066 if (storageEngine_ == nullptr) {
1067 errCode = -E_INVALID_DB;
1068 engineMutex_.unlock_shared(); // unlock when get handle failed.
1069 return nullptr;
1070 }
1071 // Use for check database corrupted in Asynchronous task, like cache data migrate to main database
1072 if (storageEngine_->IsEngineCorrupted()) {
1073 CorruptNotify();
1074 errCode = -E_INVALID_PASSWD_OR_CORRUPTED_DB;
1075 engineMutex_.unlock_shared(); // unlock when get handle failed.
1076 LOGI("Handle is corrupted can not to get! errCode = [%d]", errCode);
1077 return nullptr;
1078 }
1079
1080 auto handle = storageEngine_->FindExecutor(isWrite, perm, errCode);
1081 if (handle == nullptr) {
1082 engineMutex_.unlock_shared(); // unlock when get handle failed.
1083 }
1084 return static_cast<SQLiteSingleVerStorageExecutor *>(handle);
1085 }
1086
ReleaseHandle(SQLiteSingleVerStorageExecutor * & handle) const1087 void SQLiteSingleVerNaturalStore::ReleaseHandle(SQLiteSingleVerStorageExecutor *&handle) const
1088 {
1089 if (handle == nullptr) {
1090 return;
1091 }
1092
1093 if (storageEngine_ != nullptr) {
1094 bool isCorrupted = handle->GetCorruptedStatus();
1095 StorageExecutor *databaseHandle = handle;
1096 storageEngine_->Recycle(databaseHandle);
1097 handle = nullptr;
1098 if (isCorrupted) {
1099 CorruptNotify();
1100 }
1101 }
1102 engineMutex_.unlock_shared(); // unlock after handle used up
1103 }
1104
RegisterNotification()1105 int SQLiteSingleVerNaturalStore::RegisterNotification()
1106 {
1107 static const std::vector<int> events {
1108 static_cast<int>(SQLITE_GENERAL_NS_LOCAL_PUT_EVENT),
1109 static_cast<int>(SQLITE_GENERAL_NS_PUT_EVENT),
1110 static_cast<int>(SQLITE_GENERAL_NS_SYNC_EVENT),
1111 static_cast<int>(SQLITE_GENERAL_CONFLICT_EVENT),
1112 };
1113
1114 for (auto event = events.begin(); event != events.end(); ++event) {
1115 int errCode = RegisterNotificationEventType(*event);
1116 if (errCode == E_OK) {
1117 continue;
1118 }
1119 LOGE("Register single version event %d failed:%d!", *event, errCode);
1120 for (auto iter = events.begin(); iter != event; ++iter) {
1121 UnRegisterNotificationEventType(*iter);
1122 }
1123 return errCode;
1124 }
1125
1126 notificationEventsRegistered_ = true;
1127 notificationConflictEventsRegistered_ = true;
1128 return E_OK;
1129 }
1130
ReleaseResources()1131 void SQLiteSingleVerNaturalStore::ReleaseResources()
1132 {
1133 SyncAbleKvDB::Close();
1134 if (notificationEventsRegistered_) {
1135 UnRegisterNotificationEventType(static_cast<EventType>(SQLITE_GENERAL_NS_SYNC_EVENT));
1136 UnRegisterNotificationEventType(static_cast<EventType>(SQLITE_GENERAL_NS_PUT_EVENT));
1137 UnRegisterNotificationEventType(static_cast<EventType>(SQLITE_GENERAL_NS_LOCAL_PUT_EVENT));
1138 notificationEventsRegistered_ = false;
1139 }
1140
1141 if (notificationConflictEventsRegistered_) {
1142 UnRegisterNotificationEventType(static_cast<EventType>(SQLITE_GENERAL_CONFLICT_EVENT));
1143 notificationConflictEventsRegistered_ = false;
1144 }
1145 {
1146 std::unique_lock<std::shared_mutex> lock(engineMutex_);
1147 if (storageEngine_ != nullptr) {
1148 storageEngine_->ClearEnginePasswd();
1149 (void)StorageEngineManager::ReleaseStorageEngine(storageEngine_);
1150 storageEngine_ = nullptr;
1151 }
1152 }
1153
1154 isInitialized_ = false;
1155 }
1156
InitCurrentMaxStamp()1157 void SQLiteSingleVerNaturalStore::InitCurrentMaxStamp()
1158 {
1159 if (storageEngine_ == nullptr) {
1160 return;
1161 }
1162 int errCode = E_OK;
1163 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
1164 if (handle == nullptr) {
1165 return;
1166 }
1167
1168 handle->InitCurrentMaxStamp(currentMaxTimestamp_);
1169 LOGD("Init max timestamp:%" PRIu64, currentMaxTimestamp_);
1170 ReleaseHandle(handle);
1171 }
1172
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * committedData)1173 void SQLiteSingleVerNaturalStore::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *committedData)
1174 {
1175 unsigned int conflictFlag = 0;
1176 if (GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1177 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1178 }
1179 if (GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1180 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1181 }
1182 if (GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1183 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_NATIVE_ALL);
1184 }
1185 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1186 }
1187
1188 // Currently this function only suitable to be call from sync in insert_record_from_sync procedure
1189 // Take attention if future coder attempt to call it in other situation procedure
SaveSyncDataItems(const QueryObject & query,std::vector<DataItem> & dataItems,const DeviceInfo & deviceInfo,bool checkValueContent)1190 int SQLiteSingleVerNaturalStore::SaveSyncDataItems(const QueryObject &query, std::vector<DataItem> &dataItems,
1191 const DeviceInfo &deviceInfo, bool checkValueContent)
1192 {
1193 // Sync procedure does not care readOnly Flag
1194 if (storageEngine_ == nullptr) {
1195 return -E_INVALID_DB;
1196 }
1197 int errCode = E_OK;
1198 for (const auto &item : dataItems) {
1199 // Check only the key and value size
1200 errCode = CheckDataStatus(item.key, item.value, (item.flag & DataItem::DELETE_FLAG) != 0);
1201 if (errCode != E_OK) {
1202 return errCode;
1203 }
1204 }
1205 if (checkValueContent) {
1206 CheckAmendValueContentForSyncProcedure(dataItems);
1207 }
1208 QueryObject queryInner = query;
1209 queryInner.SetSchema(GetSchemaObjectConstRef());
1210 if (IsExtendedCacheDBMode()) {
1211 errCode = SaveSyncDataToCacheDB(queryInner, dataItems, deviceInfo);
1212 } else {
1213 errCode = SaveSyncDataToMain(queryInner, dataItems, deviceInfo);
1214 }
1215 if (errCode != E_OK) {
1216 LOGE("[SingleVerNStore] SaveSyncDataItems failed:%d", errCode);
1217 }
1218 return errCode;
1219 }
1220
SaveSyncDataToMain(const QueryObject & query,std::vector<DataItem> & dataItems,const DeviceInfo & deviceInfo)1221 int SQLiteSingleVerNaturalStore::SaveSyncDataToMain(const QueryObject &query, std::vector<DataItem> &dataItems,
1222 const DeviceInfo &deviceInfo)
1223 {
1224 auto *committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData;
1225 if (committedData == nullptr) {
1226 LOGE("[SingleVerNStore] Failed to alloc single version notify data");
1227 return -E_OUT_OF_MEMORY;
1228 }
1229 InitConflictNotifiedFlag(committedData);
1230 Timestamp maxTimestamp = 0;
1231 bool isNeedCommit = false;
1232 int errCode = SaveSyncItems(query, dataItems, deviceInfo, maxTimestamp, committedData);
1233 if (errCode == E_OK) {
1234 isNeedCommit = true;
1235 (void)SetMaxTimestamp(maxTimestamp);
1236 }
1237
1238 CommitAndReleaseNotifyData(committedData, isNeedCommit, SQLITE_GENERAL_NS_SYNC_EVENT);
1239 return errCode;
1240 }
1241
1242 // Currently, this function only suitable to be call from sync in insert_record_from_sync procedure
1243 // Take attention if future coder attempt to call it in other situation procedure
SaveSyncItems(const QueryObject & query,std::vector<DataItem> & dataItems,const DeviceInfo & deviceInfo,Timestamp & maxTimestamp,SingleVerNaturalStoreCommitNotifyData * commitData) const1244 int SQLiteSingleVerNaturalStore::SaveSyncItems(const QueryObject &query, std::vector<DataItem> &dataItems,
1245 const DeviceInfo &deviceInfo, Timestamp &maxTimestamp, SingleVerNaturalStoreCommitNotifyData *commitData) const
1246 {
1247 int errCode = E_OK;
1248 int innerCode = E_OK;
1249 LOGD("[SQLiteSingleVerNaturalStore::SaveSyncData] Get write handle.");
1250 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
1251 if (handle == nullptr) {
1252 return errCode;
1253 }
1254 DBDfxAdapter::StartTraceSQL();
1255 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1256 if (errCode != E_OK) {
1257 ReleaseHandle(handle);
1258 DBDfxAdapter::FinishTraceSQL();
1259 return errCode;
1260 }
1261 bool isPermitForceWrite = !(GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
1262 errCode = handle->CheckDataWithQuery(query, dataItems, deviceInfo);
1263 if (errCode != E_OK) {
1264 goto END;
1265 }
1266 errCode = handle->PrepareForSavingData(SingleVerDataType::SYNC_TYPE);
1267 if (errCode != E_OK) {
1268 goto END;
1269 }
1270 for (auto &item: dataItems) {
1271 if (item.neglect) { // Do not save this record if it is neglected
1272 continue;
1273 }
1274 errCode = handle->SaveSyncDataItem(item, deviceInfo, maxTimestamp, commitData, isPermitForceWrite);
1275 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1276 break;
1277 }
1278 }
1279 if (errCode == -E_NOT_FOUND) {
1280 errCode = E_OK;
1281 }
1282 innerCode = handle->ResetForSavingData(SingleVerDataType::SYNC_TYPE);
1283 if (innerCode != E_OK) {
1284 errCode = innerCode;
1285 }
1286 END:
1287 if (errCode == E_OK) {
1288 errCode = handle->Commit();
1289 } else {
1290 (void)handle->Rollback(); // Keep the error code of the first scene
1291 }
1292 DBDfxAdapter::FinishTraceSQL();
1293 ReleaseHandle(handle);
1294 return errCode;
1295 }
1296
SaveSyncDataToCacheDB(const QueryObject & query,std::vector<DataItem> & dataItems,const DeviceInfo & deviceInfo)1297 int SQLiteSingleVerNaturalStore::SaveSyncDataToCacheDB(const QueryObject &query, std::vector<DataItem> &dataItems,
1298 const DeviceInfo &deviceInfo)
1299 {
1300 int errCode = E_OK;
1301 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
1302 if (handle == nullptr) {
1303 return errCode;
1304 }
1305
1306 Timestamp maxTimestamp = 0;
1307 DBDfxAdapter::StartTraceSQL();
1308 errCode = SaveSyncItemsInCacheMode(handle, query, dataItems, deviceInfo, maxTimestamp);
1309 if (errCode != E_OK) {
1310 LOGE("[SingleVerNStore] Failed to save sync data in cache mode, err : %d", errCode);
1311 } else {
1312 (void)SetMaxTimestamp(maxTimestamp);
1313 }
1314 DBDfxAdapter::FinishTraceSQL();
1315 ReleaseHandle(handle);
1316 return errCode;
1317 }
1318
GetCurrentTimestamp()1319 Timestamp SQLiteSingleVerNaturalStore::GetCurrentTimestamp()
1320 {
1321 return GetTimestamp();
1322 }
1323
InitStorageEngine(const KvDBProperties & kvDBProp,bool isNeedUpdateSecOpt)1324 int SQLiteSingleVerNaturalStore::InitStorageEngine(const KvDBProperties &kvDBProp, bool isNeedUpdateSecOpt)
1325 {
1326 OpenDbProperties option;
1327 InitDataBaseOption(kvDBProp, option);
1328
1329 bool isMemoryMode = kvDBProp.GetBoolProp(KvDBProperties::MEMORY_MODE, false);
1330 StorageEngineAttr poolSize = {1, 1, 1, 16}; // at most 1 write 16 read.
1331 if (isMemoryMode) {
1332 poolSize.minWriteNum = 1; // keep at least one connection.
1333 }
1334
1335 storageEngine_->SetNotifiedCallback(
1336 [&](int eventType, KvDBCommitNotifyFilterAbleData *committedData) {
1337 if (eventType == SQLITE_GENERAL_FINISH_MIGRATE_EVENT) {
1338 return this->TriggerSync(eventType);
1339 }
1340 auto commitData = static_cast<SingleVerNaturalStoreCommitNotifyData *>(committedData);
1341 this->CommitAndReleaseNotifyData(commitData, true, eventType);
1342 }
1343 );
1344
1345 std::string identifier = kvDBProp.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
1346 storageEngine_->SetNeedUpdateSecOption(isNeedUpdateSecOpt);
1347 int errCode = storageEngine_->InitSQLiteStorageEngine(poolSize, option, identifier);
1348 if (errCode != E_OK) {
1349 LOGE("Init the sqlite storage engine failed:%d", errCode);
1350 }
1351 return errCode;
1352 }
1353
Rekey(const CipherPassword & passwd)1354 int SQLiteSingleVerNaturalStore::Rekey(const CipherPassword &passwd)
1355 {
1356 // Check the storage engine and try to disable the engine.
1357 if (storageEngine_ == nullptr) {
1358 return -E_INVALID_DB;
1359 }
1360
1361 std::unique_ptr<SingleVerDatabaseOper> operation;
1362
1363 // stop the syncer
1364 int errCode = storageEngine_->TryToDisable(false, OperatePerm::REKEY_MONOPOLIZE_PERM);
1365 if (errCode != E_OK) {
1366 return errCode;
1367 }
1368 LOGI("Stop the syncer for rekey");
1369 StopSyncer(true);
1370 std::this_thread::sleep_for(std::chrono::milliseconds(5)); // wait for 5 ms
1371 errCode = storageEngine_->TryToDisable(true, OperatePerm::REKEY_MONOPOLIZE_PERM);
1372 if (errCode != E_OK) {
1373 LOGE("[Rekey] Failed to disable the database: %d", errCode);
1374 goto END;
1375 }
1376
1377 if (storageEngine_->GetEngineState() != EngineState::MAINDB) {
1378 LOGE("Rekey is not supported while cache exists! state = [%d]", storageEngine_->GetEngineState());
1379 errCode = (storageEngine_->GetEngineState() == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
1380 goto END;
1381 }
1382
1383 operation = std::make_unique<SingleVerDatabaseOper>(this, storageEngine_);
1384 LOGI("Operation rekey");
1385 errCode = operation->Rekey(passwd);
1386 END:
1387 // Only maindb state have existed handle, if rekey fail other state will create error cache db
1388 // Abort can forbid get new handle, requesting handle will return BUSY and nullptr handle
1389 if (errCode != -E_FORBID_CACHEDB) {
1390 storageEngine_->Enable(OperatePerm::REKEY_MONOPOLIZE_PERM);
1391 } else {
1392 storageEngine_->Abort(OperatePerm::REKEY_MONOPOLIZE_PERM);
1393 errCode = E_OK;
1394 }
1395 StartSyncer();
1396 return errCode;
1397 }
1398
Export(const std::string & filePath,const CipherPassword & passwd)1399 int SQLiteSingleVerNaturalStore::Export(const std::string &filePath, const CipherPassword &passwd)
1400 {
1401 if (storageEngine_ == nullptr) {
1402 return -E_INVALID_DB;
1403 }
1404 if (MyProp().GetBoolProp(KvDBProperties::MEMORY_MODE, false)) {
1405 return -E_NOT_SUPPORT;
1406 }
1407
1408 // Exclusively write resources
1409 std::string localDev;
1410 int errCode = GetLocalIdentity(localDev);
1411 if (errCode == -E_NOT_INIT) {
1412 localDev.resize(DEVICE_ID_LEN);
1413 } else if (errCode != E_OK) {
1414 LOGE("Get local dev id err:%d", errCode);
1415 localDev.resize(0);
1416 }
1417
1418 // The write handle is applied to prevent writing data during the export process.
1419 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode, OperatePerm::NORMAL_PERM);
1420 if (handle == nullptr) {
1421 return errCode;
1422 }
1423
1424 // forbid migrate by hold write handle not release
1425 if (storageEngine_->GetEngineState() != EngineState::MAINDB) {
1426 LOGE("Not support export when cacheDB existed! state = [%d]", storageEngine_->GetEngineState());
1427 errCode = (storageEngine_->GetEngineState() == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
1428 ReleaseHandle(handle);
1429 return errCode;
1430 }
1431
1432 std::unique_ptr<SingleVerDatabaseOper> operation = std::make_unique<SingleVerDatabaseOper>(this, storageEngine_);
1433 operation->SetLocalDevId(localDev);
1434 LOGI("Begin export the kv store");
1435 errCode = operation->Export(filePath, passwd);
1436
1437 ReleaseHandle(handle);
1438 return errCode;
1439 }
1440
Import(const std::string & filePath,const CipherPassword & passwd)1441 int SQLiteSingleVerNaturalStore::Import(const std::string &filePath, const CipherPassword &passwd)
1442 {
1443 if (storageEngine_ == nullptr) {
1444 return -E_INVALID_DB;
1445 }
1446 if (MyProp().GetBoolProp(KvDBProperties::MEMORY_MODE, false)) {
1447 return -E_NOT_SUPPORT;
1448 }
1449
1450 std::string localDev;
1451 int errCode = GetLocalIdentity(localDev);
1452 if (errCode == -E_NOT_INIT) {
1453 localDev.resize(DEVICE_ID_LEN);
1454 } else if (errCode != E_OK) {
1455 LOGE("Failed to GetLocalIdentity!");
1456 localDev.resize(0);
1457 }
1458
1459 // stop the syncer
1460 errCode = storageEngine_->TryToDisable(false, OperatePerm::IMPORT_MONOPOLIZE_PERM);
1461 if (errCode != E_OK) {
1462 return errCode;
1463 }
1464 StopSyncer(true);
1465 std::this_thread::sleep_for(std::chrono::milliseconds(5)); // wait for 5 ms
1466 std::unique_ptr<SingleVerDatabaseOper> operation;
1467
1468 errCode = storageEngine_->TryToDisable(true, OperatePerm::IMPORT_MONOPOLIZE_PERM);
1469 if (errCode != E_OK) {
1470 LOGE("[Import] Failed to disable the database: %d", errCode);
1471 goto END;
1472 }
1473
1474 if (storageEngine_->GetEngineState() != EngineState::MAINDB) {
1475 LOGE("Not support import when cacheDB existed! state = [%d]", storageEngine_->GetEngineState());
1476 errCode = (storageEngine_->GetEngineState() == EngineState::CACHEDB) ? -E_NOT_SUPPORT : -E_BUSY;
1477 goto END;
1478 }
1479
1480 operation = std::make_unique<SingleVerDatabaseOper>(this, storageEngine_);
1481 operation->SetLocalDevId(localDev);
1482 errCode = operation->Import(filePath, passwd);
1483 if (errCode != E_OK) {
1484 goto END;
1485 }
1486
1487 // Save create db time.
1488 storageEngine_->Enable(OperatePerm::IMPORT_MONOPOLIZE_PERM);
1489
1490 // Get current max timestamp after import and before start syncer, reflash local time offset
1491 InitCurrentMaxStamp();
1492 errCode = SaveCreateDBTime(); // This step will start syncer
1493
1494 END:
1495 // restore the storage engine and the syncer.
1496 storageEngine_->Enable(OperatePerm::IMPORT_MONOPOLIZE_PERM);
1497 StartSyncer();
1498 return errCode;
1499 }
1500
CheckWritePermission() const1501 bool SQLiteSingleVerNaturalStore::CheckWritePermission() const
1502 {
1503 return !isReadOnly_;
1504 }
1505
GetSchemaInfo() const1506 SchemaObject SQLiteSingleVerNaturalStore::GetSchemaInfo() const
1507 {
1508 return MyProp().GetSchemaConstRef();
1509 }
1510
GetSchemaObject() const1511 SchemaObject SQLiteSingleVerNaturalStore::GetSchemaObject() const
1512 {
1513 return MyProp().GetSchema();
1514 }
1515
GetSchemaObjectConstRef() const1516 const SchemaObject &SQLiteSingleVerNaturalStore::GetSchemaObjectConstRef() const
1517 {
1518 return MyProp().GetSchemaConstRef();
1519 }
1520
CheckCompatible(const std::string & schema,uint8_t type) const1521 bool SQLiteSingleVerNaturalStore::CheckCompatible(const std::string &schema, uint8_t type) const
1522 {
1523 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
1524 if (!localSchema.IsSchemaValid() || schema.empty() || ReadSchemaType(type) == SchemaType::NONE) {
1525 // If at least one of local or remote is normal-kvdb, then allow sync
1526 LOGI("IsLocalSchemaDb=%d, IsRemoteSchemaDb=%d.", localSchema.IsSchemaValid(), !schema.empty());
1527 return true;
1528 }
1529 // Here both are schema-db, check their compatibility mutually
1530 SchemaObject remoteSchema;
1531 int errCode = remoteSchema.ParseFromSchemaString(schema);
1532 if (errCode != E_OK) {
1533 // Consider: if the parse errCode is SchemaVersionNotSupport, we can consider allow sync if schemaType equal.
1534 LOGE("Parse remote schema fail, errCode=%d.", errCode);
1535 return false;
1536 }
1537 // First, Compare remoteSchema based on localSchema
1538 errCode = localSchema.CompareAgainstSchemaObject(remoteSchema);
1539 if (errCode != -E_SCHEMA_UNEQUAL_INCOMPATIBLE) {
1540 LOGI("Remote(Maybe newer) compatible based on local, result=%d.", errCode);
1541 return true;
1542 }
1543 // Second, Compare localSchema based on remoteSchema
1544 errCode = remoteSchema.CompareAgainstSchemaObject(localSchema);
1545 if (errCode != -E_SCHEMA_UNEQUAL_INCOMPATIBLE) {
1546 LOGI("Local(Newer) compatible based on remote, result=%d.", errCode);
1547 return true;
1548 }
1549 LOGE("Local incompatible with remote mutually.");
1550 return false;
1551 }
1552
InitDataBaseOption(const KvDBProperties & kvDBProp,OpenDbProperties & option)1553 void SQLiteSingleVerNaturalStore::InitDataBaseOption(const KvDBProperties &kvDBProp, OpenDbProperties &option)
1554 {
1555 std::string uri = GetDatabasePath(kvDBProp);
1556 bool isMemoryDb = kvDBProp.GetBoolProp(KvDBProperties::MEMORY_MODE, false);
1557 if (isMemoryDb) {
1558 std::string identifierDir = kvDBProp.GetStringProp(KvDBProperties::IDENTIFIER_DIR, "");
1559 uri = identifierDir + DBConstant::SQLITE_MEMDB_IDENTIFY;
1560 LOGD("Begin create memory natural store database");
1561 }
1562 std::string subDir = GetSubDirPath(kvDBProp);
1563 CipherType cipherType;
1564 CipherPassword passwd;
1565 kvDBProp.GetPassword(cipherType, passwd);
1566 std::string schemaStr = kvDBProp.GetSchema().ToSchemaString();
1567
1568 bool isCreateNecessary = kvDBProp.GetBoolProp(KvDBProperties::CREATE_IF_NECESSARY, true);
1569 std::vector<std::string> createTableSqls;
1570
1571 SecurityOption securityOpt;
1572 if (RuntimeContext::GetInstance()->IsProcessSystemApiAdapterValid()) {
1573 securityOpt.securityLabel = kvDBProp.GetSecLabel();
1574 securityOpt.securityFlag = kvDBProp.GetSecFlag();
1575 }
1576
1577 option = {uri, isCreateNecessary, isMemoryDb, createTableSqls, cipherType, passwd, schemaStr, subDir, securityOpt};
1578 option.conflictReslovePolicy = kvDBProp.GetIntProp(KvDBProperties::CONFLICT_RESOLVE_POLICY, DEFAULT_LAST_WIN);
1579 option.createDirByStoreIdOnly = kvDBProp.GetBoolProp(KvDBProperties::CREATE_DIR_BY_STORE_ID_ONLY, false);
1580 }
1581
TransObserverTypeToRegisterFunctionType(int observerType,RegisterFuncType & type) const1582 int SQLiteSingleVerNaturalStore::TransObserverTypeToRegisterFunctionType(
1583 int observerType, RegisterFuncType &type) const
1584 {
1585 static constexpr TransPair transMap[] = {
1586 { static_cast<int>(SQLITE_GENERAL_NS_PUT_EVENT), OBSERVER_SINGLE_VERSION_NS_PUT_EVENT },
1587 { static_cast<int>(SQLITE_GENERAL_NS_SYNC_EVENT), OBSERVER_SINGLE_VERSION_NS_SYNC_EVENT },
1588 { static_cast<int>(SQLITE_GENERAL_NS_LOCAL_PUT_EVENT), OBSERVER_SINGLE_VERSION_NS_LOCAL_EVENT },
1589 { static_cast<int>(SQLITE_GENERAL_CONFLICT_EVENT), OBSERVER_SINGLE_VERSION_NS_CONFLICT_EVENT },
1590 };
1591 auto funcType = GetFuncType(observerType, transMap, sizeof(transMap) / sizeof(TransPair));
1592 if (funcType == REGISTER_FUNC_TYPE_MAX) {
1593 return -E_NOT_SUPPORT;
1594 }
1595 type = funcType;
1596 return E_OK;
1597 }
1598
TransConflictTypeToRegisterFunctionType(int conflictType,RegisterFuncType & type) const1599 int SQLiteSingleVerNaturalStore::TransConflictTypeToRegisterFunctionType(
1600 int conflictType, RegisterFuncType &type) const
1601 {
1602 static constexpr TransPair transMap[] = {
1603 { static_cast<int>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY), CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY },
1604 { static_cast<int>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG), CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG },
1605 { static_cast<int>(SQLITE_GENERAL_NS_NATIVE_ALL), CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL },
1606 };
1607 auto funcType = GetFuncType(conflictType, transMap, sizeof(transMap) / sizeof(TransPair));
1608 if (funcType == REGISTER_FUNC_TYPE_MAX) {
1609 return -E_NOT_SUPPORT;
1610 }
1611 type = funcType;
1612 return E_OK;
1613 }
1614
GetFuncType(int index,const TransPair * transMap,int32_t len)1615 RegisterFuncType SQLiteSingleVerNaturalStore::GetFuncType(int index, const TransPair *transMap, int32_t len)
1616 {
1617 int32_t head = 0;
1618 int32_t end = len - 1;
1619 while (head <= end) {
1620 int32_t mid = (head + end) / 2;
1621 if (transMap[mid].index < index) {
1622 head = mid + 1;
1623 continue;
1624 }
1625 if (transMap[mid].index > index) {
1626 end = mid - 1;
1627 continue;
1628 }
1629 return transMap[mid].funcType;
1630 }
1631 return REGISTER_FUNC_TYPE_MAX;
1632 }
1633
GetSchema(SchemaObject & schema) const1634 int SQLiteSingleVerNaturalStore::GetSchema(SchemaObject &schema) const
1635 {
1636 int errCode = E_OK;
1637 auto handle = GetHandle(true, errCode); // Only open kvdb use, no competition for write handle
1638 if (handle == nullptr) {
1639 return errCode;
1640 }
1641
1642 Timestamp timestamp;
1643 std::string schemaKey = DBConstant::SCHEMA_KEY;
1644 Key key(schemaKey.begin(), schemaKey.end());
1645 Value value;
1646 errCode = handle->GetKvData(SingleVerDataType::META_TYPE, key, value, timestamp);
1647 if (errCode == E_OK) {
1648 std::string schemaValue(value.begin(), value.end());
1649 errCode = schema.ParseFromSchemaString(schemaValue);
1650 } else {
1651 LOGI("[SqlSinStore] Get schema error:%d.", errCode);
1652 }
1653 ReleaseHandle(handle);
1654 return errCode;
1655 }
1656
DecideReadOnlyBaseOnSchema(const KvDBProperties & kvDBProp,bool & isReadOnly,SchemaObject & savedSchemaObj) const1657 int SQLiteSingleVerNaturalStore::DecideReadOnlyBaseOnSchema(const KvDBProperties &kvDBProp, bool &isReadOnly,
1658 SchemaObject &savedSchemaObj) const
1659 {
1660 // Check whether it is a memory db
1661 if (kvDBProp.GetBoolProp(KvDBProperties::MEMORY_MODE, false)) {
1662 isReadOnly = false;
1663 return E_OK;
1664 }
1665 SchemaObject inputSchemaObj = kvDBProp.GetSchema();
1666 if (!inputSchemaObj.IsSchemaValid()) {
1667 int errCode = GetSchema(savedSchemaObj);
1668 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1669 LOGE("[SqlSinStore][DecideReadOnly] GetSchema fail=%d.", errCode);
1670 return errCode;
1671 }
1672 if (savedSchemaObj.IsSchemaValid()) {
1673 isReadOnly = true;
1674 return E_OK;
1675 }
1676 }
1677 // An valid schema will not lead to readonly
1678 isReadOnly = false;
1679 return E_OK;
1680 }
1681
InitialLocalDataTimestamp()1682 void SQLiteSingleVerNaturalStore::InitialLocalDataTimestamp()
1683 {
1684 Timestamp timestamp = GetCurrentTimestamp();
1685
1686 int errCode = E_OK;
1687 auto handle = GetHandle(true, errCode);
1688 if (handle == nullptr) {
1689 return;
1690 }
1691
1692 errCode = handle->UpdateLocalDataTimestamp(timestamp);
1693 if (errCode != E_OK) {
1694 LOGE("Update the timestamp for local data failed:%d", errCode);
1695 }
1696 ReleaseHandle(handle);
1697 }
1698
GetDbProperties() const1699 const KvDBProperties &SQLiteSingleVerNaturalStore::GetDbProperties() const
1700 {
1701 return GetMyProperties();
1702 }
1703
RemoveKvDB(const KvDBProperties & properties)1704 int SQLiteSingleVerNaturalStore::RemoveKvDB(const KvDBProperties &properties)
1705 {
1706 // To avoid leakage, the engine resources are forced to be released
1707 const std::string identifier = properties.GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
1708 (void)StorageEngineManager::ForceReleaseStorageEngine(identifier);
1709
1710 // Only care the data directory and the db name.
1711 std::string storeOnlyDir;
1712 std::string storeDir;
1713 GenericKvDB::GetStoreDirectory(properties, KvDBProperties::SINGLE_VER_TYPE, storeDir, storeOnlyDir);
1714
1715 const std::vector<std::pair<const std::string &, const std::string &>> dbDir {
1716 {DBConstant::MAINDB_DIR, DBConstant::SINGLE_VER_DATA_STORE},
1717 {DBConstant::METADB_DIR, DBConstant::SINGLE_VER_META_STORE},
1718 {DBConstant::CACHEDB_DIR, DBConstant::SINGLE_VER_CACHE_STORE}};
1719
1720 bool isAllNotFound = true;
1721 for (const auto &item : dbDir) {
1722 std::string currentDir = storeDir + item.first + "/";
1723 std::string currentOnlyDir = storeOnlyDir + item.first + "/";
1724 int errCode = KvDBUtils::RemoveKvDB(currentDir, currentOnlyDir, item.second);
1725 if (errCode != -E_NOT_FOUND) {
1726 if (errCode != E_OK) {
1727 return errCode;
1728 }
1729 isAllNotFound = false;
1730 }
1731 };
1732 if (isAllNotFound) {
1733 return -E_NOT_FOUND;
1734 }
1735
1736 int errCode = DBCommon::RemoveAllFilesOfDirectory(storeDir, true);
1737 if (errCode != E_OK) {
1738 return errCode;
1739 }
1740 errCode = DBCommon::RemoveAllFilesOfDirectory(storeOnlyDir, true);
1741 if (errCode != E_OK) {
1742 return errCode;
1743 }
1744 return errCode;
1745 }
1746
GetKvDBSize(const KvDBProperties & properties,uint64_t & size) const1747 int SQLiteSingleVerNaturalStore::GetKvDBSize(const KvDBProperties &properties, uint64_t &size) const
1748 {
1749 std::string storeOnlyIdentDir;
1750 std::string storeIdentDir;
1751 GenericKvDB::GetStoreDirectory(properties, KvDBProperties::SINGLE_VER_TYPE, storeIdentDir, storeOnlyIdentDir);
1752 const std::vector<std::pair<const std::string &, const std::string &>> dbDir {
1753 {DBConstant::MAINDB_DIR, DBConstant::SINGLE_VER_DATA_STORE},
1754 {DBConstant::METADB_DIR, DBConstant::SINGLE_VER_META_STORE},
1755 {DBConstant::CACHEDB_DIR, DBConstant::SINGLE_VER_CACHE_STORE}};
1756 int errCode = -E_NOT_FOUND;
1757 for (const auto &item : dbDir) {
1758 std::string storeDir = storeIdentDir + item.first;
1759 std::string storeOnlyDir = storeOnlyIdentDir + item.first;
1760 int err = KvDBUtils::GetKvDbSize(storeDir, storeOnlyDir, item.second, size);
1761 if (err != -E_NOT_FOUND && err != E_OK) {
1762 return err;
1763 }
1764 if (err == E_OK) {
1765 errCode = E_OK;
1766 }
1767 }
1768 return errCode;
1769 }
1770
GetDbPropertyForUpdate()1771 KvDBProperties &SQLiteSingleVerNaturalStore::GetDbPropertyForUpdate()
1772 {
1773 return MyProp();
1774 }
1775
HeartBeatForLifeCycle() const1776 void SQLiteSingleVerNaturalStore::HeartBeatForLifeCycle() const
1777 {
1778 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
1779 int errCode = ResetLifeCycleTimer();
1780 if (errCode != E_OK) {
1781 LOGE("Heart beat for life cycle failed:%d", errCode);
1782 }
1783 }
1784
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier) const1785 int SQLiteSingleVerNaturalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier ¬ifier) const
1786 {
1787 auto runtimeCxt = RuntimeContext::GetInstance();
1788 if (runtimeCxt == nullptr) {
1789 return -E_INVALID_ARGS;
1790 }
1791 RefObject::IncObjRef(this);
1792 TimerId timerId = 0;
1793 int errCode = runtimeCxt->SetTimer(autoLifeTime_,
1794 [this](TimerId id) -> int {
1795 std::lock_guard<std::mutex> lock(lifeCycleMutex_);
1796 if (lifeCycleNotifier_) {
1797 std::string identifier;
1798 if (GetMyProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
1799 identifier = GetMyProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
1800 } else {
1801 identifier = GetMyProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
1802 }
1803 auto userId = GetMyProperties().GetStringProp(DBProperties::USER_ID, "");
1804 lifeCycleNotifier_(identifier, userId);
1805 }
1806 return 0;
1807 },
1808 [this]() {
1809 int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() {
1810 RefObject::DecObjRef(this);
1811 });
1812 if (ret != E_OK) {
1813 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
1814 }
1815 },
1816 timerId);
1817 if (errCode != E_OK) {
1818 lifeTimerId_ = 0;
1819 LOGE("SetTimer failed:%d", errCode);
1820 RefObject::DecObjRef(this);
1821 return errCode;
1822 }
1823
1824 lifeCycleNotifier_ = notifier;
1825 lifeTimerId_ = timerId;
1826 return E_OK;
1827 }
1828
ResetLifeCycleTimer() const1829 int SQLiteSingleVerNaturalStore::ResetLifeCycleTimer() const
1830 {
1831 if (lifeTimerId_ == 0) {
1832 return E_OK;
1833 }
1834 auto lifeNotifier = lifeCycleNotifier_;
1835 lifeCycleNotifier_ = nullptr;
1836 int errCode = StopLifeCycleTimer();
1837 if (errCode != E_OK) {
1838 LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
1839 }
1840 return StartLifeCycleTimer(lifeNotifier);
1841 }
1842
StopLifeCycleTimer() const1843 int SQLiteSingleVerNaturalStore::StopLifeCycleTimer() const
1844 {
1845 auto runtimeCxt = RuntimeContext::GetInstance();
1846 if (runtimeCxt == nullptr) {
1847 return -E_INVALID_ARGS;
1848 }
1849 if (lifeTimerId_ != 0) {
1850 TimerId timerId = lifeTimerId_;
1851 lifeTimerId_ = 0;
1852 runtimeCxt->RemoveTimer(timerId, false);
1853 }
1854 return E_OK;
1855 }
1856
IsDataMigrating() const1857 bool SQLiteSingleVerNaturalStore::IsDataMigrating() const
1858 {
1859 if (storageEngine_ == nullptr) {
1860 return false;
1861 }
1862
1863 if (storageEngine_->IsMigrating()) {
1864 LOGD("Migrating now.");
1865 return true;
1866 }
1867 return false;
1868 }
1869
SetConnectionFlag(bool isExisted) const1870 void SQLiteSingleVerNaturalStore::SetConnectionFlag(bool isExisted) const
1871 {
1872 if (storageEngine_ != nullptr) {
1873 storageEngine_->SetConnectionFlag(isExisted);
1874 }
1875 }
1876
TriggerToMigrateData() const1877 int SQLiteSingleVerNaturalStore::TriggerToMigrateData() const
1878 {
1879 RefObject::IncObjRef(this);
1880 int errCode = RuntimeContext::GetInstance()->ScheduleTask(
1881 std::bind(&SQLiteSingleVerNaturalStore::AsyncDataMigration, this));
1882 if (errCode != E_OK) {
1883 RefObject::DecObjRef(this);
1884 LOGE("[SingleVerNStore] Trigger to migrate data failed : %d.", errCode);
1885 }
1886 return errCode;
1887 }
1888
IsCacheDBMode() const1889 bool SQLiteSingleVerNaturalStore::IsCacheDBMode() const
1890 {
1891 if (storageEngine_ == nullptr) {
1892 LOGE("[SingleVerNStore] IsCacheDBMode storage engine is invalid.");
1893 return false;
1894 }
1895 EngineState engineState = storageEngine_->GetEngineState();
1896 return (engineState == CACHEDB);
1897 }
1898
IsExtendedCacheDBMode() const1899 bool SQLiteSingleVerNaturalStore::IsExtendedCacheDBMode() const
1900 {
1901 if (storageEngine_ == nullptr) {
1902 LOGE("[SingleVerNStore] storage engine is invalid.");
1903 return false;
1904 }
1905 EngineState engineState = storageEngine_->GetEngineState();
1906 return (engineState == CACHEDB || engineState == MIGRATING || engineState == ATTACHING);
1907 }
1908
CheckReadDataControlled() const1909 int SQLiteSingleVerNaturalStore::CheckReadDataControlled() const
1910 {
1911 if (IsExtendedCacheDBMode()) {
1912 int err = IsCacheDBMode() ? -E_EKEYREVOKED : -E_BUSY;
1913 LOGE("Existed cache database can not read data, errCode = [%d]!", err);
1914 return err;
1915 }
1916 return E_OK;
1917 }
1918
IncreaseCacheRecordVersion() const1919 void SQLiteSingleVerNaturalStore::IncreaseCacheRecordVersion() const
1920 {
1921 if (storageEngine_ == nullptr) {
1922 LOGE("[SingleVerNStore] Increase cache version storage engine is invalid.");
1923 return;
1924 }
1925 storageEngine_->IncreaseCacheRecordVersion();
1926 }
1927
GetCacheRecordVersion() const1928 uint64_t SQLiteSingleVerNaturalStore::GetCacheRecordVersion() const
1929 {
1930 if (storageEngine_ == nullptr) {
1931 LOGE("[SingleVerNStore] Get cache version storage engine is invalid.");
1932 return 0;
1933 }
1934 return storageEngine_->GetCacheRecordVersion();
1935 }
1936
GetAndIncreaseCacheRecordVersion() const1937 uint64_t SQLiteSingleVerNaturalStore::GetAndIncreaseCacheRecordVersion() const
1938 {
1939 if (storageEngine_ == nullptr) {
1940 LOGE("[SingleVerNStore] Get cache version storage engine is invalid.");
1941 return 0;
1942 }
1943 return storageEngine_->GetAndIncreaseCacheRecordVersion();
1944 }
1945
AsyncDataMigration() const1946 void SQLiteSingleVerNaturalStore::AsyncDataMigration() const
1947 {
1948 // Delay a little time to ensure the completion of the delegate callback
1949 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_DELEGATE_CALLBACK_TIME));
1950 bool isLocked = RuntimeContext::GetInstance()->IsAccessControlled();
1951 if (!isLocked) {
1952 LOGI("Begin to migrate cache data to manDb asynchronously!");
1953 (void)StorageEngineManager::ExecuteMigration(storageEngine_);
1954 }
1955
1956 RefObject::DecObjRef(this);
1957 }
1958
CheckAmendValueContentForSyncProcedure(std::vector<DataItem> & dataItems) const1959 void SQLiteSingleVerNaturalStore::CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const
1960 {
1961 const SchemaObject &schemaObjRef = MyProp().GetSchemaConstRef();
1962 if (!schemaObjRef.IsSchemaValid()) {
1963 // Not a schema database, do not need to check more
1964 return;
1965 }
1966 uint32_t deleteCount = 0;
1967 uint32_t amendCount = 0;
1968 uint32_t neglectCount = 0;
1969 for (auto &eachItem : dataItems) {
1970 if ((eachItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1971 (eachItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1972 // Delete record not concerned
1973 deleteCount++;
1974 continue;
1975 }
1976 bool useAmendValue = false;
1977 int errCode = CheckValueAndAmendIfNeed(ValueSource::FROM_SYNC, eachItem.value, eachItem.value, useAmendValue);
1978 if (errCode != E_OK) {
1979 eachItem.neglect = true;
1980 neglectCount++;
1981 continue;
1982 }
1983 if (useAmendValue) {
1984 amendCount++;
1985 }
1986 }
1987 LOGI("[SqlSinStore][CheckAmendForSync] OriCount=%zu, DeleteCount=%u, AmendCount=%u, NeglectCount=%u",
1988 dataItems.size(), deleteCount, amendCount, neglectCount);
1989 }
1990
SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor * handle,const QueryObject & query,std::vector<DataItem> & dataItems,const DeviceInfo & deviceInfo,Timestamp & maxTimestamp) const1991 int SQLiteSingleVerNaturalStore::SaveSyncItemsInCacheMode(SQLiteSingleVerStorageExecutor *handle,
1992 const QueryObject &query, std::vector<DataItem> &dataItems, const DeviceInfo &deviceInfo,
1993 Timestamp &maxTimestamp) const
1994 {
1995 int errCode = handle->StartTransaction(TransactType::IMMEDIATE);
1996 if (errCode != E_OK) {
1997 return errCode;
1998 }
1999
2000 int innerCode;
2001 const uint64_t recordVersion = GetCacheRecordVersion();
2002 errCode = handle->PrepareForSavingCacheData(SingleVerDataType::SYNC_TYPE);
2003 if (errCode != E_OK) {
2004 goto END;
2005 }
2006
2007 for (auto &item : dataItems) {
2008 errCode = handle->SaveSyncDataItemInCacheMode(item, deviceInfo, maxTimestamp, recordVersion, query);
2009 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
2010 break;
2011 }
2012 }
2013
2014 if (errCode == -E_NOT_FOUND) {
2015 errCode = E_OK;
2016 }
2017
2018 innerCode = handle->ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
2019 if (innerCode != E_OK) {
2020 errCode = innerCode;
2021 }
2022 END:
2023 if (errCode == E_OK) {
2024 storageEngine_->IncreaseCacheRecordVersion(); // use engine wihtin shard lock by handle
2025 errCode = handle->Commit();
2026 } else {
2027 (void)handle->Rollback(); // Keep the error code of the first scene
2028 }
2029 return errCode;
2030 }
2031
NotifyRemotePushFinished(const std::string & targetId) const2032 void SQLiteSingleVerNaturalStore::NotifyRemotePushFinished(const std::string &targetId) const
2033 {
2034 std::string identifier = DBCommon::VectorToHexString(GetIdentifier());
2035 LOGI("label:%s sourceTarget: %s{private} push finished", identifier.c_str(), targetId.c_str());
2036 NotifyRemotePushFinishedInner(targetId);
2037 }
2038
GetDatabaseCreateTimestamp(Timestamp & outTime) const2039 int SQLiteSingleVerNaturalStore::GetDatabaseCreateTimestamp(Timestamp &outTime) const
2040 {
2041 // Found in memory.
2042 {
2043 std::lock_guard<std::mutex> autoLock(createDBTimeMutex_);
2044 if (createDBTime_ != 0) {
2045 outTime = createDBTime_;
2046 return E_OK;
2047 }
2048 }
2049
2050 const Key key(CREATE_DB_TIME.begin(), CREATE_DB_TIME.end());
2051 Value value;
2052 int errCode = GetMetaData(key, value);
2053 if (errCode != E_OK) {
2054 LOGD("GetDatabaseCreateTimestamp failed, errCode = %d.", errCode);
2055 return errCode;
2056 }
2057
2058 Timestamp createDBTime = 0;
2059 Parcel parcel(value.data(), value.size());
2060 (void)parcel.ReadUInt64(createDBTime);
2061 if (parcel.IsError()) {
2062 return -E_INVALID_ARGS;
2063 }
2064 outTime = createDBTime;
2065 std::lock_guard<std::mutex> autoLock(createDBTimeMutex_);
2066 createDBTime_ = createDBTime;
2067 return E_OK;
2068 }
2069
CheckIntegrity() const2070 int SQLiteSingleVerNaturalStore::CheckIntegrity() const
2071 {
2072 int errCode = E_OK;
2073 auto handle = GetHandle(true, errCode);
2074 if (handle == nullptr) {
2075 return errCode;
2076 }
2077
2078 errCode = handle->CheckIntegrity();
2079 ReleaseHandle(handle);
2080 return errCode;
2081 }
2082
SaveCreateDBTime()2083 int SQLiteSingleVerNaturalStore::SaveCreateDBTime()
2084 {
2085 Timestamp createDBTime = GetCurrentTimestamp();
2086 const Key key(CREATE_DB_TIME.begin(), CREATE_DB_TIME.end());
2087 Value value(Parcel::GetUInt64Len());
2088 Parcel parcel(value.data(), Parcel::GetUInt64Len());
2089 (void)parcel.WriteUInt64(createDBTime);
2090 if (parcel.IsError()) {
2091 LOGE("SaveCreateDBTime failed, something wrong in parcel.");
2092 return -E_PARSE_FAIL;
2093 }
2094
2095 int errCode = PutMetaData(key, value);
2096 if (errCode != E_OK) {
2097 LOGE("SaveCreateDBTime failed, errCode = %d", errCode);
2098 return errCode;
2099 }
2100
2101 // save in memory.
2102 std::lock_guard<std::mutex> autoLock(createDBTimeMutex_);
2103 createDBTime_ = createDBTime;
2104 return errCode;
2105 }
2106
SaveCreateDBTimeIfNotExisted()2107 int SQLiteSingleVerNaturalStore::SaveCreateDBTimeIfNotExisted()
2108 {
2109 Timestamp createDBTime = 0;
2110 int errCode = GetDatabaseCreateTimestamp(createDBTime);
2111 if (errCode == -E_NOT_FOUND) {
2112 errCode = SaveCreateDBTime();
2113 }
2114 if (errCode != E_OK) {
2115 LOGE("SaveCreateDBTimeIfNotExisted failed, errCode=%d.", errCode);
2116 }
2117 return errCode;
2118 }
2119
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const2120 int SQLiteSingleVerNaturalStore::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
2121 {
2122 if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
2123 return -E_INVALID_ARGS;
2124 }
2125
2126 int errCode = E_OK;
2127 auto handle = GetHandle(true, errCode);
2128 if (handle == nullptr) {
2129 return errCode;
2130 }
2131
2132 errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
2133 if (errCode != E_OK) {
2134 LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
2135 }
2136
2137 ReleaseHandle(handle);
2138 HeartBeatForLifeCycle();
2139 return errCode;
2140 }
2141
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const2142 int SQLiteSingleVerNaturalStore::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
2143 {
2144 needCompressOnSync = GetDbProperties().GetBoolProp(KvDBProperties::COMPRESS_ON_SYNC, false);
2145 compressionRate = GetDbProperties().GetIntProp(KvDBProperties::COMPRESSION_RATE,
2146 DBConstant::DEFAULT_COMPTRESS_RATE);
2147 return E_OK;
2148 }
2149
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const2150 int SQLiteSingleVerNaturalStore::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
2151 {
2152 algorithmSet.clear();
2153 DataCompression::GetCompressionAlgo(algorithmSet);
2154 return E_OK;
2155 }
2156
CheckAndInitQueryCondition(QueryObject & query) const2157 int SQLiteSingleVerNaturalStore::CheckAndInitQueryCondition(QueryObject &query) const
2158 {
2159 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
2160 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
2161 // Flatbuffer schema is not support subscribe
2162 return -E_NOT_SUPPORT;
2163 }
2164 query.SetSchema(localSchema);
2165
2166 int errCode = E_OK;
2167 SQLiteSingleVerStorageExecutor *handle = GetHandle(false, errCode);
2168 if (handle == nullptr) {
2169 return errCode;
2170 }
2171
2172 errCode = handle->CheckQueryObjectLegal(query);
2173 if (errCode != E_OK) {
2174 LOGE("Check query condition failed [%d]!", errCode);
2175 }
2176 ReleaseHandle(handle);
2177 return errCode;
2178 }
2179
SetDataInterceptor(const PushDataInterceptor & interceptor)2180 void SQLiteSingleVerNaturalStore::SetDataInterceptor(const PushDataInterceptor &interceptor)
2181 {
2182 std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_);
2183 dataInterceptor_ = interceptor;
2184 }
2185
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID) const2186 int SQLiteSingleVerNaturalStore::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
2187 const std::string &targetID) const
2188 {
2189 PushDataInterceptor interceptor = nullptr;
2190 {
2191 std::shared_lock<std::shared_mutex> lock(dataInterceptorMutex_);
2192 if (dataInterceptor_ == nullptr) {
2193 return E_OK;
2194 }
2195 interceptor = dataInterceptor_;
2196 }
2197
2198 InterceptedDataImpl data(entries, [this](const Value &newValue) -> int {
2199 bool useAmendValue = false;
2200 Value amendValue = newValue;
2201 return this->CheckValueAndAmendIfNeed(ValueSource::FROM_LOCAL, newValue, amendValue, useAmendValue);
2202 }
2203 );
2204
2205 int errCode = interceptor(data, sourceID, targetID);
2206 if (data.IsError()) {
2207 SingleVerKvEntry::Release(entries);
2208 LOGE("Intercept data failed:%d.", errCode);
2209 return -E_INTERCEPT_DATA_FAIL;
2210 }
2211 return E_OK;
2212 }
2213
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)2214 int SQLiteSingleVerNaturalStore::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
2215 bool needCacheSubscribe)
2216 {
2217 const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
2218 if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
2219 // Flatbuffer schema is not support subscribe
2220 return -E_NOT_SUPPORT;
2221 }
2222 QueryObject queryInner = query;
2223 queryInner.SetSchema(localSchema);
2224 if (IsExtendedCacheDBMode() && needCacheSubscribe) { // cache auto subscribe when engine state is in CACHEDB mode
2225 LOGI("Cache subscribe query and return ok when in cacheDB.");
2226 storageEngine_->CacheSubscribe(subscribeId, queryInner);
2227 return E_OK;
2228 }
2229
2230 int errCode = E_OK;
2231 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
2232 if (handle == nullptr) {
2233 return errCode;
2234 }
2235
2236 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2237 if (errCode != E_OK) {
2238 ReleaseHandle(handle);
2239 return errCode;
2240 }
2241
2242 errCode = handle->AddSubscribeTrigger(queryInner, subscribeId);
2243 if (errCode != E_OK) {
2244 LOGE("Add subscribe trigger failed: %d", errCode);
2245 (void)handle->Rollback();
2246 } else {
2247 errCode = handle->Commit();
2248 }
2249 ReleaseHandle(handle);
2250 return errCode;
2251 }
2252
RemoveSubscribe(const std::vector<std::string> & subscribeIds)2253 int SQLiteSingleVerNaturalStore::RemoveSubscribe(const std::vector<std::string> &subscribeIds)
2254 {
2255 int errCode = E_OK;
2256 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
2257 if (handle == nullptr) {
2258 return errCode;
2259 }
2260
2261 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2262 if (errCode != E_OK) {
2263 ReleaseHandle(handle);
2264 return errCode;
2265 }
2266 errCode = handle->RemoveSubscribeTrigger(subscribeIds);
2267 if (errCode != E_OK) {
2268 LOGE("Remove subscribe trigger failed: %d", errCode);
2269 goto ERR;
2270 }
2271 errCode = handle->RemoveSubscribeTriggerWaterMark(subscribeIds);
2272 if (errCode != E_OK) {
2273 LOGE("Remove subscribe data water mark failed: %d", errCode);
2274 }
2275 ERR:
2276 if (errCode == E_OK) {
2277 errCode = handle->Commit();
2278 } else {
2279 (void)handle->Rollback();
2280 }
2281 ReleaseHandle(handle);
2282 return errCode;
2283 }
2284
RemoveSubscribe(const std::string & subscribeId)2285 int SQLiteSingleVerNaturalStore::RemoveSubscribe(const std::string &subscribeId)
2286 {
2287 return RemoveSubscribe(std::vector<std::string> {subscribeId});
2288 }
2289
SetMaxLogSize(uint64_t limit)2290 int SQLiteSingleVerNaturalStore::SetMaxLogSize(uint64_t limit)
2291 {
2292 LOGI("Set the max log size to %" PRIu64, limit);
2293 maxLogSize_.store(limit);
2294 return E_OK;
2295 }
GetMaxLogSize() const2296 uint64_t SQLiteSingleVerNaturalStore::GetMaxLogSize() const
2297 {
2298 return maxLogSize_.load();
2299 }
2300
RemoveAllSubscribe()2301 int SQLiteSingleVerNaturalStore::RemoveAllSubscribe()
2302 {
2303 int errCode = E_OK;
2304 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
2305 if (handle == nullptr) {
2306 return errCode;
2307 }
2308 std::vector<std::string> triggers;
2309 errCode = handle->GetTriggers(DBConstant::SUBSCRIBE_QUERY_PREFIX, triggers);
2310 if (errCode != E_OK) {
2311 LOGE("Get all subscribe triggers failed. %d", errCode);
2312 ReleaseHandle(handle);
2313 return errCode;
2314 }
2315
2316 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
2317 if (errCode != E_OK) {
2318 ReleaseHandle(handle);
2319 return errCode;
2320 }
2321
2322 Key prefixKey;
2323 errCode = handle->RemoveTrigger(triggers);
2324 if (errCode != E_OK) {
2325 LOGE("remove all subscribe triggers failed. %d", errCode);
2326 goto END;
2327 }
2328
2329 DBCommon::StringToVector(DBConstant::SUBSCRIBE_QUERY_PREFIX, prefixKey);
2330 errCode = handle->DeleteMetaDataByPrefixKey(prefixKey);
2331 if (errCode != E_OK) {
2332 LOGE("remove all subscribe water mark failed. %d", errCode);
2333 }
2334 END:
2335 if (errCode == E_OK) {
2336 errCode = handle->Commit();
2337 } else {
2338 (void)handle->Rollback();
2339 }
2340 ReleaseHandle(handle);
2341 return errCode;
2342 }
2343
Dump(int fd)2344 void SQLiteSingleVerNaturalStore::Dump(int fd)
2345 {
2346 std::string userId = MyProp().GetStringProp(DBProperties::USER_ID, "");
2347 std::string appId = MyProp().GetStringProp(DBProperties::APP_ID, "");
2348 std::string storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
2349 std::string label = MyProp().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
2350 label = DBCommon::TransferStringToHex(label);
2351 DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n",
2352 userId.c_str(), appId.c_str(), storeId.c_str(), label.c_str());
2353 SyncAbleKvDB::Dump(fd);
2354 }
2355
RemoveDeviceDataInner(const std::string & hashDev,bool isNeedNotify,bool isInSync)2356 int SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, bool isNeedNotify, bool isInSync)
2357 {
2358 int errCode = E_OK;
2359 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
2360 if (handle == nullptr) {
2361 LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode);
2362 return errCode;
2363 }
2364 uint64_t logFileSize = handle->GetLogFileSize();
2365 ReleaseHandle(handle);
2366 if (logFileSize > GetMaxLogSize()) {
2367 LOGW("[SingleVerNStore] RmDevData log size[%" PRIu64 "] over the limit", logFileSize);
2368 return -E_LOG_OVER_LIMITS;
2369 }
2370
2371 std::set<std::string> removeDevices;
2372 if (hashDev.empty()) {
2373 errCode = GetExistsDeviceList(removeDevices);
2374 if (errCode != E_OK) {
2375 LOGE("[SingleVerNStore] get remove device list failed:%d", errCode);
2376 return errCode;
2377 }
2378 } else {
2379 removeDevices.insert(hashDev);
2380 }
2381
2382 LOGD("[SingleVerNStore] remove device data, size=%zu", removeDevices.size());
2383 for (const auto &iterDevice : removeDevices) {
2384 // Call the syncer module to erase the water mark.
2385 errCode = EraseDeviceWaterMark(iterDevice, false);
2386 if (errCode != E_OK) {
2387 LOGE("[SingleVerNStore] erase water mark failed:%d", errCode);
2388 return errCode;
2389 }
2390 }
2391
2392 if (IsExtendedCacheDBMode()) {
2393 errCode = RemoveDeviceDataInCacheMode(hashDev, isNeedNotify);
2394 } else {
2395 errCode = RemoveDeviceDataNormally(hashDev, isNeedNotify);
2396 }
2397 if (errCode != E_OK) {
2398 LOGE("[SingleVerNStore] RemoveDeviceData failed:%d", errCode);
2399 }
2400
2401 return errCode;
2402 }
2403
GetExistsDeviceList(std::set<std::string> & devices) const2404 int SQLiteSingleVerNaturalStore::GetExistsDeviceList(std::set<std::string> &devices) const
2405 {
2406 int errCode = E_OK;
2407 SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
2408 if (handle == nullptr) {
2409 LOGE("[SingleVerNStore] GetExistsDeviceList get handle failed:%d", errCode);
2410 return errCode;
2411 }
2412 errCode = handle->GetExistsDevicesFromMeta(devices);
2413 if (errCode != E_OK) {
2414 LOGE("[SingleVerNStore] Get remove device list from meta failed. err=%d", errCode);
2415 }
2416 ReleaseHandle(handle);
2417 return errCode;
2418 }
2419 DEFINE_OBJECT_TAG_FACILITIES(SQLiteSingleVerNaturalStore)
2420 }
2421