1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "cloud/cloud_store_types.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "log_print.h"
25 #include "log_table_manager_factory.h"
26 #include "parcel.h"
27 #include "platform_specific.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_single_ver_storage_executor_sql.h"
31
32 namespace DistributedDB {
33 namespace {
34
ResetOrRegetStmt(sqlite3 * db,sqlite3_stmt * & stmt,const std::string & sql)35 int ResetOrRegetStmt(sqlite3 *db, sqlite3_stmt *&stmt, const std::string &sql)
36 {
37 int errCode = E_OK;
38 SQLiteUtils::ResetStatement(stmt, false, errCode);
39 if (errCode != E_OK) {
40 LOGE("[ResetOrRegetStmt] reset stmt failed:%d.", errCode);
41 // Finish current statement and remade one
42 SQLiteUtils::ResetStatement(stmt, true, errCode);
43 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
44 if (errCode != E_OK) {
45 LOGE("[ResetOrRegetStmt] reget failed:%d.", errCode);
46 }
47 }
48 return errCode;
49 }
50
GetEntryFromStatement(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries)51 int GetEntryFromStatement(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries)
52 {
53 Entry entry;
54 int errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key);
55 if (errCode != E_OK) {
56 return errCode;
57 }
58 if (isGetValue) {
59 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value);
60 if (errCode != E_OK) {
61 return errCode;
62 }
63 }
64
65 entries.push_back(std::move(entry));
66 return errCode;
67 }
68 }
69
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb)70 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb)
71 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
72 getSyncStatement_(nullptr),
73 getResultRowIdStatement_(nullptr),
74 getResultEntryStatement_(nullptr),
75 isTransactionOpen_(false),
76 attachMetaMode_(false),
77 executorState_(ExecutorState::INVALID),
78 maxTimestampInMainDB_(0),
79 migrateTimeOffset_(0),
80 isSyncMigrating_(false),
81 conflictResolvePolicy_(DEFAULT_LAST_WIN)
82 {}
83
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb,ExecutorState executorState)84 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb,
85 ExecutorState executorState)
86 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
87 getSyncStatement_(nullptr),
88 getResultRowIdStatement_(nullptr),
89 getResultEntryStatement_(nullptr),
90 isTransactionOpen_(false),
91 attachMetaMode_(false),
92 executorState_(executorState),
93 maxTimestampInMainDB_(0),
94 migrateTimeOffset_(0),
95 isSyncMigrating_(false),
96 conflictResolvePolicy_(DEFAULT_LAST_WIN)
97 {}
98
~SQLiteSingleVerStorageExecutor()99 SQLiteSingleVerStorageExecutor::~SQLiteSingleVerStorageExecutor()
100 {
101 if (isTransactionOpen_) {
102 Rollback();
103 }
104 FinalizeAllStatements();
105 }
106
GetKvData(SingleVerDataType type,const Key & key,Value & value,Timestamp & timestamp) const107 int SQLiteSingleVerStorageExecutor::GetKvData(SingleVerDataType type, const Key &key, Value &value,
108 Timestamp ×tamp) const
109 {
110 std::string sql;
111 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
112 sql = SELECT_LOCAL_VALUE_TIMESTAMP_SQL;
113 } else if (type == SingleVerDataType::SYNC_TYPE) {
114 sql = SELECT_SYNC_VALUE_WTIMESTAMP_SQL;
115 } else if (type == SingleVerDataType::META_TYPE) {
116 if (attachMetaMode_) {
117 sql = SELECT_ATTACH_META_VALUE_SQL;
118 } else {
119 sql = SELECT_META_VALUE_SQL;
120 }
121 } else {
122 return -E_INVALID_ARGS;
123 }
124
125 sqlite3_stmt *statement = nullptr;
126 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
127 if (errCode != E_OK) {
128 goto END;
129 }
130
131 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
132 if (errCode != E_OK) {
133 goto END;
134 }
135
136 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
137 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
138 errCode = -E_NOT_FOUND;
139 goto END;
140 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
141 goto END;
142 }
143
144 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
145
146 // get timestamp
147 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
148 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_LOCAL_TIME_INDEX));
149 } else if (type == SingleVerDataType::SYNC_TYPE) {
150 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_SYNC_TIME_INDEX));
151 }
152
153 END:
154 SQLiteUtils::ResetStatement(statement, true, errCode);
155 return CheckCorruptedStatus(errCode);
156 }
157
BindPutKvData(sqlite3_stmt * statement,const Key & key,const Value & value,Timestamp timestamp,SingleVerDataType type)158 int SQLiteSingleVerStorageExecutor::BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value,
159 Timestamp timestamp, SingleVerDataType type)
160 {
161 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, key, false);
162 if (errCode != E_OK) {
163 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
164 return errCode;
165 }
166
167 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_VAL_INDEX, value, true);
168 if (errCode != E_OK) {
169 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
170 return errCode;
171 }
172
173 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
174 Key hashKey;
175 errCode = DBCommon::CalcValueHash(key, hashKey);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179
180 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_LOCAL_HASH_KEY_INDEX, hashKey, false);
181 if (errCode != E_OK) {
182 LOGE("[SingleVerExe][BindPutKv]Bind hash key error:%d", errCode);
183 return errCode;
184 }
185
186 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_LOCAL_TIMESTAMP_INDEX, timestamp);
187 if (errCode != E_OK) {
188 LOGE("[SingleVerExe][BindPutKv]Bind timestamp error:%d", errCode);
189 return errCode;
190 }
191 }
192 return E_OK;
193 }
194
GetKvDataByHashKey(const Key & hashKey,SingleVerRecord & result) const195 int SQLiteSingleVerStorageExecutor::GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const
196 {
197 sqlite3_stmt *statement = nullptr;
198 std::vector<uint8_t> devVect;
199 std::vector<uint8_t> origDevVect;
200 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_HASH_SQL, statement);
201 if (errCode != E_OK) {
202 goto END;
203 }
204
205 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // bind the first arg hashkey.
206 if (errCode != E_OK) {
207 goto END;
208 }
209
210 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
211 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
212 result.hashKey = hashKey;
213 result.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
214 result.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
215 result.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
216 // get key
217 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, result.key);
218 if (errCode != E_OK) {
219 goto END;
220 }
221 // get value
222 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, result.value);
223 if (errCode != E_OK) {
224 goto END;
225 }
226 // get device
227 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
228 if (errCode != E_OK) {
229 goto END;
230 }
231 result.device = std::string(devVect.begin(), devVect.end());
232 // get original device
233 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
234 if (errCode != E_OK) {
235 goto END;
236 }
237 result.origDevice = std::string(origDevVect.begin(), origDevVect.end());
238 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
239 errCode = -E_NOT_FOUND;
240 goto END;
241 }
242
243 END:
244 SQLiteUtils::ResetStatement(statement, true, errCode);
245 return CheckCorruptedStatus(errCode);
246 }
247
SaveKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp)248 int SQLiteSingleVerStorageExecutor::SaveKvData(SingleVerDataType type, const Key &key, const Value &value,
249 Timestamp timestamp)
250 {
251 sqlite3_stmt *statement = nullptr;
252 std::string sql;
253 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
254 sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ? INSERT_LOCAL_SQL_FROM_CACHEHANDLE :
255 INSERT_LOCAL_SQL);
256 } else {
257 sql = (attachMetaMode_ ? INSERT_ATTACH_META_SQL : INSERT_META_SQL);
258 }
259 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
260 if (errCode != E_OK) {
261 goto ERROR;
262 }
263
264 errCode = BindPutKvData(statement, key, value, timestamp, type);
265 if (errCode != E_OK) {
266 goto ERROR;
267 }
268
269 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
270 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
271 errCode = E_OK;
272 }
273
274 ERROR:
275 SQLiteUtils::ResetStatement(statement, true, errCode);
276 return CheckCorruptedStatus(errCode);
277 }
278
PutKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp,SingleVerNaturalStoreCommitNotifyData * committedData)279 int SQLiteSingleVerStorageExecutor::PutKvData(SingleVerDataType type, const Key &key, const Value &value,
280 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData)
281 {
282 if (type != SingleVerDataType::LOCAL_TYPE_SQLITE && type != SingleVerDataType::META_TYPE) {
283 return -E_INVALID_ARGS;
284 }
285 // committedData is only for local data, not for meta data.
286 bool isLocal = (SingleVerDataType::LOCAL_TYPE_SQLITE == type);
287 Timestamp localTimestamp = 0;
288 Value readValue;
289 bool isExisted = CheckIfKeyExisted(key, isLocal, readValue, localTimestamp);
290 if (isLocal && committedData != nullptr) {
291 ExistStatus existedStatus = isExisted ? ExistStatus::EXIST : ExistStatus::NONE;
292 Key hashKey;
293 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
294 if (innerErrCode != E_OK) {
295 return innerErrCode;
296 }
297 committedData->InitKeyPropRecord(hashKey, existedStatus);
298 }
299 int errCode = SaveKvData(type, key, value, timestamp);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303
304 if (isLocal && committedData != nullptr) {
305 Entry entry = {key, value};
306 committedData->InsertCommittedData(std::move(entry), isExisted ? DataType::UPDATE : DataType::INSERT, true);
307 }
308 return E_OK;
309 }
310
GetEntries(bool isGetValue,SingleVerDataType type,const Key & keyPrefix,std::vector<Entry> & entries) const311 int SQLiteSingleVerStorageExecutor::GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix,
312 std::vector<Entry> &entries) const
313 {
314 if ((type != SingleVerDataType::LOCAL_TYPE_SQLITE) && (type != SingleVerDataType::SYNC_TYPE)) {
315 return -E_INVALID_ARGS;
316 }
317
318 std::string sql;
319 if (type == SingleVerDataType::SYNC_TYPE) {
320 sql = isGetValue ? SELECT_SYNC_PREFIX_SQL : SELECT_SYNC_KEY_PREFIX_SQL;
321 } else {
322 sql = SELECT_LOCAL_PREFIX_SQL;
323 }
324 sqlite3_stmt *statement = nullptr;
325 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
326 if (errCode != E_OK) {
327 goto END;
328 }
329
330 // bind the prefix key for the first and second args.
331 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // first argument is key
332 if (errCode != E_OK) {
333 goto END;
334 }
335
336 errCode = StepForResultEntries(isGetValue, statement, entries);
337
338 END:
339 SQLiteUtils::ResetStatement(statement, true, errCode);
340 return CheckCorruptedStatus(errCode);
341 }
342
GetEntries(QueryObject & queryObj,std::vector<Entry> & entries) const343 int SQLiteSingleVerStorageExecutor::GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const
344 {
345 int errCode = E_OK;
346 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
347 if (errCode != E_OK) {
348 return errCode;
349 }
350
351 sqlite3_stmt *statement = nullptr;
352 errCode = helper.GetQuerySqlStatement(dbHandle_, false, statement);
353 if (errCode == E_OK) {
354 errCode = StepForResultEntries(true, statement, entries);
355 }
356
357 SQLiteUtils::ResetStatement(statement, true, errCode);
358 return CheckCorruptedStatus(errCode);
359 }
360
GetCount(QueryObject & queryObj,int & count) const361 int SQLiteSingleVerStorageExecutor::GetCount(QueryObject &queryObj, int &count) const
362 {
363 if (dbHandle_ == nullptr) {
364 return -E_INVALID_DB;
365 }
366
367 int errCode = E_OK;
368 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
369 if (errCode != E_OK) {
370 return errCode;
371 }
372
373 if (!queryObj.IsCountValid()) {
374 LOGE("GetCount no need limit or orderby");
375 return -E_INVALID_QUERY_FORMAT;
376 }
377
378 std::string countSql;
379 errCode = helper.GetCountQuerySql(countSql);
380 if (errCode != E_OK) {
381 return errCode;
382 }
383
384 sqlite3_stmt *countStatement = nullptr;
385 // get statement for count
386 errCode = helper.GetQuerySqlStatement(dbHandle_, countSql, countStatement);
387 if (errCode != E_OK) {
388 LOGE("Get count bind statement error:%d", errCode);
389 goto END;
390 }
391 // get count value
392 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
393 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
394 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
395 if (readCount > INT32_MAX) {
396 LOGW("total count is beyond the max count");
397 count = 0;
398 errCode = -E_UNEXPECTED_DATA;
399 } else {
400 count = static_cast<int>(readCount);
401 errCode = E_OK;
402 }
403 LOGD("Entry count in this result set is %d", count);
404 } else {
405 errCode = -E_UNEXPECTED_DATA;
406 }
407
408 END:
409 SQLiteUtils::ResetStatement(countStatement, true, errCode);
410 return CheckCorruptedStatus(errCode);
411 }
412
InitCurrentMaxStamp(Timestamp & maxStamp)413 void SQLiteSingleVerStorageExecutor::InitCurrentMaxStamp(Timestamp &maxStamp)
414 {
415 if (dbHandle_ == nullptr) {
416 return;
417 }
418 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
419 SELECT_MAX_TIMESTAMP_SQL_FROM_CACHEHANDLE : SELECT_MAX_TIMESTAMP_SQL);
420 sqlite3_stmt *statement = nullptr;
421 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
422 if (errCode != E_OK) {
423 return;
424 }
425
426 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
427 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
428 maxStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
429 }
430 SQLiteUtils::ResetStatement(statement, true, errCode);
431 }
432
PrepareForSyncDataByTime(Timestamp begin,Timestamp end,sqlite3_stmt * & statement,bool getDeletedData) const433 int SQLiteSingleVerStorageExecutor::PrepareForSyncDataByTime(Timestamp begin, Timestamp end,
434 sqlite3_stmt *&statement, bool getDeletedData) const
435 {
436 if (dbHandle_ == nullptr) {
437 return -E_INVALID_DB;
438 }
439
440 const std::string sql = (getDeletedData ? SELECT_SYNC_DELETED_ENTRIES_SQL : SELECT_SYNC_ENTRIES_SQL);
441 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
442 if (errCode != E_OK) {
443 LOGE("Prepare the sync entries statement error:%d", errCode);
444 return errCode;
445 }
446
447 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_BEGIN_STAMP_INDEX, begin);
448 if (errCode != E_OK) {
449 goto ERROR;
450 }
451
452 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_END_STAMP_INDEX, end);
453
454 ERROR:
455 if (errCode != E_OK) {
456 LOGE("Bind the timestamp for getting sync data error:%d", errCode);
457 SQLiteUtils::ResetStatement(statement, true, errCode);
458 }
459
460 return CheckCorruptedStatus(errCode);
461 }
462
ReleaseContinueStatement()463 void SQLiteSingleVerStorageExecutor::ReleaseContinueStatement()
464 {
465 if (getSyncStatement_ != nullptr) {
466 int errCode = E_OK;
467 SQLiteUtils::ResetStatement(getSyncStatement_, true, errCode);
468 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
469 SetCorruptedStatus();
470 }
471 }
472 }
473
474 namespace {
GetDataItemForSync(sqlite3_stmt * statement,DataItem & dataItem)475 int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem)
476 {
477 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
478 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
479 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
480 dataItem.flag &= (~DataItem::LOCAL_FLAG);
481 std::vector<uint8_t> devVect;
482 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
483 if (errCode != E_OK) {
484 return errCode;
485 }
486 dataItem.origDev = std::string(devVect.begin(), devVect.end());
487 int keyIndex = SYNC_RES_KEY_INDEX;
488 // If the data has been deleted, just use the hash key for sync.
489 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
490 keyIndex = SYNC_RES_HASH_KEY_INDEX;
491 }
492 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, dataItem.key);
493 if (errCode != E_OK) {
494 return errCode;
495 }
496 return SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
497 }
498 }
499
GetSyncDataItems(std::vector<DataItem> & dataItems,sqlite3_stmt * statement,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo) const500 int SQLiteSingleVerStorageExecutor::GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
501 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const
502 {
503 int errCode;
504 size_t dataTotalSize = 0;
505 do {
506 DataItem dataItem;
507 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
508 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
509 errCode = GetDataItemForSync(statement, dataItem);
510 if (errCode != E_OK) {
511 LOGE("GetDataItemForSync failed:%d", errCode);
512 return errCode;
513 }
514 } else {
515 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
516 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
517 errCode = -E_FINISHED;
518 } else {
519 LOGE("Get sync data error:%d", errCode);
520 }
521 break;
522 }
523
524 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
525 dataTotalSize += GetDataItemSerialSize(dataItem, appendLength);
526 if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) ||
527 dataItems.size() >= dataSizeInfo.packetSize) {
528 errCode = -E_UNFINISHED;
529 break;
530 } else {
531 dataItems.push_back(std::move(dataItem));
532 }
533 } while (true);
534 return errCode;
535 }
536
GetSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const537 int SQLiteSingleVerStorageExecutor::GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
538 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
539 {
540 sqlite3_stmt *statement = nullptr;
541 int errCode = PrepareForSyncDataByTime(begin, end, statement);
542 if (errCode != E_OK) {
543 return errCode;
544 }
545
546 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
547 SQLiteUtils::ResetStatement(statement, true, errCode);
548 return CheckCorruptedStatus(errCode);
549 }
550
GetDeletedSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const551 int SQLiteSingleVerStorageExecutor::GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
552 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
553 {
554 sqlite3_stmt *statement = nullptr;
555 int errCode = PrepareForSyncDataByTime(begin, end, statement, true);
556 if (errCode != E_OK) {
557 return errCode;
558 }
559
560 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
561 SQLiteUtils::ResetStatement(statement, true, errCode);
562 return CheckCorruptedStatus(errCode);
563 }
564
565 namespace {
AppendDataItem(std::vector<DataItem> & dataItems,const DataItem & item,size_t & dataTotalSize,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo)566 int AppendDataItem(std::vector<DataItem> &dataItems, const DataItem &item, size_t &dataTotalSize, size_t appendLength,
567 const DataSizeSpecInfo &dataSizeInfo)
568 {
569 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
570 size_t appendSize = dataTotalSize + SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength);
571 if ((appendSize > dataSizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= dataSizeInfo.packetSize) {
572 return -E_UNFINISHED;
573 }
574 dataItems.push_back(item);
575 dataTotalSize = appendSize;
576 return E_OK;
577 }
578
GetFullDataStatement(sqlite3 * db,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)579 int GetFullDataStatement(sqlite3 *db, const std::pair<Timestamp, Timestamp> &timeRange, sqlite3_stmt *&stmt)
580 {
581 int errCode = SQLiteUtils::GetStatement(db, SELECT_SYNC_MODIFY_SQL, stmt);
582 if (errCode != E_OK) {
583 LOGE("Get statement failed. %d", errCode);
584 return errCode;
585 }
586 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange.first); // 1 : Bind time rang index start
587 if (errCode != E_OK) {
588 LOGE("Bind time range to statement failed. %d", errCode);
589 goto ERR;
590 }
591 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange.second); // 2 : Bind time rang index end
592 if (errCode != E_OK) {
593 LOGE("Bind time range to statement failed. %d", errCode);
594 goto ERR;
595 }
596 return E_OK; // do not release statement when success
597 ERR:
598 SQLiteUtils::ResetStatement(stmt, true, errCode);
599 return errCode;
600 }
601
GetQueryDataStatement(sqlite3 * db,QueryObject query,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)602 int GetQueryDataStatement(sqlite3 *db, QueryObject query, const std::pair<Timestamp, Timestamp> &timeRange,
603 sqlite3_stmt *&stmt)
604 {
605 int errCode = E_OK;
606 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
607 if (errCode != E_OK) {
608 return errCode;
609 }
610 return helper.GetQuerySyncStatement(db, timeRange.first, timeRange.second, stmt);
611 }
612
GetNextDataItem(sqlite3_stmt * stmt,bool isMemDB,DataItem & item)613 int GetNextDataItem(sqlite3_stmt *stmt, bool isMemDB, DataItem &item)
614 {
615 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
616 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
617 errCode = GetDataItemForSync(stmt, item);
618 }
619 return errCode;
620 }
621 }
622
GetSyncDataWithQuery(const QueryObject & query,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,const std::pair<Timestamp,Timestamp> & timeRange,std::vector<DataItem> & dataItems) const623 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(const QueryObject &query, size_t appendLength,
624 const DataSizeSpecInfo &dataSizeInfo, const std::pair<Timestamp, Timestamp> &timeRange,
625 std::vector<DataItem> &dataItems) const
626 {
627 sqlite3_stmt *fullStmt = nullptr; // statement for get all modified data in the time range
628 sqlite3_stmt *queryStmt = nullptr; // statement for get modified data which is matched query in the time range
629 int errCode = GetQueryDataStatement(dbHandle_, query, timeRange, queryStmt);
630 if (errCode != E_OK) {
631 LOGE("Get query matched data statement failed. %d", errCode);
632 goto END;
633 }
634 if (query.IsQueryOnlyByKey()) {
635 // Query sync by prefixKey only should not deal with REMOTE_DEVICE_DATA_MISS_QUERY. Get the data directly.
636 errCode = GetSyncDataItems(dataItems, queryStmt, appendLength, dataSizeInfo);
637 goto END;
638 }
639 errCode = GetFullDataStatement(dbHandle_, timeRange, fullStmt);
640 if (errCode != E_OK) {
641 LOGE("Get full changed data statement failed. %d", errCode);
642 goto END;
643 }
644 errCode = GetSyncDataWithQuery(fullStmt, queryStmt, appendLength, dataSizeInfo, dataItems);
645 if (errCode != E_OK && errCode != -E_UNFINISHED && errCode != -E_FINISHED) {
646 LOGE("Get sync data with query failed. %d", errCode);
647 }
648 END:
649 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
650 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
651 return CheckCorruptedStatus(errCode);
652 }
653
GetSyncDataWithQuery(sqlite3_stmt * fullStmt,sqlite3_stmt * queryStmt,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,std::vector<DataItem> & dataItems) const654 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
655 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const
656 {
657 int errCode = E_OK;
658 size_t dataTotalSize = 0;
659 DataItem fullItem;
660 DataItem matchItem;
661 bool isFullItemFinished = false;
662 bool isMatchItemFinished = false;
663 while (!isFullItemFinished || !isMatchItemFinished) {
664 errCode = GetNextDataItem(queryStmt, isMemDb_, matchItem);
665 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // query finished
666 isMatchItemFinished = true;
667 } else if (errCode != E_OK) { // step failed or get data failed
668 LOGE("Get next query matched data failed. %d", errCode);
669 return errCode;
670 }
671 while (!isFullItemFinished) {
672 errCode = GetNextDataItem(fullStmt, isMemDb_, fullItem);
673 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // queryStmt is a subset of fullStmt
674 isFullItemFinished = true;
675 break;
676 } else if (errCode != E_OK) { // step failed or get data failed
677 LOGE("Get next changed data failed. %d", errCode);
678 return errCode;
679 }
680 bool matchData = true;
681 if (isMatchItemFinished || matchItem.key != fullItem.key) {
682 matchData = false; // got miss query data
683 DBCommon::CalcValueHash(fullItem.key, fullItem.key); // set and send key with hash_key
684 Value().swap(fullItem.value); // not send value when data miss query
685 fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; // mark with miss query flag
686 }
687 errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo);
688 if (errCode == -E_UNFINISHED) {
689 goto END;
690 }
691 if (matchData) {
692 break; // step to next match data
693 }
694 }
695 }
696 END:
697 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
698 return (isFullItemFinished && isMatchItemFinished) ? -E_FINISHED : errCode;
699 }
700
OpenResultSet(const Key & keyPrefix,int & count)701 int SQLiteSingleVerStorageExecutor::OpenResultSet(const Key &keyPrefix, int &count)
702 {
703 sqlite3_stmt *countStatement = nullptr;
704 if (InitResultSet(keyPrefix, countStatement) != E_OK) {
705 LOGE("Initialize result set stat failed.");
706 return -E_INVALID_DB;
707 }
708
709 int errCode = StartTransaction(TransactType::DEFERRED);
710 if (errCode != E_OK) {
711 goto END;
712 }
713
714 // get count value
715 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
716 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
717 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
718 if (readCount > INT32_MAX) {
719 LOGW("total count is beyond the max count");
720 count = 0;
721 errCode = -E_UNEXPECTED_DATA;
722 } else {
723 count = static_cast<int>(readCount);
724 errCode = E_OK;
725 }
726 LOGD("Entry count in this result set is %d", count);
727 } else {
728 errCode = -E_UNEXPECTED_DATA;
729 }
730
731 END:
732 SQLiteUtils::ResetStatement(countStatement, true, errCode);
733 if (errCode != E_OK) {
734 CloseResultSet();
735 }
736 return CheckCorruptedStatus(errCode);
737 }
738
OpenResultSet(QueryObject & queryObj,int & count)739 int SQLiteSingleVerStorageExecutor::OpenResultSet(QueryObject &queryObj, int &count)
740 {
741 sqlite3_stmt *countStatement = nullptr;
742 int errCode = InitResultSet(queryObj, countStatement);
743 if (errCode != E_OK) {
744 LOGE("Initialize result set stat failed.");
745 return errCode;
746 }
747
748 errCode = StartTransaction(TransactType::DEFERRED);
749 if (errCode != E_OK) {
750 goto END;
751 }
752
753 // get count value
754 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
755 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
756 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
757 if (queryObj.HasLimit()) {
758 int limit = 0;
759 int offset = 0;
760 queryObj.GetLimitVal(limit, offset);
761 offset = (offset < 0) ? 0 : offset;
762 limit = (limit < 0) ? 0 : limit;
763 if (readCount <= static_cast<uint64_t>(offset)) {
764 readCount = 0;
765 } else {
766 readCount = std::min(readCount - offset, static_cast<uint64_t>(limit));
767 }
768 }
769
770 if (readCount > INT32_MAX) {
771 LOGW("total count is beyond the max count");
772 count = 0;
773 errCode = -E_UNEXPECTED_DATA;
774 } else {
775 count = static_cast<int>(readCount);
776 errCode = E_OK;
777 }
778 LOGD("Entry count in this result set is %d", count);
779 } else {
780 errCode = -E_UNEXPECTED_DATA;
781 }
782
783 END:
784 SQLiteUtils::ResetStatement(countStatement, true, errCode);
785 if (errCode != E_OK) {
786 CloseResultSet();
787 }
788 return CheckCorruptedStatus(errCode);
789 }
790
OpenResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)791 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(const Key &keyPrefix,
792 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
793 {
794 if (dbHandle_ == nullptr) {
795 return -E_INVALID_DB;
796 }
797 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
798 if (errCode != E_OK) {
799 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Get rowId stmt fail, errCode=%d", errCode);
800 return CheckCorruptedStatus(errCode);
801 }
802 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument index is 1
803 if (errCode != E_OK) {
804 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Bind rowid stmt fail, errCode=%d", errCode);
805 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
806 return CheckCorruptedStatus(errCode);
807 }
808 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
809 if (errCode != E_OK) {
810 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
811 }
812 return errCode;
813 }
814
OpenResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)815 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(QueryObject &queryObj,
816 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
817 {
818 if (dbHandle_ == nullptr) {
819 return -E_INVALID_DB;
820 }
821
822 int errCode = E_OK;
823 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
824 if (errCode != E_OK) {
825 return errCode;
826 }
827
828 if (!queryObj.IsValid()) {
829 LOGE("[SqlSinExe][OpenResSetRowId][Query] query object not Valid");
830 return -E_INVALID_QUERY_FORMAT;
831 }
832
833 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
834 if (errCode != E_OK) {
835 LOGE("[SqlSinExe][OpenResSetRowId][Query] Get Stmt fail, errCode=%d", errCode);
836 // The GetQuerySqlStatement does not self rollback(BAD...), so we have to reset the stmt here.
837 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
838 return errCode;
839 }
840 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
841 if (errCode != E_OK) {
842 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
843 }
844 return errCode;
845 }
846
ReloadResultSet(const Key & keyPrefix)847 int SQLiteSingleVerStorageExecutor::ReloadResultSet(const Key &keyPrefix)
848 {
849 int errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, SELECT_SYNC_ROWID_PREFIX_SQL);
850 if (errCode != E_OK) {
851 return CheckCorruptedStatus(errCode);
852 }
853
854 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
855 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
856 if (errCode != E_OK) {
857 LOGE("Rebind result set rowid statement of keyPrefix error:%d", errCode);
858 return CheckCorruptedStatus(errCode);
859 }
860 return E_OK;
861 }
862
ReloadResultSet(QueryObject & queryObj)863 int SQLiteSingleVerStorageExecutor::ReloadResultSet(QueryObject &queryObj)
864 {
865 int errCode = E_OK;
866 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
867 if (errCode != E_OK) {
868 return errCode;
869 }
870
871 if (!queryObj.IsValid()) {
872 return -E_INVALID_QUERY_FORMAT;
873 }
874
875 std::string sql;
876 errCode = helper.GetQuerySql(sql, true); // only rowid sql
877 if (errCode != E_OK) {
878 return errCode;
879 }
880
881 errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, sql);
882 if (errCode != E_OK) {
883 return CheckCorruptedStatus(errCode);
884 }
885
886 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
887 // GetQuerySqlStatement will not alter getResultRowIdStatement_ if it is not null
888 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
889 if (errCode != E_OK) {
890 LOGE("Rebind result set rowid statement of query error:%d", errCode);
891 return CheckCorruptedStatus(errCode);
892 }
893 return E_OK;
894 }
895
ReloadResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)896 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(const Key &keyPrefix,
897 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
898 {
899 int errCode = ReloadResultSet(keyPrefix); // Reuse this function(A convenience)
900 if (errCode != E_OK) {
901 return errCode;
902 }
903 int count = 0; // Ignored
904 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
905 if (errCode != E_OK) {
906 LOGE("[SqlSinExe][ReloadResSet][KeyPrefix] Load fail, errCode=%d", errCode);
907 }
908 // We can just return, no need to reset the statement
909 return errCode;
910 }
911
ReloadResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)912 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(QueryObject &queryObj,
913 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
914 {
915 int errCode = ReloadResultSet(queryObj); // Reuse this function(A convenience)
916 if (errCode != E_OK) {
917 return errCode;
918 }
919 int count = 0; // Ignored
920 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
921 if (errCode != E_OK) {
922 LOGE("[SqlSinExe][ReloadResSet][Query] Load fail, errCode=%d", errCode);
923 }
924 // We can just return, no need to reset the statement
925 return errCode;
926 }
927
GetNextEntryFromResultSet(Key & key,Value & value,bool isCopy)928 int SQLiteSingleVerStorageExecutor::GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy)
929 {
930 if (getResultRowIdStatement_ == nullptr || getResultEntryStatement_ == nullptr) {
931 return -E_RESULT_SET_STATUS_INVALID;
932 }
933
934 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
935 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
936 if (!isCopy) {
937 return E_OK;
938 }
939 int64_t rowId = sqlite3_column_int64(getResultRowIdStatement_, 0);
940 errCode = E_OK;
941 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
942 if (errCode != E_OK) {
943 LOGE("[SqlSinExe][GetNext] Reset result set entry statement fail, errCode=%d.", errCode);
944 return CheckCorruptedStatus(errCode);
945 }
946
947 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
948 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
949 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
950 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, key);
951 if (errCode != E_OK) {
952 LOGE("[SqlSinExe][GetNext] Get key failed:%d", errCode);
953 return CheckCorruptedStatus(errCode);
954 }
955 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, value);
956 if (errCode != E_OK) {
957 LOGE("[SqlSinExe][GetNext] Get value failed:%d", errCode);
958 return CheckCorruptedStatus(errCode);
959 }
960 return E_OK;
961 } else {
962 return -E_UNEXPECTED_DATA;
963 }
964 }
965 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
966 return -E_FINISHED;
967 }
968
969 LOGE("SQLite step failed:%d", errCode);
970 return CheckCorruptedStatus(errCode);
971 }
972
GetEntryByRowId(int64_t rowId,Entry & entry)973 int SQLiteSingleVerStorageExecutor::GetEntryByRowId(int64_t rowId, Entry &entry)
974 {
975 if (getResultEntryStatement_ == nullptr) {
976 return -E_RESULT_SET_STATUS_INVALID;
977 }
978 int errCode = E_OK;
979 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
980 if (errCode != E_OK) {
981 LOGE("[SqlSinExe][GetEntryByRowid] Reset result set entry statement fail, errCode=%d.", errCode);
982 return CheckCorruptedStatus(errCode);
983 }
984 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
985 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
986 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
987 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, entry.key);
988 if (errCode != E_OK) {
989 LOGE("[SqlSinExe][GetEntryByRowid] Get key failed, errCode=%d.", errCode);
990 return CheckCorruptedStatus(errCode);
991 }
992 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, entry.value);
993 if (errCode != E_OK) {
994 LOGE("[SqlSinExe][GetEntryByRowid] Get value failed, errCode=%d.", errCode);
995 return CheckCorruptedStatus(errCode);
996 }
997 return E_OK;
998 } else {
999 LOGE("[SqlSinExe][GetEntryByRowid] Step failed, errCode=%d.", errCode);
1000 return -E_UNEXPECTED_DATA;
1001 }
1002 }
1003
CloseResultSet()1004 void SQLiteSingleVerStorageExecutor::CloseResultSet()
1005 {
1006 int errCode = E_OK;
1007 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1008 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1009 SetCorruptedStatus();
1010 }
1011 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1012 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1013 SetCorruptedStatus();
1014 }
1015 if (isTransactionOpen_) {
1016 SQLiteUtils::RollbackTransaction(dbHandle_);
1017 isTransactionOpen_ = false;
1018 }
1019 }
1020
StartTransaction(TransactType type)1021 int SQLiteSingleVerStorageExecutor::StartTransaction(TransactType type)
1022 {
1023 if (dbHandle_ == nullptr) {
1024 LOGE("Begin transaction failed, dbHandle is null.");
1025 return -E_INVALID_DB;
1026 }
1027 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
1028 if (errCode == E_OK) {
1029 isTransactionOpen_ = true;
1030 } else {
1031 LOGE("Begin transaction failed, errCode = %d", errCode);
1032 }
1033 return CheckCorruptedStatus(errCode);
1034 }
1035
Commit()1036 int SQLiteSingleVerStorageExecutor::Commit()
1037 {
1038 if (dbHandle_ == nullptr) {
1039 return -E_INVALID_DB;
1040 }
1041 int errCode = SQLiteUtils::CommitTransaction(dbHandle_);
1042 if (errCode != E_OK) {
1043 LOGE("sqlite single ver storage executor commit fail! errCode = [%d]", errCode);
1044 return CheckCorruptedStatus(errCode);
1045 }
1046 isTransactionOpen_ = false;
1047 return E_OK;
1048 }
1049
Rollback()1050 int SQLiteSingleVerStorageExecutor::Rollback()
1051 {
1052 if (dbHandle_ == nullptr) {
1053 return -E_INVALID_DB;
1054 }
1055 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
1056 if (errCode != E_OK) {
1057 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
1058 return CheckCorruptedStatus(errCode);
1059 }
1060 isTransactionOpen_ = false;
1061 return E_OK;
1062 }
1063
CheckIfKeyExisted(const Key & key,bool isLocal,Value & value,Timestamp & timestamp) const1064 bool SQLiteSingleVerStorageExecutor::CheckIfKeyExisted(const Key &key, bool isLocal,
1065 Value &value, Timestamp ×tamp) const
1066 {
1067 // not local value, no need to get the value.
1068 if (!isLocal) {
1069 return false;
1070 }
1071
1072 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1073 if (errCode != E_OK) {
1074 return false;
1075 }
1076 return true;
1077 }
1078
GetDeviceIdentifier(PragmaEntryDeviceIdentifier * identifier)1079 int SQLiteSingleVerStorageExecutor::GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier)
1080 {
1081 if (identifier == nullptr) {
1082 return -E_INVALID_ARGS;
1083 }
1084
1085 if (dbHandle_ == nullptr) {
1086 return -E_INVALID_DB;
1087 }
1088
1089 sqlite3_stmt *statement = nullptr;
1090 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ENTRY_DEVICE, statement);
1091 if (errCode != E_OK) {
1092 return errCode;
1093 }
1094
1095 int keyIndex = identifier->origDevice ? BIND_ORI_DEVICE_ID : BIND_PRE_DEVICE_ID;
1096 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, identifier->key, false);
1097 if (errCode != E_OK) {
1098 goto END;
1099 }
1100
1101 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1102 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1103 std::vector<uint8_t> deviceId;
1104 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, deviceId);
1105 identifier->deviceIdentifier.assign(deviceId.begin(), deviceId.end());
1106 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1107 errCode = -E_NOT_FOUND;
1108 }
1109
1110 END:
1111 SQLiteUtils::ResetStatement(statement, true, errCode);
1112 return CheckCorruptedStatus(errCode);
1113 }
1114
PutIntoCommittedData(const DataItem & itemPut,const DataItem & itemGet,const DataOperStatus & status,SingleVerNaturalStoreCommitNotifyData * committedData)1115 void SQLiteSingleVerStorageExecutor::PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet,
1116 const DataOperStatus &status, SingleVerNaturalStoreCommitNotifyData *committedData)
1117 {
1118 if (committedData == nullptr) {
1119 return;
1120 }
1121
1122 Entry entry;
1123 int errCode;
1124 if (!status.isDeleted) {
1125 entry.key = itemPut.key;
1126 entry.value = itemPut.value;
1127 DataType dataType = (status.preStatus == DataStatus::EXISTED) ? DataType::UPDATE : DataType::INSERT;
1128 errCode = committedData->InsertCommittedData(std::move(entry), dataType, true);
1129 } else {
1130 entry.key = itemGet.key;
1131 entry.value = itemGet.value;
1132 errCode = committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1133 }
1134
1135 if (errCode != E_OK) {
1136 LOGE("[SingleVerExe][PutCommitData]Insert failed:%d", errCode);
1137 }
1138 }
1139
PrepareForSavingData(const std::string & readSql,const std::string & insertSql,const std::string & updateSql,SaveRecordStatements & statements) const1140 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
1141 const std::string &updateSql, SaveRecordStatements &statements) const
1142 {
1143 int errCode = SQLiteUtils::GetStatement(dbHandle_, readSql, statements.queryStatement);
1144 if (errCode != E_OK) {
1145 LOGE("Get query statement failed. errCode = [%d]", errCode);
1146 goto ERR;
1147 }
1148
1149 errCode = SQLiteUtils::GetStatement(dbHandle_, insertSql, statements.insertStatement);
1150 if (errCode != E_OK) {
1151 LOGE("Get insert statement failed. errCode = [%d]", errCode);
1152 goto ERR;
1153 }
1154
1155 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, statements.updateStatement);
1156 if (errCode != E_OK) {
1157 LOGE("Get update statement failed. errCode = [%d]", errCode);
1158 goto ERR;
1159 }
1160 return E_OK;
1161 ERR:
1162 (void)statements.ResetStatement();
1163 return errCode;
1164 }
1165
PrepareForSavingData(SingleVerDataType type)1166 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(SingleVerDataType type)
1167 {
1168 int errCode = -E_NOT_SUPPORT;
1169 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1170 // currently, Local type has not been optimized, so pass updateSql parameter with INSERT_LOCAL_SQL
1171 errCode = PrepareForSavingData(SELECT_LOCAL_HASH_SQL, INSERT_LOCAL_SQL, INSERT_LOCAL_SQL, saveLocalStatements_);
1172 } else if (type == SingleVerDataType::SYNC_TYPE) {
1173 errCode = PrepareForSavingData(SELECT_SYNC_HASH_SQL, INSERT_SYNC_SQL, UPDATE_SYNC_SQL, saveSyncStatements_);
1174 }
1175 return CheckCorruptedStatus(errCode);
1176 }
1177
ResetForSavingData(SingleVerDataType type)1178 int SQLiteSingleVerStorageExecutor::ResetForSavingData(SingleVerDataType type)
1179 {
1180 int errCode = E_OK;
1181 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1182 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
1183 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
1184 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
1185 } else if (type == SingleVerDataType::SYNC_TYPE) {
1186 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1187 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1188 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1189 }
1190 return CheckCorruptedStatus(errCode);
1191 }
1192
GetOriginDevName(const DataItem & dataItem,const std::string & origDevGet)1193 std::string SQLiteSingleVerStorageExecutor::GetOriginDevName(const DataItem &dataItem,
1194 const std::string &origDevGet)
1195 {
1196 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
1197 return origDevGet;
1198 }
1199 return dataItem.origDev;
1200 }
1201
SaveSyncDataToDatabase(const DataItem & dataItem,const Key & hashKey,const std::string & origDev,const std::string & deviceName,bool isUpdate)1202 int SQLiteSingleVerStorageExecutor::SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey,
1203 const std::string &origDev, const std::string &deviceName, bool isUpdate)
1204 {
1205 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1206 LOGD("Find query data missing, erase local data.");
1207 return EraseSyncData(hashKey);
1208 }
1209 auto statement = saveSyncStatements_.GetDataSaveStatement(isUpdate);
1210 if (statement == nullptr) {
1211 return -E_INVALID_ARGS;
1212 }
1213
1214 std::string devName = DBCommon::TransferHashString(deviceName);
1215 int errCode = BindSavedSyncData(statement, dataItem, hashKey, {origDev, devName}, isUpdate);
1216 if (errCode != E_OK) {
1217 return errCode;
1218 }
1219
1220 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1221 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1222 errCode = E_OK;
1223 }
1224 if (errCode == E_OK) {
1225 errCode = RemoveCloudUploadFlag(hashKey);
1226 }
1227 return errCode;
1228 }
1229
JudgeSyncSaveType(DataItem & dataItem,const DataItem & itemGet,const DeviceInfo & deviceInfo,bool isHashKeyExisted,bool isPermitForceWrite)1230 DataOperStatus SQLiteSingleVerStorageExecutor::JudgeSyncSaveType(DataItem &dataItem,
1231 const DataItem &itemGet, const DeviceInfo &deviceInfo, bool isHashKeyExisted, bool isPermitForceWrite)
1232 {
1233 DataOperStatus status;
1234 status.isDeleted = ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1235 (dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY);
1236 if (isHashKeyExisted) {
1237 if ((itemGet.flag & DataItem::DELETE_FLAG) != 0) {
1238 status.preStatus = DataStatus::DELETED;
1239 } else {
1240 status.preStatus = DataStatus::EXISTED;
1241 }
1242 std::string deviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1243 if (itemGet.writeTimestamp >= dataItem.writeTimestamp) {
1244 // for multi user mode, no permit to forcewrite
1245 if (((!deviceName.empty()) && IsFromDataOwner(itemGet, deviceName) && isPermitForceWrite) ||
1246 deviceInfo.isLocal) {
1247 LOGI("Force overwrite the data:%" PRIu64 " vs %" PRIu64 " isLocal %d",
1248 itemGet.writeTimestamp, dataItem.writeTimestamp, static_cast<int>(deviceInfo.isLocal));
1249 status.isDefeated = false;
1250 dataItem.writeTimestamp = itemGet.writeTimestamp + 1;
1251 dataItem.timestamp = itemGet.timestamp;
1252 } else {
1253 status.isDefeated = true;
1254 }
1255 }
1256 }
1257 return status;
1258 }
1259
GetSyncDataItemExt(const DataItem & dataItem,DataItem & itemGet,const DataOperStatus & dataStatus) const1260 int SQLiteSingleVerStorageExecutor::GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet,
1261 const DataOperStatus &dataStatus) const
1262 {
1263 if (dataStatus.preStatus != DataStatus::EXISTED) {
1264 return E_OK;
1265 }
1266 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1267 // only deleted item need origin value.
1268 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1269 if (errCode != E_OK) {
1270 return errCode;
1271 }
1272
1273 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, itemGet.value);
1274 if (errCode != E_OK) {
1275 LOGE("Get column value data failed:%d", errCode);
1276 }
1277
1278 return errCode;
1279 }
1280
ResetSaveSyncStatements(int errCode)1281 int SQLiteSingleVerStorageExecutor::ResetSaveSyncStatements(int errCode)
1282 {
1283 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1284 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1285 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1286 return CheckCorruptedStatus(errCode);
1287 }
1288
1289 namespace {
IsNeedIgnoredData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & devInfo,bool isHashKeyExisted,int policy)1290 inline bool IsNeedIgnoredData(const DataItem &itemPut, const DataItem &itemGet,
1291 const DeviceInfo &devInfo, bool isHashKeyExisted, int policy)
1292 {
1293 // deny the data synced from other dev which the origin dev is current or the existed value is current dev data.
1294 return (((itemGet.origDev.empty() && isHashKeyExisted) || itemPut.origDev.empty()) &&
1295 (!devInfo.isLocal && policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA));
1296 }
1297 }
1298
PrepareForNotifyConflictAndObserver(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)1299 int SQLiteSingleVerStorageExecutor::PrepareForNotifyConflictAndObserver(DataItem &dataItem,
1300 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
1301 {
1302 // Check sava data existed info
1303 int errCode = GetSyncDataItemPre(dataItem, notify.getData, notify.hashKey);
1304 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1305 LOGD("[SingleVerExe][PrepareForNotifyConflictAndObserver] failed:%d", errCode);
1306 if (isSyncMigrating_) {
1307 ResetForMigrateCacheData();
1308 return errCode;
1309 }
1310 return ResetSaveSyncStatements(errCode);
1311 }
1312
1313 bool isHashKeyExisted = (errCode != -E_NOT_FOUND);
1314 if (IsNeedIgnoredData(dataItem, notify.getData, deviceInfo, isHashKeyExisted, conflictResolvePolicy_)) {
1315 LOGD("[SingleVerExe] Ignore the sync data.");
1316 if (isSyncMigrating_) {
1317 ResetForMigrateCacheData();
1318 return -E_IGNORE_DATA;
1319 }
1320 return ResetSaveSyncStatements(-E_IGNORE_DATA);
1321 }
1322
1323 notify.dataStatus = JudgeSyncSaveType(dataItem, notify.getData, deviceInfo, isHashKeyExisted,
1324 isPermitForceWrite);
1325 InitCommitNotifyDataKeyStatus(notify.committedData, notify.hashKey, notify.dataStatus);
1326
1327 // Nonexistent data, but deleted by local.
1328 if ((notify.dataStatus.preStatus == DataStatus::DELETED || notify.dataStatus.preStatus == DataStatus::NOEXISTED) &&
1329 (dataItem.flag & DataItem::DELETE_FLAG) != 0 &&
1330 (dataItem.flag & DataItem::LOCAL_FLAG) != 0) {
1331 // For delete item in cacheDB, which not in mainDB. Cannot notify, but this is not error.
1332 errCode = -E_NOT_FOUND;
1333 LOGD("Nonexistent data, but deleted by local");
1334 if (isSyncMigrating_) {
1335 ResetForMigrateCacheData();
1336 return errCode;
1337 }
1338 return ResetSaveSyncStatements(errCode);
1339 }
1340
1341 // get key and value from ori database
1342 errCode = GetSyncDataItemExt(dataItem, notify.getData, notify.dataStatus);
1343 if (errCode != E_OK) {
1344 LOGD("GetSyncDataItemExt failed:%d", errCode);
1345 if (isSyncMigrating_) {
1346 ResetForMigrateCacheData();
1347 return errCode;
1348 }
1349 return ResetSaveSyncStatements(errCode);
1350 }
1351
1352 return E_OK;
1353 }
1354
SaveSyncDataItem(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,SingleVerNaturalStoreCommitNotifyData * committedData,bool isPermitForceWrite)1355 int SQLiteSingleVerStorageExecutor::SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
1356 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite)
1357 {
1358 NotifyConflictAndObserverData notify = {
1359 .committedData = committedData
1360 };
1361
1362 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
1363 if (errCode != E_OK) {
1364 if (errCode == -E_IGNORE_DATA) {
1365 errCode = E_OK;
1366 }
1367 return errCode;
1368 }
1369
1370 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, committedData);
1371 if (notify.dataStatus.isDefeated) {
1372 LOGE("Data status is defeated:%d", errCode);
1373 return ResetSaveSyncStatements(errCode);
1374 }
1375
1376 bool isUpdate = (notify.dataStatus.preStatus != DataStatus::NOEXISTED);
1377 std::string origDev = GetOriginDevName(dataItem, notify.getData.origDev);
1378 errCode = SaveSyncDataToDatabase(dataItem, notify.hashKey, origDev, deviceInfo.deviceName, isUpdate);
1379 if (errCode == E_OK) {
1380 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, committedData);
1381 maxStamp = std::max(dataItem.timestamp, maxStamp);
1382 } else {
1383 LOGE("Save sync data to db failed:%d", errCode);
1384 }
1385 return ResetSaveSyncStatements(errCode);
1386 }
1387
GetAllMetaKeys(std::vector<Key> & keys) const1388 int SQLiteSingleVerStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
1389 {
1390 sqlite3_stmt *statement = nullptr;
1391 const std::string &sqlStr = (attachMetaMode_ ? SELECT_ATTACH_ALL_META_KEYS : SELECT_ALL_META_KEYS);
1392 int errCode = SQLiteUtils::GetStatement(dbHandle_, sqlStr, statement);
1393 if (errCode != E_OK) {
1394 LOGE("[SingleVerExe][GetAllKey] Get statement failed:%d", errCode);
1395 return errCode;
1396 }
1397
1398 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
1399 SQLiteUtils::ResetStatement(statement, true, errCode);
1400 return errCode;
1401 }
1402
GetAllSyncedEntries(const std::string & hashDev,std::vector<Entry> & entries) const1403 int SQLiteSingleVerStorageExecutor::GetAllSyncedEntries(const std::string &hashDev,
1404 std::vector<Entry> &entries) const
1405 {
1406 int errCode = E_OK;
1407 sqlite3_stmt *statement = nullptr;
1408 if (hashDev.empty()) {
1409 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1410 SELECT_ALL_SYNC_ENTRIES_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES);
1411 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1412 if (errCode != E_OK) {
1413 LOGE("Get all entries statement failed:%d", errCode);
1414 return errCode;
1415 }
1416 } else {
1417 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1418 SELECT_ALL_SYNC_ENTRIES_BY_DEV_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES_BY_DEV);
1419 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1420 if (errCode != E_OK) {
1421 LOGE("Get all entries statement failed:%d", errCode);
1422 return errCode;
1423 }
1424
1425 // deviceName always hash string
1426 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
1427 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, devVect, true); // bind the 1st to device.
1428 if (errCode != E_OK) {
1429 LOGE("Failed to bind the synced device for all entries:%d", errCode);
1430 goto END;
1431 }
1432 }
1433
1434 errCode = GetAllEntries(statement, entries);
1435 END:
1436 SQLiteUtils::ResetStatement(statement, true, errCode);
1437 return errCode;
1438 }
1439
GetAllEntries(sqlite3_stmt * statement,std::vector<Entry> & entries) const1440 int SQLiteSingleVerStorageExecutor::GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const
1441 {
1442 if (statement == nullptr) {
1443 return -E_INVALID_DB;
1444 }
1445 int errCode;
1446 do {
1447 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1448 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1449 Entry entry;
1450 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key); // No.0 is the key
1451 if (errCode != E_OK) {
1452 break;
1453 }
1454 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value); // No.1 is the value
1455 if (errCode != E_OK) {
1456 break;
1457 }
1458
1459 entries.push_back(std::move(entry));
1460 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1461 errCode = E_OK;
1462 break;
1463 } else {
1464 LOGE("SQLite step for all entries failed:%d", errCode);
1465 break;
1466 }
1467 } while (true);
1468
1469 return errCode;
1470 }
1471
BindSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,const SyncDataDevices & devices,bool isUpdate)1472 int SQLiteSingleVerStorageExecutor::BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1473 const Key &hashKey, const SyncDataDevices &devices, bool isUpdate)
1474 {
1475 const int hashKeyIndex = isUpdate ? BIND_SYNC_UPDATE_HASH_KEY_INDEX : BIND_SYNC_HASH_KEY_INDEX;
1476 int errCode = SQLiteUtils::BindBlobToStatement(statement, hashKeyIndex, hashKey, false);
1477 if (errCode != E_OK) {
1478 LOGE("Bind saved sync data hash key failed:%d", errCode);
1479 return errCode;
1480 }
1481
1482 // if delete flag is set, just use the hash key instead of the key
1483 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1484 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_SYNC_KEY_INDEX, -1));
1485 } else {
1486 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_KEY_INDEX, dataItem.key, false);
1487 }
1488
1489 if (errCode != E_OK) {
1490 LOGE("Bind saved sync data key failed:%d", errCode);
1491 return errCode;
1492 }
1493
1494 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_VAL_INDEX, dataItem.value, true);
1495 if (errCode != E_OK) {
1496 LOGE("Bind saved sync data value failed:%d", errCode);
1497 return errCode;
1498 }
1499
1500 errCode = BindSyncDataTime(statement, dataItem, isUpdate);
1501 if (errCode != E_OK) {
1502 return errCode;
1503 }
1504 return BindDevForSavedSyncData(statement, dataItem, devices.origDev, devices.dev);
1505 }
1506
PutConflictData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & deviceInfo,const DataOperStatus & dataStatus,SingleVerNaturalStoreCommitNotifyData * commitData)1507 void SQLiteSingleVerStorageExecutor::PutConflictData(const DataItem &itemPut, const DataItem &itemGet,
1508 const DeviceInfo &deviceInfo, const DataOperStatus &dataStatus,
1509 SingleVerNaturalStoreCommitNotifyData *commitData)
1510 {
1511 if (commitData == nullptr) {
1512 return;
1513 }
1514
1515 bool conflictNotifyMatch = commitData->IsConflictedNotifyMatched(itemPut, itemGet);
1516 if (!conflictNotifyMatch) {
1517 return;
1518 }
1519
1520 if (dataStatus.preStatus == DataStatus::NOEXISTED ||
1521 ((dataStatus.preStatus == DataStatus::DELETED) && dataStatus.isDeleted)) {
1522 return;
1523 }
1524
1525 Key origKey;
1526 if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1527 (itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1528 origKey = itemGet.key;
1529 } else {
1530 origKey = itemPut.key;
1531 }
1532
1533 // insert db original entry
1534 std::vector<uint8_t> getDevVect(itemGet.dev.begin(), itemGet.dev.end());
1535 DataItemInfo orgItemInfo = {itemGet, true, getDevVect};
1536 orgItemInfo.dataItem.key = origKey;
1537 commitData->InsertConflictedItem(orgItemInfo, true);
1538
1539 // insert conflict entry
1540 std::string putDeviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1541 std::vector<uint8_t> putDevVect(putDeviceName.begin(), putDeviceName.end());
1542
1543 DataItemInfo newItemInfo = {itemPut, deviceInfo.isLocal, putDevVect};
1544 newItemInfo.dataItem.key = origKey;
1545 commitData->InsertConflictedItem(newItemInfo, false);
1546 }
1547
Reset()1548 int SQLiteSingleVerStorageExecutor::Reset()
1549 {
1550 if (isTransactionOpen_) {
1551 Rollback();
1552 }
1553
1554 int errCode = ResetForSavingData(SingleVerDataType::SYNC_TYPE);
1555 if (errCode != E_OK) {
1556 LOGE("Finalize the sync resources for saving sync data failed: %d", errCode);
1557 }
1558
1559 errCode = ResetForSavingData(SingleVerDataType::LOCAL_TYPE_SQLITE);
1560 if (errCode != E_OK) {
1561 LOGE("Finalize the local resources for saving sync data failed: %d", errCode);
1562 }
1563 return SQLiteStorageExecutor::Reset();
1564 }
1565
GetSyncDataItemPre(const DataItem & itemPut,DataItem & itemGet,Key & hashKey) const1566 int SQLiteSingleVerStorageExecutor::GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet,
1567 Key &hashKey) const
1568 {
1569 if (isSyncMigrating_) {
1570 hashKey = itemPut.hashKey;
1571 } else if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1572 ((itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY)) {
1573 hashKey = itemPut.key;
1574 } else {
1575 int errCode = DBCommon::CalcValueHash(itemPut.key, hashKey);
1576 if (errCode != E_OK) {
1577 return errCode;
1578 }
1579 }
1580
1581 return GetSyncDataPreByHashKey(hashKey, itemGet);
1582 }
1583
GetSyncDataPreByHashKey(const Key & hashKey,DataItem & itemGet) const1584 int SQLiteSingleVerStorageExecutor::GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const
1585 {
1586 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1587 int errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // 1st arg.
1588 if (errCode != E_OK) {
1589 return errCode;
1590 }
1591
1592 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1593 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // no find the key
1594 errCode = -E_NOT_FOUND;
1595 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1596 itemGet.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1597 itemGet.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1598 itemGet.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1599 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1600 if (errCode != E_OK) {
1601 return errCode;
1602 }
1603 std::vector<uint8_t> devVect;
1604 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1605 if (errCode != E_OK) {
1606 return errCode;
1607 }
1608
1609 std::vector<uint8_t> origDevVect;
1610 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
1611 if (errCode != E_OK) {
1612 return errCode;
1613 }
1614 itemGet.dev.assign(devVect.begin(), devVect.end());
1615 itemGet.origDev.assign(origDevVect.begin(), origDevVect.end());
1616 }
1617 return errCode;
1618 }
1619
DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData * committedData,const Key & key,const Value & value)1620 int SQLiteSingleVerStorageExecutor::DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
1621 const Key &key, const Value &value)
1622 {
1623 if (committedData != nullptr) {
1624 Key hashKey;
1625 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
1626 if (innerErrCode != E_OK) {
1627 return innerErrCode;
1628 }
1629 committedData->InitKeyPropRecord(hashKey, ExistStatus::EXIST);
1630 }
1631
1632 std::string sql = DELETE_LOCAL_SQL;
1633 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
1634 sql = DELETE_LOCAL_SQL_FROM_CACHEHANDLE;
1635 }
1636 sqlite3_stmt *statement = nullptr;
1637 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1638 if (errCode != E_OK) {
1639 goto ERROR;
1640 }
1641
1642 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);
1643 if (errCode != E_OK) {
1644 LOGE("Bind the key error(%d) when delete kv data.", errCode);
1645 goto ERROR;
1646 }
1647
1648 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1649 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1650 if (sqlite3_changes(dbHandle_) > 0) {
1651 if (committedData != nullptr) {
1652 Entry entry = {key, value};
1653 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1654 } else {
1655 LOGE("DeleteLocalKvData failed to do commit notify because of OOM.");
1656 }
1657 errCode = E_OK;
1658 }
1659 }
1660
1661 ERROR:
1662 SQLiteUtils::ResetStatement(statement, true, errCode);
1663 return CheckCorruptedStatus(errCode);
1664 }
1665
DeleteLocalKvData(const Key & key,SingleVerNaturalStoreCommitNotifyData * committedData,Value & value,Timestamp & timestamp)1666 int SQLiteSingleVerStorageExecutor::DeleteLocalKvData(const Key &key,
1667 SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, Timestamp ×tamp)
1668 {
1669 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1670 if (errCode != E_OK) {
1671 return CheckCorruptedStatus(errCode);
1672 }
1673
1674 return DeleteLocalDataInner(committedData, key, value);
1675 }
1676
EraseSyncData(const Key & hashKey)1677 int SQLiteSingleVerStorageExecutor::EraseSyncData(const Key &hashKey)
1678 {
1679 sqlite3_stmt *stmt = nullptr;
1680 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
1681 DELETE_SYNC_DATA_WITH_HASHKEY_FROM_CACHEHANDLE : DELETE_SYNC_DATA_WITH_HASHKEY;
1682 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1683 if (errCode != E_OK) {
1684 LOGE("get erase statement failed:%d", errCode);
1685 return errCode;
1686 }
1687
1688 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey, false);
1689 if (errCode != E_OK) {
1690 LOGE("bind hashKey failed:%d", errCode);
1691 goto END;
1692 }
1693
1694 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1695 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1696 errCode = E_OK;
1697 } else {
1698 LOGE("erase data failed:%d", errCode);
1699 }
1700 END:
1701 SQLiteUtils::ResetStatement(stmt, true, errCode);
1702 return CheckCorruptedStatus(errCode);
1703 }
1704
RemoveDeviceData(const std::string & deviceName)1705 int SQLiteSingleVerStorageExecutor::RemoveDeviceData(const std::string &deviceName)
1706 {
1707 int errCode = E_OK;
1708 bool isCreate = false;
1709 int ret = SQLiteUtils::CheckTableExists(dbHandle_, NATURALBASE_KV_AUX_SYNC_DATA_LOG_TABLE_NAME, isCreate);
1710 bool isTableExists = (ret == E_OK && isCreate);
1711 if (deviceName.empty()) {
1712 if (isTableExists) {
1713 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_ALL_LOG_DATA_SQL, "", "");
1714 }
1715 errCode = CloudExcuteRemoveOrUpdate(REMOVE_ALL_DEV_DATA_SQL, "", "");
1716 } else {
1717 if (isTableExists) {
1718 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_LOG_DATA_BY_DEVID_SQL, deviceName, "");
1719 }
1720 errCode = CloudExcuteRemoveOrUpdate(REMOVE_DEV_DATA_SQL, deviceName, "");
1721 }
1722 return CheckCorruptedStatus(errCode);
1723 }
1724
StepForResultEntries(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries) const1725 int SQLiteSingleVerStorageExecutor::StepForResultEntries(bool isGetValue, sqlite3_stmt *statement,
1726 std::vector<Entry> &entries) const
1727 {
1728 entries.clear();
1729 entries.shrink_to_fit();
1730 int errCode = E_OK;
1731 do {
1732 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1733 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1734 errCode = GetEntryFromStatement(isGetValue, statement, entries);
1735 if (errCode != E_OK) {
1736 return errCode;
1737 }
1738 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1739 errCode = E_OK;
1740 break;
1741 } else {
1742 LOGE("SQLite step failed:%d", errCode);
1743 return errCode;
1744 }
1745 } while (true);
1746
1747 // if select no result, return the -E_NOT_FOUND.
1748 if (entries.empty()) {
1749 errCode = -E_NOT_FOUND;
1750 }
1751
1752 return errCode;
1753 }
1754
BindDevForSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const std::string & origDev,const std::string & deviceName)1755 int SQLiteSingleVerStorageExecutor::BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1756 const std::string &origDev, const std::string &deviceName)
1757 {
1758 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_FLAG_INDEX,
1759 static_cast<int64_t>(dataItem.flag));
1760 if (errCode != E_OK) {
1761 LOGE("Bind saved sync data flag failed:%d", errCode);
1762 return errCode;
1763 }
1764
1765 std::vector<uint8_t> devVect(deviceName.begin(), deviceName.end());
1766 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_DEV_INDEX, devVect, true);
1767 if (errCode != E_OK) {
1768 LOGE("Bind dev for sync data failed:%d", errCode);
1769 return errCode;
1770 }
1771
1772 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
1773 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_ORI_DEV_INDEX, origDevVect, true);
1774 if (errCode != E_OK) {
1775 LOGE("Bind orig dev for sync data failed:%d", errCode);
1776 }
1777
1778 return errCode;
1779 }
1780
GetDataItemSerialSize(const DataItem & item,size_t appendLen)1781 size_t SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(const DataItem &item, size_t appendLen)
1782 {
1783 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
1784 // the size would not be very large.
1785 static const size_t maxOrigDevLength = 40;
1786 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
1787 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
1788 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
1789 return dataSize;
1790 }
1791
InitResultSet(const Key & keyPrefix,sqlite3_stmt * & countStmt)1792 int SQLiteSingleVerStorageExecutor::InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt)
1793 {
1794 if (dbHandle_ == nullptr) {
1795 return -E_INVALID_DB;
1796 }
1797 // bind statement for count
1798 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_COUNT_SYNC_PREFIX_SQL, countStmt);
1799 if (errCode != E_OK) {
1800 LOGE("Get count statement for resultset error:%d", errCode);
1801 return errCode;
1802 }
1803
1804 errCode = SQLiteUtils::BindPrefixKey(countStmt, 1, keyPrefix); // first argument is key
1805 if (errCode != E_OK) {
1806 LOGE("Bind count key error:%d", errCode);
1807 goto ERROR;
1808 }
1809 // bind statement for result set
1810 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
1811 if (errCode != E_OK) {
1812 LOGE("Get result set rowid statement error:%d", errCode);
1813 goto ERROR;
1814 }
1815
1816 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1817 if (errCode != E_OK) {
1818 LOGE("Get result set entry statement error:%d", errCode);
1819 goto ERROR;
1820 }
1821
1822 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
1823 if (errCode != E_OK) {
1824 LOGE("Bind result set rowid statement error:%d", errCode);
1825 goto ERROR;
1826 }
1827 return E_OK;
1828
1829 ERROR:
1830 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1831 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1832 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1833 return CheckCorruptedStatus(errCode);
1834 }
1835
InitResultSetCount(QueryObject & queryObj,sqlite3_stmt * & countStmt)1836 int SQLiteSingleVerStorageExecutor::InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1837 {
1838 if (dbHandle_ == nullptr) {
1839 return -E_INVALID_DB;
1840 }
1841
1842 int errCode = E_OK;
1843 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1844 if (errCode != E_OK) {
1845 return errCode;
1846 }
1847
1848 errCode = helper.GetCountSqlStatement(dbHandle_, countStmt);
1849 if (errCode != E_OK) {
1850 LOGE("Get count bind statement error:%d", errCode);
1851 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1852 }
1853 return errCode;
1854 }
1855
InitResultSetContent(QueryObject & queryObj)1856 int SQLiteSingleVerStorageExecutor::InitResultSetContent(QueryObject &queryObj)
1857 {
1858 int errCode = E_OK;
1859 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1860 if (errCode != E_OK) {
1861 return errCode;
1862 }
1863
1864 // bind statement for result set
1865 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
1866 if (errCode != E_OK) {
1867 LOGE("[SqlSinExe][InitResSetContent] Bind result set rowid statement of query error:%d", errCode);
1868 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1869 return errCode;
1870 }
1871 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1872 if (errCode != E_OK) {
1873 LOGE("[SqlSinExe][InitResSetContent] Get result set entry statement of query error:%d", errCode);
1874 return CheckCorruptedStatus(errCode);
1875 }
1876 return errCode;
1877 }
1878
InitResultSet(QueryObject & queryObj,sqlite3_stmt * & countStmt)1879 int SQLiteSingleVerStorageExecutor::InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1880 {
1881 if (dbHandle_ == nullptr) {
1882 return -E_INVALID_DB;
1883 }
1884
1885 int errCode = E_OK;
1886 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1887 if (errCode != E_OK) {
1888 return errCode;
1889 }
1890
1891 if (!queryObj.IsValid()) {
1892 return -E_INVALID_QUERY_FORMAT;
1893 }
1894
1895 errCode = InitResultSetCount(queryObj, countStmt);
1896 if (errCode != E_OK) {
1897 return CheckCorruptedStatus(errCode);
1898 }
1899
1900 errCode = InitResultSetContent(queryObj);
1901 if (errCode != E_OK) {
1902 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1903 }
1904 return CheckCorruptedStatus(errCode);
1905 }
1906
UpdateLocalDataTimestamp(Timestamp timestamp)1907 int SQLiteSingleVerStorageExecutor::UpdateLocalDataTimestamp(Timestamp timestamp)
1908 {
1909 const std::string updateSql = "UPDATE local_data SET timestamp=";
1910 std::string sql = updateSql + std::to_string(timestamp) + " WHERE timestamp=0;";
1911 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1912 return CheckCorruptedStatus(errCode);
1913 }
1914
SetAttachMetaMode(bool attachMetaMode)1915 void SQLiteSingleVerStorageExecutor::SetAttachMetaMode(bool attachMetaMode)
1916 {
1917 attachMetaMode_ = attachMetaMode;
1918 }
1919
GetOneRawDataItem(sqlite3_stmt * statement,DataItem & dataItem,uint64_t & verInCurCacheDb,bool isCacheDb) const1920 int SQLiteSingleVerStorageExecutor::GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
1921 uint64_t &verInCurCacheDb, bool isCacheDb) const
1922 {
1923 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, dataItem.key);
1924 if (errCode != E_OK) {
1925 return errCode;
1926 }
1927
1928 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
1929 if (errCode != E_OK) {
1930 return errCode;
1931 }
1932
1933 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1934 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1935
1936 std::vector<uint8_t> devVect;
1937 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1938 if (errCode != E_OK) {
1939 return errCode;
1940 }
1941 dataItem.dev = std::string(devVect.begin(), devVect.end());
1942
1943 devVect.clear();
1944 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
1945 if (errCode != E_OK) {
1946 return errCode;
1947 }
1948 dataItem.origDev = std::string(devVect.begin(), devVect.end());
1949
1950 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_HASH_KEY_INDEX, dataItem.hashKey);
1951 if (errCode != E_OK) {
1952 return errCode;
1953 }
1954 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1955 if (errCode != E_OK) {
1956 return errCode;
1957 }
1958 if (isCacheDb) {
1959 verInCurCacheDb = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_VERSION_INDEX));
1960 }
1961 return E_OK;
1962 }
1963
GetAllDataItems(sqlite3_stmt * statement,std::vector<DataItem> & dataItems,uint64_t & verInCurCacheDb,bool isCacheDb) const1964 int SQLiteSingleVerStorageExecutor::GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
1965 uint64_t &verInCurCacheDb, bool isCacheDb) const
1966 {
1967 dataItems.clear();
1968 dataItems.shrink_to_fit();
1969 DataItem dataItem;
1970 int errCode;
1971 do {
1972 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1973 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1974 errCode = GetOneRawDataItem(statement, dataItem, verInCurCacheDb, isCacheDb);
1975 if (errCode != E_OK) {
1976 return errCode;
1977 }
1978 dataItems.push_back(std::move(dataItem));
1979 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1980 errCode = E_OK;
1981 break;
1982 } else {
1983 LOGE("SQLite step failed:%d", errCode);
1984 break;
1985 }
1986 } while (true);
1987
1988 return CheckCorruptedStatus(errCode);
1989 }
1990
OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)1991 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache,
1992 uint32_t cacheLimit, int &count)
1993 {
1994 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1995 if (errCode != E_OK) {
1996 LOGE("[SqlSinExe][OpenResSetRowId][Common] Get entry stmt fail, errCode=%d", errCode);
1997 return CheckCorruptedStatus(errCode);
1998 }
1999 errCode = StartTransaction(TransactType::DEFERRED);
2000 if (errCode != E_OK) {
2001 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2002 return CheckCorruptedStatus(errCode);
2003 }
2004 // Now Ready To Execute
2005 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, 0, count);
2006 if (errCode != E_OK) {
2007 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2008 Rollback();
2009 return CheckCorruptedStatus(errCode);
2010 }
2011 // Consider finalize getResultRowIdStatement_ here if count equal to size of rowIdCache.
2012 return E_OK;
2013 }
2014
ResultSetLoadRowIdCache(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos,int & count)2015 int SQLiteSingleVerStorageExecutor::ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
2016 uint32_t cacheStartPos, int &count)
2017 {
2018 rowIdCache.clear();
2019 count = 0;
2020 while (true) {
2021 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
2022 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2023 if (count >= static_cast<int>(cacheStartPos) && rowIdCache.size() < cacheLimit) {
2024 // If we can start cache, and, if we can still cache
2025 int64_t rowid = sqlite3_column_int64(getResultRowIdStatement_, 0);
2026 rowIdCache.push_back(rowid);
2027 }
2028 // Always increase the count
2029 count++;
2030 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2031 break;
2032 } else {
2033 LOGE("[SqlSinExe][ResSetLoadCache] Step fail, errCode=%d", errCode);
2034 rowIdCache.clear();
2035 count = 0;
2036 return CheckCorruptedStatus(errCode);
2037 }
2038 }
2039 return E_OK;
2040 }
2041
ResetStatement()2042 int SQLiteSingleVerStorageExecutor::SaveRecordStatements::ResetStatement()
2043 {
2044 int errCode = E_OK;
2045 SQLiteUtils::ResetStatement(insertStatement, true, errCode);
2046 if (errCode != E_OK) {
2047 LOGE("Finalize insert statements failed, error: %d", errCode);
2048 }
2049
2050 SQLiteUtils::ResetStatement(updateStatement, true, errCode);
2051 if (errCode != E_OK) {
2052 LOGE("Finalize update statements failed, error: %d", errCode);
2053 }
2054
2055 SQLiteUtils::ResetStatement(queryStatement, true, errCode);
2056 if (errCode != E_OK) {
2057 LOGE("Finalize query statement failed, error: %d", errCode);
2058 }
2059 return errCode;
2060 }
2061
FinalizeAllStatements()2062 void SQLiteSingleVerStorageExecutor::FinalizeAllStatements()
2063 {
2064 int errCode = saveLocalStatements_.ResetStatement();
2065 if (errCode != E_OK) {
2066 LOGE("Finalize saveLocal statements failed, error: %d", errCode);
2067 }
2068
2069 errCode = saveSyncStatements_.ResetStatement();
2070 if (errCode != E_OK) {
2071 LOGE("Finalize saveSync statement failed, error: %d", errCode);
2072 }
2073
2074 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
2075 if (errCode != E_OK) {
2076 LOGE("Finalize getResultRowIdStatement_ failed, error: %d", errCode);
2077 }
2078
2079 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2080 if (errCode != E_OK) {
2081 LOGE("Finalize getResultEntryStatement_ failed, error: %d", errCode);
2082 }
2083
2084 errCode = migrateSyncStatements_.ResetStatement();
2085 if (errCode != E_OK) {
2086 LOGE("Finalize migrateSync statements failed, error: %d", errCode);
2087 }
2088
2089 ReleaseContinueStatement();
2090 }
2091
SetConflictResolvePolicy(int policy)2092 void SQLiteSingleVerStorageExecutor::SetConflictResolvePolicy(int policy)
2093 {
2094 if (policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA || policy == DEFAULT_LAST_WIN) {
2095 conflictResolvePolicy_ = policy;
2096 }
2097 }
2098
CheckIntegrity() const2099 int SQLiteSingleVerStorageExecutor::CheckIntegrity() const
2100 {
2101 if (dbHandle_ == nullptr) {
2102 return -E_INVALID_DB;
2103 }
2104
2105 return SQLiteUtils::CheckIntegrity(dbHandle_, CHECK_DB_INTEGRITY_SQL);
2106 }
2107
ForceCheckPoint() const2108 int SQLiteSingleVerStorageExecutor::ForceCheckPoint() const
2109 {
2110 if (dbHandle_ == nullptr) {
2111 return -E_INVALID_DB;
2112 }
2113 SQLiteUtils::ExecuteCheckPoint(dbHandle_);
2114 return E_OK;
2115 }
2116
GetLogFileSize() const2117 uint64_t SQLiteSingleVerStorageExecutor::GetLogFileSize() const
2118 {
2119 if (isMemDb_) {
2120 return 0;
2121 }
2122
2123 const char *fileName = sqlite3_db_filename(dbHandle_, "main");
2124 if (fileName == nullptr) {
2125 return 0;
2126 }
2127 std::string walName = std::string(fileName) + "-wal";
2128 uint64_t fileSize = 0;
2129 int result = OS::CalFileSize(std::string(walName), fileSize);
2130 if (result != E_OK) {
2131 return 0;
2132 }
2133 return fileSize;
2134 }
2135
GetExistsDevicesFromMeta(std::set<std::string> & devices)2136 int SQLiteSingleVerStorageExecutor::GetExistsDevicesFromMeta(std::set<std::string> &devices)
2137 {
2138 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_,
2139 attachMetaMode_ ? SqliteMetaExecutor::MetaMode::KV_ATTACH : SqliteMetaExecutor::MetaMode::KV,
2140 isMemDb_, devices);
2141 }
2142
UpdateKey(const UpdateKeyCallback & callback)2143 int SQLiteSingleVerStorageExecutor::UpdateKey(const UpdateKeyCallback &callback)
2144 {
2145 if (dbHandle_ == nullptr) {
2146 return -E_INVALID_DB;
2147 }
2148 UpdateContext context;
2149 context.callback = callback;
2150 int errCode = CreateFuncUpdateKey(context, &Translate, &CalHashKey);
2151 if (errCode != E_OK) {
2152 return errCode;
2153 }
2154 int executeErrCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, UPDATE_SYNC_DATA_KEY_SQL);
2155 context.callback = nullptr;
2156 errCode = CreateFuncUpdateKey(context, nullptr, nullptr);
2157 if (context.errCode != E_OK) {
2158 return context.errCode;
2159 }
2160 if (executeErrCode != E_OK) {
2161 return executeErrCode;
2162 }
2163 if (errCode != E_OK) {
2164 return errCode;
2165 }
2166 return E_OK;
2167 }
2168
CreateFuncUpdateKey(UpdateContext & context,void (* translateFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv),void (* calHashFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const2169 int SQLiteSingleVerStorageExecutor::CreateFuncUpdateKey(UpdateContext &context,
2170 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
2171 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
2172 {
2173 int errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_TRANSLATE_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2174 &context, translateFunc, nullptr, nullptr, nullptr);
2175 if (errCode != SQLITE_OK) {
2176 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2177 return SQLiteUtils::MapSQLiteErrno(errCode);
2178 }
2179 errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_CAL_HASH_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2180 &context, calHashFunc, nullptr, nullptr, nullptr);
2181 if (errCode != SQLITE_OK) {
2182 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2183 return SQLiteUtils::MapSQLiteErrno(errCode);
2184 }
2185 return E_OK;
2186 }
2187
Translate(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2188 void SQLiteSingleVerStorageExecutor::Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2189 {
2190 if (ctx == nullptr || argc != 1 || argv == nullptr) { // i parameters, which are key
2191 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2192 return;
2193 }
2194 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2195 auto keyBlob = static_cast<const uint8_t *>(sqlite3_value_blob(argv[0]));
2196 int keyBlobLen = sqlite3_value_bytes(argv[0]);
2197 Key oldKey;
2198 if (keyBlob != nullptr && keyBlobLen > 0) {
2199 oldKey = Key(keyBlob, keyBlob + keyBlobLen);
2200 }
2201 Key newKey;
2202 context->callback(oldKey, newKey);
2203 if (newKey.size() >= DBConstant::MAX_KEY_SIZE || newKey.empty()) {
2204 LOGE("[SqlSinExe][Translate] invalid key len=%zu", newKey.size());
2205 context->errCode = -E_INVALID_ARGS;
2206 sqlite3_result_error(ctx, "Update key is invalid", -1);
2207 return;
2208 }
2209 context->newKey = newKey;
2210 sqlite3_result_blob(ctx, newKey.data(), static_cast<int>(newKey.size()), SQLITE_TRANSIENT);
2211 }
2212
CalHashKey(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2213 void SQLiteSingleVerStorageExecutor::CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2214 {
2215 if (ctx == nullptr || argc != 1 || argv == nullptr) {
2216 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2217 return;
2218 }
2219 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2220 Key hashKey;
2221 DBCommon::CalcValueHash(context->newKey, hashKey);
2222 sqlite3_result_blob(ctx, hashKey.data(), static_cast<int>(hashKey.size()), SQLITE_TRANSIENT);
2223 }
2224
BindSyncDataTime(sqlite3_stmt * statement,const DataItem & dataItem,bool isUpdate)2225 int SQLiteSingleVerStorageExecutor::BindSyncDataTime(sqlite3_stmt *statement, const DataItem &dataItem, bool isUpdate)
2226 {
2227 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_STAMP_INDEX, dataItem.timestamp);
2228 if (errCode != E_OK) {
2229 LOGE("Bind saved sync data stamp failed:%d", errCode);
2230 return errCode;
2231 }
2232
2233 const int writeTimeIndex = isUpdate ? BIND_SYNC_UPDATE_W_TIME_INDEX : BIND_SYNC_W_TIME_INDEX;
2234 errCode = SQLiteUtils::BindInt64ToStatement(statement, writeTimeIndex, dataItem.writeTimestamp);
2235 if (errCode != E_OK) {
2236 LOGE("Bind saved sync data write stamp failed:%d", errCode);
2237 return errCode;
2238 }
2239
2240 const int modifyTimeIndex = isUpdate ? BIND_SYNC_UPDATE_MODIFY_TIME_INDEX : BIND_SYNC_MODIFY_TIME_INDEX;
2241 errCode = SQLiteUtils::BindInt64ToStatement(statement, modifyTimeIndex, dataItem.modifyTime);
2242 if (errCode != E_OK) {
2243 LOGE("Bind saved sync data modify time failed:%d", errCode);
2244 return errCode;
2245 }
2246
2247 const int createTimeIndex = isUpdate ? BIND_SYNC_UPDATE_CREATE_TIME_INDEX : BIND_SYNC_CREATE_TIME_INDEX;
2248 errCode = SQLiteUtils::BindInt64ToStatement(statement, createTimeIndex, dataItem.createTime);
2249 if (errCode != E_OK) {
2250 LOGE("Bind saved sync data create time failed:%d", errCode);
2251 return errCode;
2252 }
2253
2254 LOGI("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 " modifyTime:%" PRIu64 " createTime:%"
2255 PRIu64 ", key size:%" PRIu32 ", value size:%" PRIu32, dataItem.writeTimestamp, dataItem.timestamp,
2256 dataItem.flag, dataItem.modifyTime, dataItem.createTime, dataItem.key.size(), dataItem.value.size());
2257 return errCode;
2258 }
2259
CreateCloudLogTable()2260 int SQLiteSingleVerStorageExecutor::CreateCloudLogTable()
2261 {
2262 if (dbHandle_ == nullptr) {
2263 return -E_INVALID_DB;
2264 }
2265 return SqliteLogTableManager::CreateKvSyncLogTable(dbHandle_);
2266 }
2267 } // namespace DistributedDB
2268