1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "cloud/cloud_store_types.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "log_print.h"
25 #include "log_table_manager_factory.h"
26 #include "parcel.h"
27 #include "platform_specific.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_single_ver_storage_executor_sql.h"
31
32 namespace DistributedDB {
33 namespace {
34
ResetOrRegetStmt(sqlite3 * db,sqlite3_stmt * & stmt,const std::string & sql)35 int ResetOrRegetStmt(sqlite3 *db, sqlite3_stmt *&stmt, const std::string &sql)
36 {
37 int errCode = E_OK;
38 SQLiteUtils::ResetStatement(stmt, false, errCode);
39 if (errCode != E_OK) {
40 LOGE("[ResetOrRegetStmt] reset stmt failed:%d.", errCode);
41 // Finish current statement and remade one
42 SQLiteUtils::ResetStatement(stmt, true, errCode);
43 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
44 if (errCode != E_OK) {
45 LOGE("[ResetOrRegetStmt] reget failed:%d.", errCode);
46 }
47 }
48 return errCode;
49 }
50
GetEntryFromStatement(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries)51 int GetEntryFromStatement(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries)
52 {
53 Entry entry;
54 int errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key);
55 if (errCode != E_OK) {
56 return errCode;
57 }
58 if (isGetValue) {
59 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value);
60 if (errCode != E_OK) {
61 return errCode;
62 }
63 }
64
65 entries.push_back(std::move(entry));
66 return errCode;
67 }
68 }
69
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb)70 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb)
71 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
72 getSyncStatement_(nullptr),
73 getResultRowIdStatement_(nullptr),
74 getResultEntryStatement_(nullptr),
75 isTransactionOpen_(false),
76 attachMetaMode_(false),
77 executorState_(ExecutorState::INVALID),
78 maxTimestampInMainDB_(0),
79 migrateTimeOffset_(0),
80 isSyncMigrating_(false),
81 conflictResolvePolicy_(DEFAULT_LAST_WIN)
82 {}
83
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb,ExecutorState executorState)84 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb,
85 ExecutorState executorState)
86 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
87 getSyncStatement_(nullptr),
88 getResultRowIdStatement_(nullptr),
89 getResultEntryStatement_(nullptr),
90 isTransactionOpen_(false),
91 attachMetaMode_(false),
92 executorState_(executorState),
93 maxTimestampInMainDB_(0),
94 migrateTimeOffset_(0),
95 isSyncMigrating_(false),
96 conflictResolvePolicy_(DEFAULT_LAST_WIN)
97 {}
98
~SQLiteSingleVerStorageExecutor()99 SQLiteSingleVerStorageExecutor::~SQLiteSingleVerStorageExecutor()
100 {
101 if (isTransactionOpen_) {
102 Rollback();
103 }
104 FinalizeAllStatements();
105 }
106
GetKvData(SingleVerDataType type,const Key & key,Value & value,Timestamp & timestamp) const107 int SQLiteSingleVerStorageExecutor::GetKvData(SingleVerDataType type, const Key &key, Value &value,
108 Timestamp ×tamp) const
109 {
110 std::string sql;
111 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
112 sql = SELECT_LOCAL_VALUE_TIMESTAMP_SQL;
113 } else if (type == SingleVerDataType::SYNC_TYPE) {
114 sql = SELECT_SYNC_VALUE_WTIMESTAMP_SQL;
115 } else if (type == SingleVerDataType::META_TYPE) {
116 if (attachMetaMode_) {
117 sql = SELECT_ATTACH_META_VALUE_SQL;
118 } else {
119 sql = SELECT_META_VALUE_SQL;
120 }
121 } else {
122 return -E_INVALID_ARGS;
123 }
124
125 sqlite3_stmt *statement = nullptr;
126 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
127 if (errCode != E_OK) {
128 goto END;
129 }
130
131 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
132 if (errCode != E_OK) {
133 goto END;
134 }
135
136 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
137 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
138 errCode = -E_NOT_FOUND;
139 goto END;
140 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
141 goto END;
142 }
143
144 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
145
146 // get timestamp
147 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
148 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_LOCAL_TIME_INDEX));
149 } else if (type == SingleVerDataType::SYNC_TYPE) {
150 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_SYNC_TIME_INDEX));
151 }
152
153 END:
154 SQLiteUtils::ResetStatement(statement, true, errCode);
155 return CheckCorruptedStatus(errCode);
156 }
157
BindPutKvData(sqlite3_stmt * statement,const Key & key,const Value & value,Timestamp timestamp,SingleVerDataType type)158 int SQLiteSingleVerStorageExecutor::BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value,
159 Timestamp timestamp, SingleVerDataType type)
160 {
161 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, key, false);
162 if (errCode != E_OK) {
163 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
164 return errCode;
165 }
166
167 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_VAL_INDEX, value, true);
168 if (errCode != E_OK) {
169 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
170 return errCode;
171 }
172
173 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
174 Key hashKey;
175 errCode = DBCommon::CalcValueHash(key, hashKey);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179
180 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_LOCAL_HASH_KEY_INDEX, hashKey, false);
181 if (errCode != E_OK) {
182 LOGE("[SingleVerExe][BindPutKv]Bind hash key error:%d", errCode);
183 return errCode;
184 }
185
186 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_LOCAL_TIMESTAMP_INDEX, timestamp);
187 if (errCode != E_OK) {
188 LOGE("[SingleVerExe][BindPutKv]Bind timestamp error:%d", errCode);
189 return errCode;
190 }
191 }
192 return E_OK;
193 }
194
GetKvDataByHashKey(const Key & hashKey,SingleVerRecord & result) const195 int SQLiteSingleVerStorageExecutor::GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const
196 {
197 sqlite3_stmt *statement = nullptr;
198 std::vector<uint8_t> devVect;
199 std::vector<uint8_t> origDevVect;
200 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_HASH_SQL, statement);
201 if (errCode != E_OK) {
202 goto END;
203 }
204
205 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // bind the first arg hashkey.
206 if (errCode != E_OK) {
207 goto END;
208 }
209
210 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
211 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
212 result.hashKey = hashKey;
213 result.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
214 result.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
215 result.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
216 // get key
217 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, result.key);
218 if (errCode != E_OK) {
219 goto END;
220 }
221 // get value
222 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, result.value);
223 if (errCode != E_OK) {
224 goto END;
225 }
226 // get device
227 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
228 if (errCode != E_OK) {
229 goto END;
230 }
231 result.device = std::string(devVect.begin(), devVect.end());
232 // get original device
233 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
234 if (errCode != E_OK) {
235 goto END;
236 }
237 result.origDevice = std::string(origDevVect.begin(), origDevVect.end());
238 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
239 errCode = -E_NOT_FOUND;
240 goto END;
241 }
242
243 END:
244 SQLiteUtils::ResetStatement(statement, true, errCode);
245 return CheckCorruptedStatus(errCode);
246 }
247
SaveKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp)248 int SQLiteSingleVerStorageExecutor::SaveKvData(SingleVerDataType type, const Key &key, const Value &value,
249 Timestamp timestamp)
250 {
251 sqlite3_stmt *statement = nullptr;
252 std::string sql;
253 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
254 sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ? INSERT_LOCAL_SQL_FROM_CACHEHANDLE :
255 INSERT_LOCAL_SQL);
256 } else {
257 sql = (attachMetaMode_ ? INSERT_ATTACH_META_SQL : INSERT_META_SQL);
258 }
259 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
260 if (errCode != E_OK) {
261 goto ERROR;
262 }
263
264 errCode = BindPutKvData(statement, key, value, timestamp, type);
265 if (errCode != E_OK) {
266 goto ERROR;
267 }
268
269 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
270 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
271 errCode = E_OK;
272 }
273
274 ERROR:
275 SQLiteUtils::ResetStatement(statement, true, errCode);
276 return CheckCorruptedStatus(errCode);
277 }
278
PutKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp,SingleVerNaturalStoreCommitNotifyData * committedData)279 int SQLiteSingleVerStorageExecutor::PutKvData(SingleVerDataType type, const Key &key, const Value &value,
280 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData)
281 {
282 if (type != SingleVerDataType::LOCAL_TYPE_SQLITE && type != SingleVerDataType::META_TYPE) {
283 return -E_INVALID_ARGS;
284 }
285 // committedData is only for local data, not for meta data.
286 bool isLocal = (SingleVerDataType::LOCAL_TYPE_SQLITE == type);
287 Timestamp localTimestamp = 0;
288 Value readValue;
289 bool isExisted = CheckIfKeyExisted(key, isLocal, readValue, localTimestamp);
290 if (isLocal && committedData != nullptr) {
291 ExistStatus existedStatus = isExisted ? ExistStatus::EXIST : ExistStatus::NONE;
292 Key hashKey;
293 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
294 if (innerErrCode != E_OK) {
295 return innerErrCode;
296 }
297 committedData->InitKeyPropRecord(hashKey, existedStatus);
298 }
299 int errCode = SaveKvData(type, key, value, timestamp);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303
304 if (isLocal && committedData != nullptr) {
305 Entry entry = {key, value};
306 committedData->InsertCommittedData(std::move(entry), isExisted ? DataType::UPDATE : DataType::INSERT, true);
307 }
308 return E_OK;
309 }
310
GetEntries(bool isGetValue,SingleVerDataType type,const Key & keyPrefix,std::vector<Entry> & entries) const311 int SQLiteSingleVerStorageExecutor::GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix,
312 std::vector<Entry> &entries) const
313 {
314 if ((type != SingleVerDataType::LOCAL_TYPE_SQLITE) && (type != SingleVerDataType::SYNC_TYPE)) {
315 return -E_INVALID_ARGS;
316 }
317
318 std::string sql;
319 if (type == SingleVerDataType::SYNC_TYPE) {
320 sql = isGetValue ? SELECT_SYNC_PREFIX_SQL : SELECT_SYNC_KEY_PREFIX_SQL;
321 } else {
322 sql = SELECT_LOCAL_PREFIX_SQL;
323 }
324 sqlite3_stmt *statement = nullptr;
325 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
326 if (errCode != E_OK) {
327 goto END;
328 }
329
330 // bind the prefix key for the first and second args.
331 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // first argument is key
332 if (errCode != E_OK) {
333 goto END;
334 }
335
336 errCode = StepForResultEntries(isGetValue, statement, entries);
337
338 END:
339 SQLiteUtils::ResetStatement(statement, true, errCode);
340 return CheckCorruptedStatus(errCode);
341 }
342
GetEntries(QueryObject & queryObj,std::vector<Entry> & entries) const343 int SQLiteSingleVerStorageExecutor::GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const
344 {
345 int errCode = E_OK;
346 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
347 if (errCode != E_OK) {
348 return errCode;
349 }
350
351 sqlite3_stmt *statement = nullptr;
352 errCode = helper.GetQuerySqlStatement(dbHandle_, false, statement);
353 if (errCode == E_OK) {
354 errCode = StepForResultEntries(true, statement, entries);
355 }
356
357 SQLiteUtils::ResetStatement(statement, true, errCode);
358 return CheckCorruptedStatus(errCode);
359 }
360
GetCount(QueryObject & queryObj,int & count) const361 int SQLiteSingleVerStorageExecutor::GetCount(QueryObject &queryObj, int &count) const
362 {
363 if (dbHandle_ == nullptr) {
364 return -E_INVALID_DB;
365 }
366
367 int errCode = E_OK;
368 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
369 if (errCode != E_OK) {
370 return errCode;
371 }
372
373 if (!queryObj.IsCountValid()) {
374 LOGE("GetCount no need limit or orderby");
375 return -E_INVALID_QUERY_FORMAT;
376 }
377
378 std::string countSql;
379 errCode = helper.GetCountQuerySql(countSql);
380 if (errCode != E_OK) {
381 return errCode;
382 }
383
384 sqlite3_stmt *countStatement = nullptr;
385 // get statement for count
386 errCode = helper.GetQuerySqlStatement(dbHandle_, countSql, countStatement);
387 if (errCode != E_OK) {
388 LOGE("Get count bind statement error:%d", errCode);
389 goto END;
390 }
391 // get count value
392 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
393 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
394 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
395 if (readCount > INT32_MAX) {
396 LOGW("total count is beyond the max count");
397 count = 0;
398 errCode = -E_UNEXPECTED_DATA;
399 } else {
400 count = static_cast<int>(readCount);
401 errCode = E_OK;
402 }
403 LOGD("Entry count in this result set is %d", count);
404 } else {
405 errCode = -E_UNEXPECTED_DATA;
406 }
407
408 END:
409 SQLiteUtils::ResetStatement(countStatement, true, errCode);
410 return CheckCorruptedStatus(errCode);
411 }
412
InitCurrentMaxStamp(Timestamp & maxStamp)413 void SQLiteSingleVerStorageExecutor::InitCurrentMaxStamp(Timestamp &maxStamp)
414 {
415 if (dbHandle_ == nullptr) {
416 return;
417 }
418 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
419 SELECT_MAX_TIMESTAMP_SQL_FROM_CACHEHANDLE : SELECT_MAX_TIMESTAMP_SQL);
420 sqlite3_stmt *statement = nullptr;
421 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
422 if (errCode != E_OK) {
423 return;
424 }
425
426 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
427 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
428 maxStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
429 }
430 SQLiteUtils::ResetStatement(statement, true, errCode);
431 }
432
PrepareForSyncDataByTime(Timestamp begin,Timestamp end,sqlite3_stmt * & statement,bool getDeletedData) const433 int SQLiteSingleVerStorageExecutor::PrepareForSyncDataByTime(Timestamp begin, Timestamp end,
434 sqlite3_stmt *&statement, bool getDeletedData) const
435 {
436 if (dbHandle_ == nullptr) {
437 return -E_INVALID_DB;
438 }
439
440 const std::string sql = (getDeletedData ? SELECT_SYNC_DELETED_ENTRIES_SQL : SELECT_SYNC_ENTRIES_SQL);
441 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
442 if (errCode != E_OK) {
443 LOGE("Prepare the sync entries statement error:%d", errCode);
444 return errCode;
445 }
446
447 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_BEGIN_STAMP_INDEX, begin);
448 if (errCode != E_OK) {
449 goto ERROR;
450 }
451
452 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_END_STAMP_INDEX, end);
453
454 ERROR:
455 if (errCode != E_OK) {
456 LOGE("Bind the timestamp for getting sync data error:%d", errCode);
457 SQLiteUtils::ResetStatement(statement, true, errCode);
458 }
459
460 return CheckCorruptedStatus(errCode);
461 }
462
ReleaseContinueStatement()463 void SQLiteSingleVerStorageExecutor::ReleaseContinueStatement()
464 {
465 if (getSyncStatement_ != nullptr) {
466 int errCode = E_OK;
467 SQLiteUtils::ResetStatement(getSyncStatement_, true, errCode);
468 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
469 SetCorruptedStatus();
470 }
471 }
472 }
473
474 namespace {
GetDataItemForSync(sqlite3_stmt * statement,DataItem & dataItem)475 int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem)
476 {
477 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
478 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
479 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
480 dataItem.flag &= (~DataItem::LOCAL_FLAG);
481 std::vector<uint8_t> devVect;
482 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
483 if (errCode != E_OK) {
484 return errCode;
485 }
486 dataItem.origDev = std::string(devVect.begin(), devVect.end());
487 int keyIndex = SYNC_RES_KEY_INDEX;
488 // If the data has been deleted, just use the hash key for sync.
489 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
490 keyIndex = SYNC_RES_HASH_KEY_INDEX;
491 }
492 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, dataItem.key);
493 if (errCode != E_OK) {
494 return errCode;
495 }
496 return SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
497 }
498 }
499
GetSyncDataItems(std::vector<DataItem> & dataItems,sqlite3_stmt * statement,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo) const500 int SQLiteSingleVerStorageExecutor::GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
501 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const
502 {
503 int errCode;
504 size_t dataTotalSize = 0;
505 do {
506 DataItem dataItem;
507 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
508 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
509 errCode = GetDataItemForSync(statement, dataItem);
510 if (errCode != E_OK) {
511 LOGE("GetDataItemForSync failed:%d", errCode);
512 return errCode;
513 }
514 } else {
515 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
516 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
517 errCode = -E_FINISHED;
518 } else {
519 LOGE("Get sync data error:%d", errCode);
520 }
521 break;
522 }
523
524 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
525 dataTotalSize += GetDataItemSerialSize(dataItem, appendLength);
526 if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) ||
527 dataItems.size() >= dataSizeInfo.packetSize) {
528 errCode = -E_UNFINISHED;
529 break;
530 } else {
531 dataItems.push_back(std::move(dataItem));
532 }
533 } while (true);
534 return errCode;
535 }
536
GetSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const537 int SQLiteSingleVerStorageExecutor::GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
538 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
539 {
540 sqlite3_stmt *statement = nullptr;
541 int errCode = PrepareForSyncDataByTime(begin, end, statement);
542 if (errCode != E_OK) {
543 return errCode;
544 }
545
546 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
547 SQLiteUtils::ResetStatement(statement, true, errCode);
548 return CheckCorruptedStatus(errCode);
549 }
550
GetDeletedSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const551 int SQLiteSingleVerStorageExecutor::GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
552 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
553 {
554 sqlite3_stmt *statement = nullptr;
555 int errCode = PrepareForSyncDataByTime(begin, end, statement, true);
556 if (errCode != E_OK) {
557 return errCode;
558 }
559
560 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
561 SQLiteUtils::ResetStatement(statement, true, errCode);
562 return CheckCorruptedStatus(errCode);
563 }
564
565 namespace {
AppendDataItem(std::vector<DataItem> & dataItems,const DataItem & item,size_t & dataTotalSize,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo)566 int AppendDataItem(std::vector<DataItem> &dataItems, const DataItem &item, size_t &dataTotalSize, size_t appendLength,
567 const DataSizeSpecInfo &dataSizeInfo)
568 {
569 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
570 size_t appendSize = dataTotalSize + SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength);
571 if ((appendSize > dataSizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= dataSizeInfo.packetSize) {
572 return -E_UNFINISHED;
573 }
574 dataItems.push_back(item);
575 dataTotalSize = appendSize;
576 return E_OK;
577 }
578
GetFullDataStatement(sqlite3 * db,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)579 int GetFullDataStatement(sqlite3 *db, const std::pair<Timestamp, Timestamp> &timeRange, sqlite3_stmt *&stmt)
580 {
581 int errCode = SQLiteUtils::GetStatement(db, SELECT_SYNC_MODIFY_SQL, stmt);
582 if (errCode != E_OK) {
583 LOGE("Get statement failed. %d", errCode);
584 return errCode;
585 }
586 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange.first); // 1 : Bind time rang index start
587 if (errCode != E_OK) {
588 LOGE("Bind time range to statement failed. %d", errCode);
589 goto ERR;
590 }
591 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange.second); // 2 : Bind time rang index end
592 if (errCode != E_OK) {
593 LOGE("Bind time range to statement failed. %d", errCode);
594 goto ERR;
595 }
596 return E_OK; // do not release statement when success
597 ERR:
598 SQLiteUtils::ResetStatement(stmt, true, errCode);
599 return errCode;
600 }
601
GetQueryDataStatement(sqlite3 * db,QueryObject query,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)602 int GetQueryDataStatement(sqlite3 *db, QueryObject query, const std::pair<Timestamp, Timestamp> &timeRange,
603 sqlite3_stmt *&stmt)
604 {
605 int errCode = E_OK;
606 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
607 if (errCode != E_OK) {
608 return errCode;
609 }
610 return helper.GetQuerySyncStatement(db, timeRange.first, timeRange.second, stmt);
611 }
612
GetNextDataItem(sqlite3_stmt * stmt,bool isMemDB,DataItem & item)613 int GetNextDataItem(sqlite3_stmt *stmt, bool isMemDB, DataItem &item)
614 {
615 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
616 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
617 errCode = GetDataItemForSync(stmt, item);
618 }
619 return errCode;
620 }
621 }
622
GetSyncDataWithQuery(const QueryObject & query,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,const std::pair<Timestamp,Timestamp> & timeRange,std::vector<DataItem> & dataItems) const623 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(const QueryObject &query, size_t appendLength,
624 const DataSizeSpecInfo &dataSizeInfo, const std::pair<Timestamp, Timestamp> &timeRange,
625 std::vector<DataItem> &dataItems) const
626 {
627 sqlite3_stmt *fullStmt = nullptr; // statement for get all modified data in the time range
628 sqlite3_stmt *queryStmt = nullptr; // statement for get modified data which is matched query in the time range
629 int errCode = GetQueryDataStatement(dbHandle_, query, timeRange, queryStmt);
630 if (errCode != E_OK) {
631 LOGE("Get query matched data statement failed. %d", errCode);
632 goto END;
633 }
634 if (query.IsQueryOnlyByKey()) {
635 // Query sync by prefixKey only should not deal with REMOTE_DEVICE_DATA_MISS_QUERY. Get the data directly.
636 errCode = GetSyncDataItems(dataItems, queryStmt, appendLength, dataSizeInfo);
637 goto END;
638 }
639 errCode = GetFullDataStatement(dbHandle_, timeRange, fullStmt);
640 if (errCode != E_OK) {
641 LOGE("Get full changed data statement failed. %d", errCode);
642 goto END;
643 }
644 errCode = GetSyncDataWithQuery(fullStmt, queryStmt, appendLength, dataSizeInfo, dataItems);
645 if (errCode != E_OK && errCode != -E_UNFINISHED && errCode != -E_FINISHED) {
646 LOGE("Get sync data with query failed. %d", errCode);
647 }
648 END:
649 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
650 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
651 return CheckCorruptedStatus(errCode);
652 }
653
GetSyncDataWithQuery(sqlite3_stmt * fullStmt,sqlite3_stmt * queryStmt,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,std::vector<DataItem> & dataItems) const654 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
655 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const
656 {
657 int errCode = E_OK;
658 size_t dataTotalSize = 0;
659 DataItem fullItem;
660 DataItem matchItem;
661 bool isFullItemFinished = false;
662 bool isMatchItemFinished = false;
663 while (!isFullItemFinished || !isMatchItemFinished) {
664 errCode = GetNextDataItem(queryStmt, isMemDb_, matchItem);
665 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // query finished
666 isMatchItemFinished = true;
667 } else if (errCode != E_OK) { // step failed or get data failed
668 LOGE("Get next query matched data failed. %d", errCode);
669 return errCode;
670 }
671 while (!isFullItemFinished) {
672 errCode = GetNextDataItem(fullStmt, isMemDb_, fullItem);
673 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // queryStmt is a subset of fullStmt
674 isFullItemFinished = true;
675 break;
676 } else if (errCode != E_OK) { // step failed or get data failed
677 LOGE("Get next changed data failed. %d", errCode);
678 return errCode;
679 }
680 bool matchData = true;
681 if (isMatchItemFinished || matchItem.key != fullItem.key) {
682 matchData = false; // got miss query data
683 DBCommon::CalcValueHash(fullItem.key, fullItem.key); // set and send key with hash_key
684 Value().swap(fullItem.value); // not send value when data miss query
685 fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; // mark with miss query flag
686 }
687 errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo);
688 if (errCode == -E_UNFINISHED) {
689 goto END;
690 }
691 if (matchData) {
692 break; // step to next match data
693 }
694 }
695 }
696 END:
697 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
698 return (isFullItemFinished && isMatchItemFinished) ? -E_FINISHED : errCode;
699 }
700
OpenResultSet(const Key & keyPrefix,int & count)701 int SQLiteSingleVerStorageExecutor::OpenResultSet(const Key &keyPrefix, int &count)
702 {
703 sqlite3_stmt *countStatement = nullptr;
704 if (InitResultSet(keyPrefix, countStatement) != E_OK) {
705 LOGE("Initialize result set stat failed.");
706 return -E_INVALID_DB;
707 }
708
709 int errCode = StartTransaction(TransactType::DEFERRED);
710 if (errCode != E_OK) {
711 goto END;
712 }
713
714 // get count value
715 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
716 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
717 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
718 if (readCount > INT32_MAX) {
719 LOGW("total count is beyond the max count");
720 count = 0;
721 errCode = -E_UNEXPECTED_DATA;
722 } else {
723 count = static_cast<int>(readCount);
724 errCode = E_OK;
725 }
726 LOGD("Entry count in this result set is %d", count);
727 } else {
728 errCode = -E_UNEXPECTED_DATA;
729 }
730
731 END:
732 SQLiteUtils::ResetStatement(countStatement, true, errCode);
733 if (errCode != E_OK) {
734 CloseResultSet();
735 }
736 return CheckCorruptedStatus(errCode);
737 }
738
OpenResultSet(QueryObject & queryObj,int & count)739 int SQLiteSingleVerStorageExecutor::OpenResultSet(QueryObject &queryObj, int &count)
740 {
741 sqlite3_stmt *countStatement = nullptr;
742 int errCode = InitResultSet(queryObj, countStatement);
743 if (errCode != E_OK) {
744 LOGE("Initialize result set stat failed.");
745 return errCode;
746 }
747
748 errCode = StartTransaction(TransactType::DEFERRED);
749 if (errCode != E_OK) {
750 goto END;
751 }
752
753 // get count value
754 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
755 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
756 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
757 if (queryObj.HasLimit()) {
758 int limit = 0;
759 int offset = 0;
760 queryObj.GetLimitVal(limit, offset);
761 offset = (offset < 0) ? 0 : offset;
762 limit = (limit < 0) ? 0 : limit;
763 if (readCount <= static_cast<uint64_t>(offset)) {
764 readCount = 0;
765 } else {
766 readCount = std::min(readCount - offset, static_cast<uint64_t>(limit));
767 }
768 }
769
770 if (readCount > INT32_MAX) {
771 LOGW("total count is beyond the max count");
772 count = 0;
773 errCode = -E_UNEXPECTED_DATA;
774 } else {
775 count = static_cast<int>(readCount);
776 errCode = E_OK;
777 }
778 LOGD("Entry count in this result set is %d", count);
779 } else {
780 errCode = -E_UNEXPECTED_DATA;
781 }
782
783 END:
784 SQLiteUtils::ResetStatement(countStatement, true, errCode);
785 if (errCode != E_OK) {
786 CloseResultSet();
787 }
788 return CheckCorruptedStatus(errCode);
789 }
790
OpenResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)791 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(const Key &keyPrefix,
792 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
793 {
794 if (dbHandle_ == nullptr) {
795 return -E_INVALID_DB;
796 }
797 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
798 if (errCode != E_OK) {
799 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Get rowId stmt fail, errCode=%d", errCode);
800 return CheckCorruptedStatus(errCode);
801 }
802 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument index is 1
803 if (errCode != E_OK) {
804 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Bind rowid stmt fail, errCode=%d", errCode);
805 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
806 return CheckCorruptedStatus(errCode);
807 }
808 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
809 if (errCode != E_OK) {
810 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
811 }
812 return errCode;
813 }
814
OpenResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)815 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(QueryObject &queryObj,
816 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
817 {
818 if (dbHandle_ == nullptr) {
819 return -E_INVALID_DB;
820 }
821
822 int errCode = E_OK;
823 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
824 if (errCode != E_OK) {
825 return errCode;
826 }
827
828 if (!queryObj.IsValid()) {
829 LOGE("[SqlSinExe][OpenResSetRowId][Query] query object not Valid");
830 return -E_INVALID_QUERY_FORMAT;
831 }
832
833 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
834 if (errCode != E_OK) {
835 LOGE("[SqlSinExe][OpenResSetRowId][Query] Get Stmt fail, errCode=%d", errCode);
836 // The GetQuerySqlStatement does not self rollback(BAD...), so we have to reset the stmt here.
837 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
838 return errCode;
839 }
840 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
841 if (errCode != E_OK) {
842 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
843 }
844 return errCode;
845 }
846
ReloadResultSet(const Key & keyPrefix)847 int SQLiteSingleVerStorageExecutor::ReloadResultSet(const Key &keyPrefix)
848 {
849 int errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, SELECT_SYNC_ROWID_PREFIX_SQL);
850 if (errCode != E_OK) {
851 return CheckCorruptedStatus(errCode);
852 }
853
854 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
855 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
856 if (errCode != E_OK) {
857 LOGE("Rebind result set rowid statement of keyPrefix error:%d", errCode);
858 return CheckCorruptedStatus(errCode);
859 }
860 return E_OK;
861 }
862
ReloadResultSet(QueryObject & queryObj)863 int SQLiteSingleVerStorageExecutor::ReloadResultSet(QueryObject &queryObj)
864 {
865 int errCode = E_OK;
866 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
867 if (errCode != E_OK) {
868 return errCode;
869 }
870
871 if (!queryObj.IsValid()) {
872 return -E_INVALID_QUERY_FORMAT;
873 }
874
875 std::string sql;
876 errCode = helper.GetQuerySql(sql, true); // only rowid sql
877 if (errCode != E_OK) {
878 return errCode;
879 }
880
881 errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, sql);
882 if (errCode != E_OK) {
883 return CheckCorruptedStatus(errCode);
884 }
885
886 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
887 // GetQuerySqlStatement will not alter getResultRowIdStatement_ if it is not null
888 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
889 if (errCode != E_OK) {
890 LOGE("Rebind result set rowid statement of query error:%d", errCode);
891 return CheckCorruptedStatus(errCode);
892 }
893 return E_OK;
894 }
895
ReloadResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)896 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(const Key &keyPrefix,
897 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
898 {
899 int errCode = ReloadResultSet(keyPrefix); // Reuse this function(A convenience)
900 if (errCode != E_OK) {
901 return errCode;
902 }
903 int count = 0; // Ignored
904 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
905 if (errCode != E_OK) {
906 LOGE("[SqlSinExe][ReloadResSet][KeyPrefix] Load fail, errCode=%d", errCode);
907 }
908 // We can just return, no need to reset the statement
909 return errCode;
910 }
911
ReloadResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)912 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(QueryObject &queryObj,
913 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
914 {
915 int errCode = ReloadResultSet(queryObj); // Reuse this function(A convenience)
916 if (errCode != E_OK) {
917 return errCode;
918 }
919 int count = 0; // Ignored
920 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
921 if (errCode != E_OK) {
922 LOGE("[SqlSinExe][ReloadResSet][Query] Load fail, errCode=%d", errCode);
923 }
924 // We can just return, no need to reset the statement
925 return errCode;
926 }
927
GetNextEntryFromResultSet(Key & key,Value & value,bool isCopy)928 int SQLiteSingleVerStorageExecutor::GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy)
929 {
930 if (getResultRowIdStatement_ == nullptr || getResultEntryStatement_ == nullptr) {
931 return -E_RESULT_SET_STATUS_INVALID;
932 }
933
934 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
935 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
936 if (!isCopy) {
937 return E_OK;
938 }
939 int64_t rowId = sqlite3_column_int64(getResultRowIdStatement_, 0);
940 errCode = E_OK;
941 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
942 if (errCode != E_OK) {
943 LOGE("[SqlSinExe][GetNext] Reset result set entry statement fail, errCode=%d.", errCode);
944 return CheckCorruptedStatus(errCode);
945 }
946
947 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
948 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
949 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
950 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, key);
951 if (errCode != E_OK) {
952 LOGE("[SqlSinExe][GetNext] Get key failed:%d", errCode);
953 return CheckCorruptedStatus(errCode);
954 }
955 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, value);
956 if (errCode != E_OK) {
957 LOGE("[SqlSinExe][GetNext] Get value failed:%d", errCode);
958 return CheckCorruptedStatus(errCode);
959 }
960 return E_OK;
961 } else {
962 return -E_UNEXPECTED_DATA;
963 }
964 }
965 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
966 return -E_FINISHED;
967 }
968
969 LOGE("SQLite step failed:%d", errCode);
970 return CheckCorruptedStatus(errCode);
971 }
972
GetEntryByRowId(int64_t rowId,Entry & entry)973 int SQLiteSingleVerStorageExecutor::GetEntryByRowId(int64_t rowId, Entry &entry)
974 {
975 if (getResultEntryStatement_ == nullptr) {
976 return -E_RESULT_SET_STATUS_INVALID;
977 }
978 int errCode = E_OK;
979 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
980 if (errCode != E_OK) {
981 LOGE("[SqlSinExe][GetEntryByRowid] Reset result set entry statement fail, errCode=%d.", errCode);
982 return CheckCorruptedStatus(errCode);
983 }
984 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
985 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
986 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
987 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, entry.key);
988 if (errCode != E_OK) {
989 LOGE("[SqlSinExe][GetEntryByRowid] Get key failed, errCode=%d.", errCode);
990 return CheckCorruptedStatus(errCode);
991 }
992 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, entry.value);
993 if (errCode != E_OK) {
994 LOGE("[SqlSinExe][GetEntryByRowid] Get value failed, errCode=%d.", errCode);
995 return CheckCorruptedStatus(errCode);
996 }
997 return E_OK;
998 } else {
999 LOGE("[SqlSinExe][GetEntryByRowid] Step failed, errCode=%d.", errCode);
1000 return -E_UNEXPECTED_DATA;
1001 }
1002 }
1003
CloseResultSet()1004 void SQLiteSingleVerStorageExecutor::CloseResultSet()
1005 {
1006 int errCode = E_OK;
1007 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1008 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1009 SetCorruptedStatus();
1010 }
1011 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1012 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1013 SetCorruptedStatus();
1014 }
1015 if (isTransactionOpen_) {
1016 SQLiteUtils::RollbackTransaction(dbHandle_);
1017 isTransactionOpen_ = false;
1018 }
1019 }
1020
StartTransaction(TransactType type)1021 int SQLiteSingleVerStorageExecutor::StartTransaction(TransactType type)
1022 {
1023 if (dbHandle_ == nullptr) {
1024 LOGE("Begin transaction failed, dbHandle is null.");
1025 return -E_INVALID_DB;
1026 }
1027 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
1028 if (errCode == E_OK) {
1029 isTransactionOpen_ = true;
1030 } else {
1031 LOGE("Begin transaction failed, errCode = %d", errCode);
1032 }
1033 return CheckCorruptedStatus(errCode);
1034 }
1035
Commit()1036 int SQLiteSingleVerStorageExecutor::Commit()
1037 {
1038 if (dbHandle_ == nullptr) {
1039 return -E_INVALID_DB;
1040 }
1041 int errCode = SQLiteUtils::CommitTransaction(dbHandle_);
1042 if (errCode != E_OK) {
1043 return CheckCorruptedStatus(errCode);
1044 }
1045 isTransactionOpen_ = false;
1046 return E_OK;
1047 }
1048
Rollback()1049 int SQLiteSingleVerStorageExecutor::Rollback()
1050 {
1051 if (dbHandle_ == nullptr) {
1052 return -E_INVALID_DB;
1053 }
1054 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
1055 if (errCode != E_OK) {
1056 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
1057 return CheckCorruptedStatus(errCode);
1058 }
1059 isTransactionOpen_ = false;
1060 return E_OK;
1061 }
1062
CheckIfKeyExisted(const Key & key,bool isLocal,Value & value,Timestamp & timestamp) const1063 bool SQLiteSingleVerStorageExecutor::CheckIfKeyExisted(const Key &key, bool isLocal,
1064 Value &value, Timestamp ×tamp) const
1065 {
1066 // not local value, no need to get the value.
1067 if (!isLocal) {
1068 return false;
1069 }
1070
1071 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1072 if (errCode != E_OK) {
1073 return false;
1074 }
1075 return true;
1076 }
1077
GetDeviceIdentifier(PragmaEntryDeviceIdentifier * identifier)1078 int SQLiteSingleVerStorageExecutor::GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier)
1079 {
1080 if (identifier == nullptr) {
1081 return -E_INVALID_ARGS;
1082 }
1083
1084 if (dbHandle_ == nullptr) {
1085 return -E_INVALID_DB;
1086 }
1087
1088 sqlite3_stmt *statement = nullptr;
1089 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ENTRY_DEVICE, statement);
1090 if (errCode != E_OK) {
1091 return errCode;
1092 }
1093
1094 int keyIndex = identifier->origDevice ? BIND_ORI_DEVICE_ID : BIND_PRE_DEVICE_ID;
1095 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, identifier->key, false);
1096 if (errCode != E_OK) {
1097 goto END;
1098 }
1099
1100 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1101 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1102 std::vector<uint8_t> deviceId;
1103 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, deviceId);
1104 identifier->deviceIdentifier.assign(deviceId.begin(), deviceId.end());
1105 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1106 errCode = -E_NOT_FOUND;
1107 }
1108
1109 END:
1110 SQLiteUtils::ResetStatement(statement, true, errCode);
1111 return CheckCorruptedStatus(errCode);
1112 }
1113
PutIntoCommittedData(const DataItem & itemPut,const DataItem & itemGet,const DataOperStatus & status,SingleVerNaturalStoreCommitNotifyData * committedData)1114 void SQLiteSingleVerStorageExecutor::PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet,
1115 const DataOperStatus &status, SingleVerNaturalStoreCommitNotifyData *committedData)
1116 {
1117 if (committedData == nullptr) {
1118 return;
1119 }
1120
1121 Entry entry;
1122 int errCode;
1123 if (!status.isDeleted) {
1124 entry.key = itemPut.key;
1125 entry.value = itemPut.value;
1126 DataType dataType = (status.preStatus == DataStatus::EXISTED) ? DataType::UPDATE : DataType::INSERT;
1127 errCode = committedData->InsertCommittedData(std::move(entry), dataType, true);
1128 } else {
1129 entry.key = itemGet.key;
1130 entry.value = itemGet.value;
1131 errCode = committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1132 }
1133
1134 if (errCode != E_OK) {
1135 LOGE("[SingleVerExe][PutCommitData]Insert failed:%d", errCode);
1136 }
1137 }
1138
PrepareForSavingData(const std::string & readSql,const std::string & insertSql,const std::string & updateSql,SaveRecordStatements & statements) const1139 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
1140 const std::string &updateSql, SaveRecordStatements &statements) const
1141 {
1142 int errCode = SQLiteUtils::GetStatement(dbHandle_, readSql, statements.queryStatement);
1143 if (errCode != E_OK) {
1144 LOGE("Get query statement failed. errCode = [%d]", errCode);
1145 goto ERR;
1146 }
1147
1148 errCode = SQLiteUtils::GetStatement(dbHandle_, insertSql, statements.insertStatement);
1149 if (errCode != E_OK) {
1150 LOGE("Get insert statement failed. errCode = [%d]", errCode);
1151 goto ERR;
1152 }
1153
1154 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, statements.updateStatement);
1155 if (errCode != E_OK) {
1156 LOGE("Get update statement failed. errCode = [%d]", errCode);
1157 goto ERR;
1158 }
1159 return E_OK;
1160 ERR:
1161 (void)statements.ResetStatement();
1162 return errCode;
1163 }
1164
PrepareForSavingData(SingleVerDataType type)1165 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(SingleVerDataType type)
1166 {
1167 int errCode = -E_NOT_SUPPORT;
1168 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1169 // currently, Local type has not been optimized, so pass updateSql parameter with INSERT_LOCAL_SQL
1170 errCode = PrepareForSavingData(SELECT_LOCAL_HASH_SQL, INSERT_LOCAL_SQL, INSERT_LOCAL_SQL, saveLocalStatements_);
1171 } else if (type == SingleVerDataType::SYNC_TYPE) {
1172 errCode = PrepareForSavingData(SELECT_SYNC_HASH_SQL, INSERT_SYNC_SQL, UPDATE_SYNC_SQL, saveSyncStatements_);
1173 }
1174 return CheckCorruptedStatus(errCode);
1175 }
1176
ResetForSavingData(SingleVerDataType type)1177 int SQLiteSingleVerStorageExecutor::ResetForSavingData(SingleVerDataType type)
1178 {
1179 int errCode = E_OK;
1180 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1181 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
1182 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
1183 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
1184 } else if (type == SingleVerDataType::SYNC_TYPE) {
1185 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1186 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1187 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1188 }
1189 return CheckCorruptedStatus(errCode);
1190 }
1191
GetOriginDevName(const DataItem & dataItem,const std::string & origDevGet)1192 std::string SQLiteSingleVerStorageExecutor::GetOriginDevName(const DataItem &dataItem,
1193 const std::string &origDevGet)
1194 {
1195 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
1196 return origDevGet;
1197 }
1198 return dataItem.origDev;
1199 }
1200
SaveSyncDataToDatabase(const DataItem & dataItem,const Key & hashKey,const std::string & origDev,const std::string & deviceName,bool isUpdate)1201 int SQLiteSingleVerStorageExecutor::SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey,
1202 const std::string &origDev, const std::string &deviceName, bool isUpdate)
1203 {
1204 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1205 LOGD("Find query data missing, erase local data.");
1206 return EraseSyncData(hashKey);
1207 }
1208 auto statement = saveSyncStatements_.GetDataSaveStatement(isUpdate);
1209 if (statement == nullptr) {
1210 return -E_INVALID_ARGS;
1211 }
1212
1213 std::string devName = DBCommon::TransferHashString(deviceName);
1214 int errCode = BindSavedSyncData(statement, dataItem, hashKey, {origDev, devName}, isUpdate);
1215 if (errCode != E_OK) {
1216 return errCode;
1217 }
1218
1219 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1220 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1221 errCode = E_OK;
1222 }
1223 if (errCode == E_OK) {
1224 errCode = RemoveCloudUploadFlag(hashKey);
1225 }
1226 return errCode;
1227 }
1228
JudgeSyncSaveType(DataItem & dataItem,const DataItem & itemGet,const std::string & devName,bool isHashKeyExisted,bool isPermitForceWrite)1229 DataOperStatus SQLiteSingleVerStorageExecutor::JudgeSyncSaveType(DataItem &dataItem,
1230 const DataItem &itemGet, const std::string &devName, bool isHashKeyExisted, bool isPermitForceWrite)
1231 {
1232 DataOperStatus status;
1233 status.isDeleted = ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1234 (dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY);
1235 if (isHashKeyExisted) {
1236 if ((itemGet.flag & DataItem::DELETE_FLAG) != 0) {
1237 status.preStatus = DataStatus::DELETED;
1238 } else {
1239 status.preStatus = DataStatus::EXISTED;
1240 }
1241 std::string deviceName = DBCommon::TransferHashString(devName);
1242 if (itemGet.writeTimestamp >= dataItem.writeTimestamp) {
1243 // for multi user mode, no permit to forcewrite
1244 if ((!deviceName.empty()) && IsFromDataOwner(itemGet, deviceName) && isPermitForceWrite) {
1245 LOGI("Force overwrite the data:%" PRIu64 " vs %" PRIu64,
1246 itemGet.writeTimestamp, dataItem.writeTimestamp);
1247 status.isDefeated = false;
1248 dataItem.writeTimestamp = itemGet.writeTimestamp + 1;
1249 dataItem.timestamp = itemGet.timestamp;
1250 } else {
1251 status.isDefeated = true;
1252 }
1253 }
1254 }
1255 return status;
1256 }
1257
GetSyncDataItemExt(const DataItem & dataItem,DataItem & itemGet,const DataOperStatus & dataStatus) const1258 int SQLiteSingleVerStorageExecutor::GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet,
1259 const DataOperStatus &dataStatus) const
1260 {
1261 if (dataStatus.preStatus != DataStatus::EXISTED) {
1262 return E_OK;
1263 }
1264 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1265 // only deleted item need origin value.
1266 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1267 if (errCode != E_OK) {
1268 return errCode;
1269 }
1270
1271 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, itemGet.value);
1272 if (errCode != E_OK) {
1273 LOGE("Get column value data failed:%d", errCode);
1274 }
1275
1276 return errCode;
1277 }
1278
ResetSaveSyncStatements(int errCode)1279 int SQLiteSingleVerStorageExecutor::ResetSaveSyncStatements(int errCode)
1280 {
1281 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1282 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1283 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1284 return CheckCorruptedStatus(errCode);
1285 }
1286
1287 namespace {
IsNeedIgnoredData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & devInfo,bool isHashKeyExisted,int policy)1288 inline bool IsNeedIgnoredData(const DataItem &itemPut, const DataItem &itemGet,
1289 const DeviceInfo &devInfo, bool isHashKeyExisted, int policy)
1290 {
1291 // deny the data synced from other dev which the origin dev is current or the existed value is current dev data.
1292 return (((itemGet.origDev.empty() && isHashKeyExisted) || itemPut.origDev.empty()) &&
1293 (!devInfo.isLocal && policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA));
1294 }
1295 }
1296
PrepareForNotifyConflictAndObserver(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)1297 int SQLiteSingleVerStorageExecutor::PrepareForNotifyConflictAndObserver(DataItem &dataItem,
1298 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
1299 {
1300 // Check sava data existed info
1301 int errCode = GetSyncDataItemPre(dataItem, notify.getData, notify.hashKey);
1302 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1303 LOGD("[SingleVerExe][PrepareForNotifyConflictAndObserver] failed:%d", errCode);
1304 if (isSyncMigrating_) {
1305 ResetForMigrateCacheData();
1306 return errCode;
1307 }
1308 return ResetSaveSyncStatements(errCode);
1309 }
1310
1311 bool isHashKeyExisted = (errCode != -E_NOT_FOUND);
1312 if (IsNeedIgnoredData(dataItem, notify.getData, deviceInfo, isHashKeyExisted, conflictResolvePolicy_)) {
1313 LOGD("[SingleVerExe] Ignore the sync data.");
1314 if (isSyncMigrating_) {
1315 ResetForMigrateCacheData();
1316 return -E_IGNORE_DATA;
1317 }
1318 return ResetSaveSyncStatements(-E_IGNORE_DATA);
1319 }
1320
1321 notify.dataStatus = JudgeSyncSaveType(dataItem, notify.getData, deviceInfo.deviceName, isHashKeyExisted,
1322 isPermitForceWrite);
1323 InitCommitNotifyDataKeyStatus(notify.committedData, notify.hashKey, notify.dataStatus);
1324
1325 // Nonexistent data, but deleted by local.
1326 if ((notify.dataStatus.preStatus == DataStatus::DELETED || notify.dataStatus.preStatus == DataStatus::NOEXISTED) &&
1327 (dataItem.flag & DataItem::DELETE_FLAG) != 0 &&
1328 (dataItem.flag & DataItem::LOCAL_FLAG) != 0) {
1329 // For delete item in cacheDB, which not in mainDB. Cannot notify, but this is not error.
1330 errCode = -E_NOT_FOUND;
1331 LOGD("Nonexistent data, but deleted by local");
1332 if (isSyncMigrating_) {
1333 ResetForMigrateCacheData();
1334 return errCode;
1335 }
1336 return ResetSaveSyncStatements(errCode);
1337 }
1338
1339 // get key and value from ori database
1340 errCode = GetSyncDataItemExt(dataItem, notify.getData, notify.dataStatus);
1341 if (errCode != E_OK) {
1342 LOGD("GetSyncDataItemExt failed:%d", errCode);
1343 if (isSyncMigrating_) {
1344 ResetForMigrateCacheData();
1345 return errCode;
1346 }
1347 return ResetSaveSyncStatements(errCode);
1348 }
1349
1350 return E_OK;
1351 }
1352
SaveSyncDataItem(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,SingleVerNaturalStoreCommitNotifyData * committedData,bool isPermitForceWrite)1353 int SQLiteSingleVerStorageExecutor::SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
1354 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite)
1355 {
1356 NotifyConflictAndObserverData notify = {
1357 .committedData = committedData
1358 };
1359
1360 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
1361 if (errCode != E_OK) {
1362 if (errCode == -E_IGNORE_DATA) {
1363 errCode = E_OK;
1364 }
1365 return errCode;
1366 }
1367
1368 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, committedData);
1369 if (notify.dataStatus.isDefeated) {
1370 LOGE("Data status is defeated:%d", errCode);
1371 return ResetSaveSyncStatements(errCode);
1372 }
1373
1374 bool isUpdate = (notify.dataStatus.preStatus != DataStatus::NOEXISTED);
1375 std::string origDev = GetOriginDevName(dataItem, notify.getData.origDev);
1376 errCode = SaveSyncDataToDatabase(dataItem, notify.hashKey, origDev, deviceInfo.deviceName, isUpdate);
1377 if (errCode == E_OK) {
1378 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, committedData);
1379 maxStamp = std::max(dataItem.timestamp, maxStamp);
1380 } else {
1381 LOGE("Save sync data to db failed:%d", errCode);
1382 }
1383 return ResetSaveSyncStatements(errCode);
1384 }
1385
GetAllMetaKeys(std::vector<Key> & keys) const1386 int SQLiteSingleVerStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
1387 {
1388 sqlite3_stmt *statement = nullptr;
1389 const std::string &sqlStr = (attachMetaMode_ ? SELECT_ATTACH_ALL_META_KEYS : SELECT_ALL_META_KEYS);
1390 int errCode = SQLiteUtils::GetStatement(dbHandle_, sqlStr, statement);
1391 if (errCode != E_OK) {
1392 LOGE("[SingleVerExe][GetAllKey] Get statement failed:%d", errCode);
1393 return errCode;
1394 }
1395
1396 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
1397 SQLiteUtils::ResetStatement(statement, true, errCode);
1398 return errCode;
1399 }
1400
GetAllSyncedEntries(const std::string & hashDev,std::vector<Entry> & entries) const1401 int SQLiteSingleVerStorageExecutor::GetAllSyncedEntries(const std::string &hashDev,
1402 std::vector<Entry> &entries) const
1403 {
1404 int errCode = E_OK;
1405 sqlite3_stmt *statement = nullptr;
1406 if (hashDev.empty()) {
1407 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1408 SELECT_ALL_SYNC_ENTRIES_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES);
1409 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1410 if (errCode != E_OK) {
1411 LOGE("Get all entries statement failed:%d", errCode);
1412 return errCode;
1413 }
1414 } else {
1415 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1416 SELECT_ALL_SYNC_ENTRIES_BY_DEV_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES_BY_DEV);
1417 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1418 if (errCode != E_OK) {
1419 LOGE("Get all entries statement failed:%d", errCode);
1420 return errCode;
1421 }
1422
1423 // deviceName always hash string
1424 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
1425 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, devVect, true); // bind the 1st to device.
1426 if (errCode != E_OK) {
1427 LOGE("Failed to bind the synced device for all entries:%d", errCode);
1428 goto END;
1429 }
1430 }
1431
1432 errCode = GetAllEntries(statement, entries);
1433 END:
1434 SQLiteUtils::ResetStatement(statement, true, errCode);
1435 return errCode;
1436 }
1437
GetAllEntries(sqlite3_stmt * statement,std::vector<Entry> & entries) const1438 int SQLiteSingleVerStorageExecutor::GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const
1439 {
1440 if (statement == nullptr) {
1441 return -E_INVALID_DB;
1442 }
1443 int errCode;
1444 do {
1445 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1446 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1447 Entry entry;
1448 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key); // No.0 is the key
1449 if (errCode != E_OK) {
1450 break;
1451 }
1452 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value); // No.1 is the value
1453 if (errCode != E_OK) {
1454 break;
1455 }
1456
1457 entries.push_back(std::move(entry));
1458 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1459 errCode = E_OK;
1460 break;
1461 } else {
1462 LOGE("SQLite step for all entries failed:%d", errCode);
1463 break;
1464 }
1465 } while (true);
1466
1467 return errCode;
1468 }
1469
BindSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,const SyncDataDevices & devices,bool isUpdate)1470 int SQLiteSingleVerStorageExecutor::BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1471 const Key &hashKey, const SyncDataDevices &devices, bool isUpdate)
1472 {
1473 const int hashKeyIndex = isUpdate ? BIND_SYNC_UPDATE_HASH_KEY_INDEX : BIND_SYNC_HASH_KEY_INDEX;
1474 int errCode = SQLiteUtils::BindBlobToStatement(statement, hashKeyIndex, hashKey, false);
1475 if (errCode != E_OK) {
1476 LOGE("Bind saved sync data hash key failed:%d", errCode);
1477 return errCode;
1478 }
1479
1480 // if delete flag is set, just use the hash key instead of the key
1481 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1482 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_SYNC_KEY_INDEX, -1));
1483 } else {
1484 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_KEY_INDEX, dataItem.key, false);
1485 }
1486
1487 if (errCode != E_OK) {
1488 LOGE("Bind saved sync data key failed:%d", errCode);
1489 return errCode;
1490 }
1491
1492 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_VAL_INDEX, dataItem.value, true);
1493 if (errCode != E_OK) {
1494 LOGE("Bind saved sync data value failed:%d", errCode);
1495 return errCode;
1496 }
1497
1498 errCode = BindSyncDataTime(statement, dataItem, isUpdate);
1499 if (errCode != E_OK) {
1500 return errCode;
1501 }
1502 return BindDevForSavedSyncData(statement, dataItem, devices.origDev, devices.dev);
1503 }
1504
PutConflictData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & deviceInfo,const DataOperStatus & dataStatus,SingleVerNaturalStoreCommitNotifyData * commitData)1505 void SQLiteSingleVerStorageExecutor::PutConflictData(const DataItem &itemPut, const DataItem &itemGet,
1506 const DeviceInfo &deviceInfo, const DataOperStatus &dataStatus,
1507 SingleVerNaturalStoreCommitNotifyData *commitData)
1508 {
1509 if (commitData == nullptr) {
1510 return;
1511 }
1512
1513 bool conflictNotifyMatch = commitData->IsConflictedNotifyMatched(itemPut, itemGet);
1514 if (!conflictNotifyMatch) {
1515 return;
1516 }
1517
1518 if (dataStatus.preStatus == DataStatus::NOEXISTED ||
1519 ((dataStatus.preStatus == DataStatus::DELETED) && dataStatus.isDeleted)) {
1520 return;
1521 }
1522
1523 Key origKey;
1524 if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1525 (itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1526 origKey = itemGet.key;
1527 } else {
1528 origKey = itemPut.key;
1529 }
1530
1531 // insert db original entry
1532 std::vector<uint8_t> getDevVect(itemGet.dev.begin(), itemGet.dev.end());
1533 DataItemInfo orgItemInfo = {itemGet, true, getDevVect};
1534 orgItemInfo.dataItem.key = origKey;
1535 commitData->InsertConflictedItem(orgItemInfo, true);
1536
1537 // insert conflict entry
1538 std::string putDeviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1539 std::vector<uint8_t> putDevVect(putDeviceName.begin(), putDeviceName.end());
1540
1541 DataItemInfo newItemInfo = {itemPut, deviceInfo.isLocal, putDevVect};
1542 newItemInfo.dataItem.key = origKey;
1543 commitData->InsertConflictedItem(newItemInfo, false);
1544 }
1545
Reset()1546 int SQLiteSingleVerStorageExecutor::Reset()
1547 {
1548 if (isTransactionOpen_) {
1549 Rollback();
1550 }
1551
1552 int errCode = ResetForSavingData(SingleVerDataType::SYNC_TYPE);
1553 if (errCode != E_OK) {
1554 LOGE("Finalize the sync resources for saving sync data failed: %d", errCode);
1555 }
1556
1557 errCode = ResetForSavingData(SingleVerDataType::LOCAL_TYPE_SQLITE);
1558 if (errCode != E_OK) {
1559 LOGE("Finalize the local resources for saving sync data failed: %d", errCode);
1560 }
1561 return SQLiteStorageExecutor::Reset();
1562 }
1563
GetSyncDataItemPre(const DataItem & itemPut,DataItem & itemGet,Key & hashKey) const1564 int SQLiteSingleVerStorageExecutor::GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet,
1565 Key &hashKey) const
1566 {
1567 if (isSyncMigrating_) {
1568 hashKey = itemPut.hashKey;
1569 } else if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1570 ((itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY)) {
1571 hashKey = itemPut.key;
1572 } else {
1573 int errCode = DBCommon::CalcValueHash(itemPut.key, hashKey);
1574 if (errCode != E_OK) {
1575 return errCode;
1576 }
1577 }
1578
1579 return GetSyncDataPreByHashKey(hashKey, itemGet);
1580 }
1581
GetSyncDataPreByHashKey(const Key & hashKey,DataItem & itemGet) const1582 int SQLiteSingleVerStorageExecutor::GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const
1583 {
1584 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1585 int errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // 1st arg.
1586 if (errCode != E_OK) {
1587 return errCode;
1588 }
1589
1590 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1591 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // no find the key
1592 errCode = -E_NOT_FOUND;
1593 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1594 itemGet.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1595 itemGet.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1596 itemGet.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1597 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1598 if (errCode != E_OK) {
1599 return errCode;
1600 }
1601 std::vector<uint8_t> devVect;
1602 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1603 if (errCode != E_OK) {
1604 return errCode;
1605 }
1606
1607 std::vector<uint8_t> origDevVect;
1608 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
1609 if (errCode != E_OK) {
1610 return errCode;
1611 }
1612 itemGet.dev.assign(devVect.begin(), devVect.end());
1613 itemGet.origDev.assign(origDevVect.begin(), origDevVect.end());
1614 }
1615 return errCode;
1616 }
1617
DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData * committedData,const Key & key,const Value & value)1618 int SQLiteSingleVerStorageExecutor::DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
1619 const Key &key, const Value &value)
1620 {
1621 if (committedData != nullptr) {
1622 Key hashKey;
1623 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
1624 if (innerErrCode != E_OK) {
1625 return innerErrCode;
1626 }
1627 committedData->InitKeyPropRecord(hashKey, ExistStatus::EXIST);
1628 }
1629
1630 std::string sql = DELETE_LOCAL_SQL;
1631 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
1632 sql = DELETE_LOCAL_SQL_FROM_CACHEHANDLE;
1633 }
1634 sqlite3_stmt *statement = nullptr;
1635 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1636 if (errCode != E_OK) {
1637 goto ERROR;
1638 }
1639
1640 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);
1641 if (errCode != E_OK) {
1642 LOGE("Bind the key error(%d) when delete kv data.", errCode);
1643 goto ERROR;
1644 }
1645
1646 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1647 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1648 if (sqlite3_changes(dbHandle_) > 0) {
1649 if (committedData != nullptr) {
1650 Entry entry = {key, value};
1651 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1652 } else {
1653 LOGE("DeleteLocalKvData failed to do commit notify because of OOM.");
1654 }
1655 errCode = E_OK;
1656 }
1657 }
1658
1659 ERROR:
1660 SQLiteUtils::ResetStatement(statement, true, errCode);
1661 return CheckCorruptedStatus(errCode);
1662 }
1663
DeleteLocalKvData(const Key & key,SingleVerNaturalStoreCommitNotifyData * committedData,Value & value,Timestamp & timestamp)1664 int SQLiteSingleVerStorageExecutor::DeleteLocalKvData(const Key &key,
1665 SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, Timestamp ×tamp)
1666 {
1667 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1668 if (errCode != E_OK) {
1669 return CheckCorruptedStatus(errCode);
1670 }
1671
1672 return DeleteLocalDataInner(committedData, key, value);
1673 }
1674
EraseSyncData(const Key & hashKey)1675 int SQLiteSingleVerStorageExecutor::EraseSyncData(const Key &hashKey)
1676 {
1677 sqlite3_stmt *stmt = nullptr;
1678 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
1679 DELETE_SYNC_DATA_WITH_HASHKEY_FROM_CACHEHANDLE : DELETE_SYNC_DATA_WITH_HASHKEY;
1680 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1681 if (errCode != E_OK) {
1682 LOGE("get erase statement failed:%d", errCode);
1683 return errCode;
1684 }
1685
1686 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey, false);
1687 if (errCode != E_OK) {
1688 LOGE("bind hashKey failed:%d", errCode);
1689 goto END;
1690 }
1691
1692 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1693 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1694 errCode = E_OK;
1695 } else {
1696 LOGE("erase data failed:%d", errCode);
1697 }
1698 END:
1699 SQLiteUtils::ResetStatement(stmt, true, errCode);
1700 return CheckCorruptedStatus(errCode);
1701 }
1702
RemoveDeviceData(const std::string & deviceName)1703 int SQLiteSingleVerStorageExecutor::RemoveDeviceData(const std::string &deviceName)
1704 {
1705 int errCode = E_OK;
1706 bool isCreate = false;
1707 int ret = SQLiteUtils::CheckTableExists(dbHandle_, NATURALBASE_KV_AUX_SYNC_DATA_LOG_TABLE_NAME, isCreate);
1708 bool isTableExists = (ret == E_OK && isCreate);
1709 if (deviceName.empty()) {
1710 if (isTableExists) {
1711 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_ALL_LOG_DATA_SQL, "", "");
1712 }
1713 errCode = CloudExcuteRemoveOrUpdate(REMOVE_ALL_DEV_DATA_SQL, "", "");
1714 } else {
1715 if (isTableExists) {
1716 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_LOG_DATA_BY_DEVID_SQL, deviceName, "");
1717 }
1718 errCode = CloudExcuteRemoveOrUpdate(REMOVE_DEV_DATA_SQL, deviceName, "");
1719 }
1720 return CheckCorruptedStatus(errCode);
1721 }
1722
StepForResultEntries(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries) const1723 int SQLiteSingleVerStorageExecutor::StepForResultEntries(bool isGetValue, sqlite3_stmt *statement,
1724 std::vector<Entry> &entries) const
1725 {
1726 entries.clear();
1727 entries.shrink_to_fit();
1728 int errCode = E_OK;
1729 do {
1730 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1731 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1732 errCode = GetEntryFromStatement(isGetValue, statement, entries);
1733 if (errCode != E_OK) {
1734 return errCode;
1735 }
1736 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1737 errCode = E_OK;
1738 break;
1739 } else {
1740 LOGE("SQLite step failed:%d", errCode);
1741 return errCode;
1742 }
1743 } while (true);
1744
1745 // if select no result, return the -E_NOT_FOUND.
1746 if (entries.empty()) {
1747 errCode = -E_NOT_FOUND;
1748 }
1749
1750 return errCode;
1751 }
1752
BindDevForSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const std::string & origDev,const std::string & deviceName)1753 int SQLiteSingleVerStorageExecutor::BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1754 const std::string &origDev, const std::string &deviceName)
1755 {
1756 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_FLAG_INDEX,
1757 static_cast<int64_t>(dataItem.flag));
1758 if (errCode != E_OK) {
1759 LOGE("Bind saved sync data flag failed:%d", errCode);
1760 return errCode;
1761 }
1762
1763 std::vector<uint8_t> devVect(deviceName.begin(), deviceName.end());
1764 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_DEV_INDEX, devVect, true);
1765 if (errCode != E_OK) {
1766 LOGE("Bind dev for sync data failed:%d", errCode);
1767 return errCode;
1768 }
1769
1770 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
1771 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_ORI_DEV_INDEX, origDevVect, true);
1772 if (errCode != E_OK) {
1773 LOGE("Bind orig dev for sync data failed:%d", errCode);
1774 }
1775
1776 return errCode;
1777 }
1778
GetDataItemSerialSize(const DataItem & item,size_t appendLen)1779 size_t SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(const DataItem &item, size_t appendLen)
1780 {
1781 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
1782 // the size would not be very large.
1783 static const size_t maxOrigDevLength = 40;
1784 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
1785 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
1786 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
1787 return dataSize;
1788 }
1789
InitResultSet(const Key & keyPrefix,sqlite3_stmt * & countStmt)1790 int SQLiteSingleVerStorageExecutor::InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt)
1791 {
1792 if (dbHandle_ == nullptr) {
1793 return -E_INVALID_DB;
1794 }
1795 // bind statement for count
1796 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_COUNT_SYNC_PREFIX_SQL, countStmt);
1797 if (errCode != E_OK) {
1798 LOGE("Get count statement for resultset error:%d", errCode);
1799 return errCode;
1800 }
1801
1802 errCode = SQLiteUtils::BindPrefixKey(countStmt, 1, keyPrefix); // first argument is key
1803 if (errCode != E_OK) {
1804 LOGE("Bind count key error:%d", errCode);
1805 goto ERROR;
1806 }
1807 // bind statement for result set
1808 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
1809 if (errCode != E_OK) {
1810 LOGE("Get result set rowid statement error:%d", errCode);
1811 goto ERROR;
1812 }
1813
1814 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1815 if (errCode != E_OK) {
1816 LOGE("Get result set entry statement error:%d", errCode);
1817 goto ERROR;
1818 }
1819
1820 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
1821 if (errCode != E_OK) {
1822 LOGE("Bind result set rowid statement error:%d", errCode);
1823 goto ERROR;
1824 }
1825 return E_OK;
1826
1827 ERROR:
1828 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1829 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1830 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1831 return CheckCorruptedStatus(errCode);
1832 }
1833
InitResultSetCount(QueryObject & queryObj,sqlite3_stmt * & countStmt)1834 int SQLiteSingleVerStorageExecutor::InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1835 {
1836 if (dbHandle_ == nullptr) {
1837 return -E_INVALID_DB;
1838 }
1839
1840 int errCode = E_OK;
1841 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1842 if (errCode != E_OK) {
1843 return errCode;
1844 }
1845
1846 errCode = helper.GetCountSqlStatement(dbHandle_, countStmt);
1847 if (errCode != E_OK) {
1848 LOGE("Get count bind statement error:%d", errCode);
1849 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1850 }
1851 return errCode;
1852 }
1853
InitResultSetContent(QueryObject & queryObj)1854 int SQLiteSingleVerStorageExecutor::InitResultSetContent(QueryObject &queryObj)
1855 {
1856 int errCode = E_OK;
1857 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1858 if (errCode != E_OK) {
1859 return errCode;
1860 }
1861
1862 // bind statement for result set
1863 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
1864 if (errCode != E_OK) {
1865 LOGE("[SqlSinExe][InitResSetContent] Bind result set rowid statement of query error:%d", errCode);
1866 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1867 return errCode;
1868 }
1869 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1870 if (errCode != E_OK) {
1871 LOGE("[SqlSinExe][InitResSetContent] Get result set entry statement of query error:%d", errCode);
1872 return CheckCorruptedStatus(errCode);
1873 }
1874 return errCode;
1875 }
1876
InitResultSet(QueryObject & queryObj,sqlite3_stmt * & countStmt)1877 int SQLiteSingleVerStorageExecutor::InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1878 {
1879 if (dbHandle_ == nullptr) {
1880 return -E_INVALID_DB;
1881 }
1882
1883 int errCode = E_OK;
1884 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1885 if (errCode != E_OK) {
1886 return errCode;
1887 }
1888
1889 if (!queryObj.IsValid()) {
1890 return -E_INVALID_QUERY_FORMAT;
1891 }
1892
1893 errCode = InitResultSetCount(queryObj, countStmt);
1894 if (errCode != E_OK) {
1895 return CheckCorruptedStatus(errCode);
1896 }
1897
1898 errCode = InitResultSetContent(queryObj);
1899 if (errCode != E_OK) {
1900 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1901 }
1902 return CheckCorruptedStatus(errCode);
1903 }
1904
UpdateLocalDataTimestamp(Timestamp timestamp)1905 int SQLiteSingleVerStorageExecutor::UpdateLocalDataTimestamp(Timestamp timestamp)
1906 {
1907 const std::string updateSql = "UPDATE local_data SET timestamp=";
1908 std::string sql = updateSql + std::to_string(timestamp) + " WHERE timestamp=0;";
1909 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1910 return CheckCorruptedStatus(errCode);
1911 }
1912
SetAttachMetaMode(bool attachMetaMode)1913 void SQLiteSingleVerStorageExecutor::SetAttachMetaMode(bool attachMetaMode)
1914 {
1915 attachMetaMode_ = attachMetaMode;
1916 }
1917
GetOneRawDataItem(sqlite3_stmt * statement,DataItem & dataItem,uint64_t & verInCurCacheDb,bool isCacheDb) const1918 int SQLiteSingleVerStorageExecutor::GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
1919 uint64_t &verInCurCacheDb, bool isCacheDb) const
1920 {
1921 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, dataItem.key);
1922 if (errCode != E_OK) {
1923 return errCode;
1924 }
1925
1926 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
1927 if (errCode != E_OK) {
1928 return errCode;
1929 }
1930
1931 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1932 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1933
1934 std::vector<uint8_t> devVect;
1935 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1936 if (errCode != E_OK) {
1937 return errCode;
1938 }
1939 dataItem.dev = std::string(devVect.begin(), devVect.end());
1940
1941 devVect.clear();
1942 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
1943 if (errCode != E_OK) {
1944 return errCode;
1945 }
1946 dataItem.origDev = std::string(devVect.begin(), devVect.end());
1947
1948 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_HASH_KEY_INDEX, dataItem.hashKey);
1949 if (errCode != E_OK) {
1950 return errCode;
1951 }
1952 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1953 if (errCode != E_OK) {
1954 return errCode;
1955 }
1956 if (isCacheDb) {
1957 verInCurCacheDb = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_VERSION_INDEX));
1958 }
1959 return E_OK;
1960 }
1961
GetAllDataItems(sqlite3_stmt * statement,std::vector<DataItem> & dataItems,uint64_t & verInCurCacheDb,bool isCacheDb) const1962 int SQLiteSingleVerStorageExecutor::GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
1963 uint64_t &verInCurCacheDb, bool isCacheDb) const
1964 {
1965 dataItems.clear();
1966 dataItems.shrink_to_fit();
1967 DataItem dataItem;
1968 int errCode;
1969 do {
1970 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1971 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1972 errCode = GetOneRawDataItem(statement, dataItem, verInCurCacheDb, isCacheDb);
1973 if (errCode != E_OK) {
1974 return errCode;
1975 }
1976 dataItems.push_back(std::move(dataItem));
1977 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1978 errCode = E_OK;
1979 break;
1980 } else {
1981 LOGE("SQLite step failed:%d", errCode);
1982 break;
1983 }
1984 } while (true);
1985
1986 return CheckCorruptedStatus(errCode);
1987 }
1988
OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)1989 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache,
1990 uint32_t cacheLimit, int &count)
1991 {
1992 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1993 if (errCode != E_OK) {
1994 LOGE("[SqlSinExe][OpenResSetRowId][Common] Get entry stmt fail, errCode=%d", errCode);
1995 return CheckCorruptedStatus(errCode);
1996 }
1997 errCode = StartTransaction(TransactType::DEFERRED);
1998 if (errCode != E_OK) {
1999 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2000 return CheckCorruptedStatus(errCode);
2001 }
2002 // Now Ready To Execute
2003 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, 0, count);
2004 if (errCode != E_OK) {
2005 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2006 Rollback();
2007 return CheckCorruptedStatus(errCode);
2008 }
2009 // Consider finalize getResultRowIdStatement_ here if count equal to size of rowIdCache.
2010 return E_OK;
2011 }
2012
ResultSetLoadRowIdCache(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos,int & count)2013 int SQLiteSingleVerStorageExecutor::ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
2014 uint32_t cacheStartPos, int &count)
2015 {
2016 rowIdCache.clear();
2017 count = 0;
2018 while (true) {
2019 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
2020 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2021 if (count >= static_cast<int>(cacheStartPos) && rowIdCache.size() < cacheLimit) {
2022 // If we can start cache, and, if we can still cache
2023 int64_t rowid = sqlite3_column_int64(getResultRowIdStatement_, 0);
2024 rowIdCache.push_back(rowid);
2025 }
2026 // Always increase the count
2027 count++;
2028 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2029 break;
2030 } else {
2031 LOGE("[SqlSinExe][ResSetLoadCache] Step fail, errCode=%d", errCode);
2032 rowIdCache.clear();
2033 count = 0;
2034 return CheckCorruptedStatus(errCode);
2035 }
2036 }
2037 return E_OK;
2038 }
2039
ResetStatement()2040 int SQLiteSingleVerStorageExecutor::SaveRecordStatements::ResetStatement()
2041 {
2042 int errCode = E_OK;
2043 SQLiteUtils::ResetStatement(insertStatement, true, errCode);
2044 if (errCode != E_OK) {
2045 LOGE("Finalize insert statements failed, error: %d", errCode);
2046 }
2047
2048 SQLiteUtils::ResetStatement(updateStatement, true, errCode);
2049 if (errCode != E_OK) {
2050 LOGE("Finalize update statements failed, error: %d", errCode);
2051 }
2052
2053 SQLiteUtils::ResetStatement(queryStatement, true, errCode);
2054 if (errCode != E_OK) {
2055 LOGE("Finalize query statement failed, error: %d", errCode);
2056 }
2057 return errCode;
2058 }
2059
FinalizeAllStatements()2060 void SQLiteSingleVerStorageExecutor::FinalizeAllStatements()
2061 {
2062 int errCode = saveLocalStatements_.ResetStatement();
2063 if (errCode != E_OK) {
2064 LOGE("Finalize saveLocal statements failed, error: %d", errCode);
2065 }
2066
2067 errCode = saveSyncStatements_.ResetStatement();
2068 if (errCode != E_OK) {
2069 LOGE("Finalize saveSync statement failed, error: %d", errCode);
2070 }
2071
2072 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
2073 if (errCode != E_OK) {
2074 LOGE("Finalize getResultRowIdStatement_ failed, error: %d", errCode);
2075 }
2076
2077 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2078 if (errCode != E_OK) {
2079 LOGE("Finalize getResultEntryStatement_ failed, error: %d", errCode);
2080 }
2081
2082 errCode = migrateSyncStatements_.ResetStatement();
2083 if (errCode != E_OK) {
2084 LOGE("Finalize migrateSync statements failed, error: %d", errCode);
2085 }
2086
2087 ReleaseContinueStatement();
2088 }
2089
SetConflictResolvePolicy(int policy)2090 void SQLiteSingleVerStorageExecutor::SetConflictResolvePolicy(int policy)
2091 {
2092 if (policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA || policy == DEFAULT_LAST_WIN) {
2093 conflictResolvePolicy_ = policy;
2094 }
2095 }
2096
CheckIntegrity() const2097 int SQLiteSingleVerStorageExecutor::CheckIntegrity() const
2098 {
2099 if (dbHandle_ == nullptr) {
2100 return -E_INVALID_DB;
2101 }
2102
2103 return SQLiteUtils::CheckIntegrity(dbHandle_, CHECK_DB_INTEGRITY_SQL);
2104 }
2105
ForceCheckPoint() const2106 int SQLiteSingleVerStorageExecutor::ForceCheckPoint() const
2107 {
2108 if (dbHandle_ == nullptr) {
2109 return -E_INVALID_DB;
2110 }
2111 SQLiteUtils::ExecuteCheckPoint(dbHandle_);
2112 return E_OK;
2113 }
2114
GetLogFileSize() const2115 uint64_t SQLiteSingleVerStorageExecutor::GetLogFileSize() const
2116 {
2117 if (isMemDb_) {
2118 return 0;
2119 }
2120
2121 const char *fileName = sqlite3_db_filename(dbHandle_, "main");
2122 if (fileName == nullptr) {
2123 return 0;
2124 }
2125 std::string walName = std::string(fileName) + "-wal";
2126 uint64_t fileSize = 0;
2127 int result = OS::CalFileSize(std::string(walName), fileSize);
2128 if (result != E_OK) {
2129 return 0;
2130 }
2131 return fileSize;
2132 }
2133
GetExistsDevicesFromMeta(std::set<std::string> & devices)2134 int SQLiteSingleVerStorageExecutor::GetExistsDevicesFromMeta(std::set<std::string> &devices)
2135 {
2136 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_,
2137 attachMetaMode_ ? SqliteMetaExecutor::MetaMode::KV_ATTACH : SqliteMetaExecutor::MetaMode::KV,
2138 isMemDb_, devices);
2139 }
2140
UpdateKey(const UpdateKeyCallback & callback)2141 int SQLiteSingleVerStorageExecutor::UpdateKey(const UpdateKeyCallback &callback)
2142 {
2143 if (dbHandle_ == nullptr) {
2144 return -E_INVALID_DB;
2145 }
2146 UpdateContext context;
2147 context.callback = callback;
2148 int errCode = CreateFuncUpdateKey(context, &Translate, &CalHashKey);
2149 if (errCode != E_OK) {
2150 return errCode;
2151 }
2152 int executeErrCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, UPDATE_SYNC_DATA_KEY_SQL);
2153 context.callback = nullptr;
2154 errCode = CreateFuncUpdateKey(context, nullptr, nullptr);
2155 if (context.errCode != E_OK) {
2156 return context.errCode;
2157 }
2158 if (executeErrCode != E_OK) {
2159 return executeErrCode;
2160 }
2161 if (errCode != E_OK) {
2162 return errCode;
2163 }
2164 return E_OK;
2165 }
2166
CreateFuncUpdateKey(UpdateContext & context,void (* translateFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv),void (* calHashFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const2167 int SQLiteSingleVerStorageExecutor::CreateFuncUpdateKey(UpdateContext &context,
2168 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
2169 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
2170 {
2171 int errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_TRANSLATE_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2172 &context, translateFunc, nullptr, nullptr, nullptr);
2173 if (errCode != SQLITE_OK) {
2174 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2175 return SQLiteUtils::MapSQLiteErrno(errCode);
2176 }
2177 errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_CAL_HASH_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2178 &context, calHashFunc, nullptr, nullptr, nullptr);
2179 if (errCode != SQLITE_OK) {
2180 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2181 return SQLiteUtils::MapSQLiteErrno(errCode);
2182 }
2183 return E_OK;
2184 }
2185
Translate(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2186 void SQLiteSingleVerStorageExecutor::Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2187 {
2188 if (ctx == nullptr || argc != 1 || argv == nullptr) { // i parameters, which are key
2189 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2190 return;
2191 }
2192 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2193 auto keyBlob = static_cast<const uint8_t *>(sqlite3_value_blob(argv[0]));
2194 int keyBlobLen = sqlite3_value_bytes(argv[0]);
2195 Key oldKey;
2196 if (keyBlob != nullptr && keyBlobLen > 0) {
2197 oldKey = Key(keyBlob, keyBlob + keyBlobLen);
2198 }
2199 Key newKey;
2200 context->callback(oldKey, newKey);
2201 if (newKey.size() >= DBConstant::MAX_KEY_SIZE || newKey.empty()) {
2202 LOGE("[SqlSinExe][Translate] invalid key len=%zu", newKey.size());
2203 context->errCode = -E_INVALID_ARGS;
2204 sqlite3_result_error(ctx, "Update key is invalid", -1);
2205 return;
2206 }
2207 context->newKey = newKey;
2208 sqlite3_result_blob(ctx, newKey.data(), static_cast<int>(newKey.size()), SQLITE_TRANSIENT);
2209 }
2210
CalHashKey(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2211 void SQLiteSingleVerStorageExecutor::CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2212 {
2213 if (ctx == nullptr || argc != 1 || argv == nullptr) {
2214 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2215 return;
2216 }
2217 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2218 Key hashKey;
2219 DBCommon::CalcValueHash(context->newKey, hashKey);
2220 sqlite3_result_blob(ctx, hashKey.data(), static_cast<int>(hashKey.size()), SQLITE_TRANSIENT);
2221 }
2222
BindSyncDataTime(sqlite3_stmt * statement,const DataItem & dataItem,bool isUpdate)2223 int SQLiteSingleVerStorageExecutor::BindSyncDataTime(sqlite3_stmt *statement, const DataItem &dataItem, bool isUpdate)
2224 {
2225 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_STAMP_INDEX, dataItem.timestamp);
2226 if (errCode != E_OK) {
2227 LOGE("Bind saved sync data stamp failed:%d", errCode);
2228 return errCode;
2229 }
2230
2231 const int writeTimeIndex = isUpdate ? BIND_SYNC_UPDATE_W_TIME_INDEX : BIND_SYNC_W_TIME_INDEX;
2232 errCode = SQLiteUtils::BindInt64ToStatement(statement, writeTimeIndex, dataItem.writeTimestamp);
2233 if (errCode != E_OK) {
2234 LOGE("Bind saved sync data write stamp failed:%d", errCode);
2235 return errCode;
2236 }
2237
2238 const int modifyTimeIndex = isUpdate ? BIND_SYNC_UPDATE_MODIFY_TIME_INDEX : BIND_SYNC_MODIFY_TIME_INDEX;
2239 errCode = SQLiteUtils::BindInt64ToStatement(statement, modifyTimeIndex, dataItem.modifyTime);
2240 if (errCode != E_OK) {
2241 LOGE("Bind saved sync data modify time failed:%d", errCode);
2242 return errCode;
2243 }
2244
2245 const int createTimeIndex = isUpdate ? BIND_SYNC_UPDATE_CREATE_TIME_INDEX : BIND_SYNC_CREATE_TIME_INDEX;
2246 errCode = SQLiteUtils::BindInt64ToStatement(statement, createTimeIndex, dataItem.createTime);
2247 if (errCode != E_OK) {
2248 LOGE("Bind saved sync data create time failed:%d", errCode);
2249 return errCode;
2250 }
2251
2252 LOGI("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 " modifyTime:%" PRIu64 " createTime:%"
2253 PRIu64, dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, dataItem.modifyTime, dataItem.createTime);
2254 return errCode;
2255 }
2256
CreateCloudLogTable()2257 int SQLiteSingleVerStorageExecutor::CreateCloudLogTable()
2258 {
2259 if (dbHandle_ == nullptr) {
2260 return -E_INVALID_DB;
2261 }
2262 return SqliteLogTableManager::CreateKvSyncLogTable(dbHandle_);
2263 }
2264 } // namespace DistributedDB
2265