1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "db_errno.h"
23 #include "log_print.h"
24 #include "log_table_manager_factory.h"
25 #include "parcel.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "sqlite_meta_executor.h"
29 #include "sqlite_single_ver_storage_executor_sql.h"
30
31 namespace DistributedDB {
32 namespace {
33
ResetOrRegetStmt(sqlite3 * db,sqlite3_stmt * & stmt,const std::string & sql)34 int ResetOrRegetStmt(sqlite3 *db, sqlite3_stmt *&stmt, const std::string &sql)
35 {
36 int errCode = E_OK;
37 SQLiteUtils::ResetStatement(stmt, false, errCode);
38 if (errCode != E_OK) {
39 LOGE("[ResetOrRegetStmt] reset stmt failed:%d.", errCode);
40 // Finish current statement and remade one
41 SQLiteUtils::ResetStatement(stmt, true, errCode);
42 errCode = SQLiteUtils::GetStatement(db, sql, stmt);
43 if (errCode != E_OK) {
44 LOGE("[ResetOrRegetStmt] reget failed:%d.", errCode);
45 }
46 }
47 return errCode;
48 }
49
GetEntryFromStatement(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries)50 int GetEntryFromStatement(bool isGetValue, sqlite3_stmt *statement, std::vector<Entry> &entries)
51 {
52 Entry entry;
53 int errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key);
54 if (errCode != E_OK) {
55 return errCode;
56 }
57 if (isGetValue) {
58 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value);
59 if (errCode != E_OK) {
60 return errCode;
61 }
62 }
63
64 entries.push_back(std::move(entry));
65 return errCode;
66 }
67 }
68
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb)69 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb)
70 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
71 getSyncStatement_(nullptr),
72 getResultRowIdStatement_(nullptr),
73 getResultEntryStatement_(nullptr),
74 isTransactionOpen_(false),
75 attachMetaMode_(false),
76 executorState_(ExecutorState::INVALID),
77 maxTimestampInMainDB_(0),
78 migrateTimeOffset_(0),
79 isSyncMigrating_(false),
80 conflictResolvePolicy_(DEFAULT_LAST_WIN)
81 {}
82
SQLiteSingleVerStorageExecutor(sqlite3 * dbHandle,bool writable,bool isMemDb,ExecutorState executorState)83 SQLiteSingleVerStorageExecutor::SQLiteSingleVerStorageExecutor(sqlite3 *dbHandle, bool writable, bool isMemDb,
84 ExecutorState executorState)
85 : SQLiteStorageExecutor(dbHandle, writable, isMemDb),
86 getSyncStatement_(nullptr),
87 getResultRowIdStatement_(nullptr),
88 getResultEntryStatement_(nullptr),
89 isTransactionOpen_(false),
90 attachMetaMode_(false),
91 executorState_(executorState),
92 maxTimestampInMainDB_(0),
93 migrateTimeOffset_(0),
94 isSyncMigrating_(false),
95 conflictResolvePolicy_(DEFAULT_LAST_WIN)
96 {}
97
~SQLiteSingleVerStorageExecutor()98 SQLiteSingleVerStorageExecutor::~SQLiteSingleVerStorageExecutor()
99 {
100 if (isTransactionOpen_) {
101 Rollback();
102 }
103 FinalizeAllStatements();
104 }
105
GetKvData(SingleVerDataType type,const Key & key,Value & value,Timestamp & timestamp) const106 int SQLiteSingleVerStorageExecutor::GetKvData(SingleVerDataType type, const Key &key, Value &value,
107 Timestamp ×tamp) const
108 {
109 std::string sql;
110 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
111 sql = SELECT_LOCAL_VALUE_TIMESTAMP_SQL;
112 } else if (type == SingleVerDataType::SYNC_TYPE) {
113 sql = SELECT_SYNC_VALUE_WTIMESTAMP_SQL;
114 } else if (type == SingleVerDataType::META_TYPE) {
115 if (attachMetaMode_) {
116 sql = SELECT_ATTACH_META_VALUE_SQL;
117 } else {
118 sql = SELECT_META_VALUE_SQL;
119 }
120 } else {
121 return -E_INVALID_ARGS;
122 }
123
124 sqlite3_stmt *statement = nullptr;
125 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
126 if (errCode != E_OK) {
127 goto END;
128 }
129
130 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
131 if (errCode != E_OK) {
132 goto END;
133 }
134
135 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
136 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
137 errCode = -E_NOT_FOUND;
138 goto END;
139 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
140 goto END;
141 }
142
143 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
144
145 // get timestamp
146 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
147 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_LOCAL_TIME_INDEX));
148 } else if (type == SingleVerDataType::SYNC_TYPE) {
149 timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, GET_KV_RES_SYNC_TIME_INDEX));
150 }
151
152 END:
153 int ret = E_OK;
154 SQLiteUtils::ResetStatement(statement, true, ret);
155 return CheckCorruptedStatus(errCode);
156 }
157
BindPutKvData(sqlite3_stmt * statement,const Key & key,const Value & value,Timestamp timestamp,SingleVerDataType type)158 int SQLiteSingleVerStorageExecutor::BindPutKvData(sqlite3_stmt *statement, const Key &key, const Value &value,
159 Timestamp timestamp, SingleVerDataType type)
160 {
161 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, key, false);
162 if (errCode != E_OK) {
163 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
164 return errCode;
165 }
166
167 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_VAL_INDEX, value, true);
168 if (errCode != E_OK) {
169 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
170 return errCode;
171 }
172
173 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
174 Key hashKey;
175 errCode = DBCommon::CalcValueHash(key, hashKey);
176 if (errCode != E_OK) {
177 return errCode;
178 }
179
180 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_LOCAL_HASH_KEY_INDEX, hashKey, false);
181 if (errCode != E_OK) {
182 LOGE("[SingleVerExe][BindPutKv]Bind hash key error:%d", errCode);
183 return errCode;
184 }
185
186 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_LOCAL_TIMESTAMP_INDEX, timestamp);
187 if (errCode != E_OK) {
188 LOGE("[SingleVerExe][BindPutKv]Bind timestamp error:%d", errCode);
189 return errCode;
190 }
191 }
192 return E_OK;
193 }
194
GetKvDataByHashKey(const Key & hashKey,SingleVerRecord & result) const195 int SQLiteSingleVerStorageExecutor::GetKvDataByHashKey(const Key &hashKey, SingleVerRecord &result) const
196 {
197 sqlite3_stmt *statement = nullptr;
198 std::vector<uint8_t> devVect;
199 std::vector<uint8_t> origDevVect;
200 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_HASH_SQL, statement);
201 if (errCode != E_OK) {
202 goto END;
203 }
204
205 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // bind the first arg hashkey.
206 if (errCode != E_OK) {
207 goto END;
208 }
209
210 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
211 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
212 result.hashKey = hashKey;
213 result.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
214 result.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
215 result.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
216 // get key
217 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, result.key);
218 if (errCode != E_OK) {
219 goto END;
220 }
221 // get value
222 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, result.value);
223 if (errCode != E_OK) {
224 goto END;
225 }
226 // get device
227 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
228 if (errCode != E_OK) {
229 goto END;
230 }
231 result.device = std::string(devVect.begin(), devVect.end());
232 // get original device
233 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
234 if (errCode != E_OK) {
235 goto END;
236 }
237 result.origDevice = std::string(origDevVect.begin(), origDevVect.end());
238 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
239 errCode = -E_NOT_FOUND;
240 }
241
242 END:
243 int ret = E_OK;
244 SQLiteUtils::ResetStatement(statement, true, ret);
245 return CheckCorruptedStatus(errCode);
246 }
247
SaveKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp)248 int SQLiteSingleVerStorageExecutor::SaveKvData(SingleVerDataType type, const Key &key, const Value &value,
249 Timestamp timestamp)
250 {
251 sqlite3_stmt *statement = nullptr;
252 std::string sql;
253 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
254 sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ? INSERT_LOCAL_SQL_FROM_CACHEHANDLE :
255 INSERT_LOCAL_SQL);
256 } else {
257 sql = (attachMetaMode_ ? INSERT_ATTACH_META_SQL : INSERT_META_SQL);
258 }
259 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
260 if (errCode != E_OK) {
261 goto ERROR;
262 }
263
264 errCode = BindPutKvData(statement, key, value, timestamp, type);
265 if (errCode != E_OK) {
266 goto ERROR;
267 }
268
269 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
270 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
271 errCode = E_OK;
272 }
273
274 ERROR:
275 int ret = E_OK;
276 SQLiteUtils::ResetStatement(statement, true, ret);
277 return CheckCorruptedStatus(errCode);
278 }
279
PutKvData(SingleVerDataType type,const Key & key,const Value & value,Timestamp timestamp,SingleVerNaturalStoreCommitNotifyData * committedData)280 int SQLiteSingleVerStorageExecutor::PutKvData(SingleVerDataType type, const Key &key, const Value &value,
281 Timestamp timestamp, SingleVerNaturalStoreCommitNotifyData *committedData)
282 {
283 if (type != SingleVerDataType::LOCAL_TYPE_SQLITE && type != SingleVerDataType::META_TYPE) {
284 return -E_INVALID_ARGS;
285 }
286 // committedData is only for local data, not for meta data.
287 bool isLocal = (SingleVerDataType::LOCAL_TYPE_SQLITE == type);
288 Timestamp localTimestamp = 0;
289 Value readValue;
290 bool isExisted = CheckIfKeyExisted(key, isLocal, readValue, localTimestamp);
291 if (isLocal && committedData != nullptr) {
292 ExistStatus existedStatus = isExisted ? ExistStatus::EXIST : ExistStatus::NONE;
293 Key hashKey;
294 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
295 if (innerErrCode != E_OK) {
296 return innerErrCode;
297 }
298 committedData->InitKeyPropRecord(hashKey, existedStatus);
299 }
300 int errCode = SaveKvData(type, key, value, timestamp);
301 if (errCode != E_OK) {
302 return errCode;
303 }
304
305 if (isLocal && committedData != nullptr) {
306 Entry entry = {key, value};
307 committedData->InsertCommittedData(std::move(entry), isExisted ? DataType::UPDATE : DataType::INSERT, true);
308 }
309 return E_OK;
310 }
311
GetEntries(bool isGetValue,SingleVerDataType type,const Key & keyPrefix,std::vector<Entry> & entries) const312 int SQLiteSingleVerStorageExecutor::GetEntries(bool isGetValue, SingleVerDataType type, const Key &keyPrefix,
313 std::vector<Entry> &entries) const
314 {
315 if ((type != SingleVerDataType::LOCAL_TYPE_SQLITE) && (type != SingleVerDataType::SYNC_TYPE)) {
316 return -E_INVALID_ARGS;
317 }
318
319 std::string sql;
320 if (type == SingleVerDataType::SYNC_TYPE) {
321 sql = isGetValue ? SELECT_SYNC_PREFIX_SQL : SELECT_SYNC_KEY_PREFIX_SQL;
322 } else {
323 sql = SELECT_LOCAL_PREFIX_SQL;
324 }
325 sqlite3_stmt *statement = nullptr;
326 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
327 if (errCode != E_OK) {
328 goto END;
329 }
330
331 // bind the prefix key for the first and second args.
332 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // first argument is key
333 if (errCode != E_OK) {
334 goto END;
335 }
336
337 errCode = StepForResultEntries(isGetValue, statement, entries);
338
339 END:
340 int ret = E_OK;
341 SQLiteUtils::ResetStatement(statement, true, ret);
342 return CheckCorruptedStatus(errCode);
343 }
344
GetEntries(QueryObject & queryObj,std::vector<Entry> & entries) const345 int SQLiteSingleVerStorageExecutor::GetEntries(QueryObject &queryObj, std::vector<Entry> &entries) const
346 {
347 int errCode = E_OK;
348 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
349 if (errCode != E_OK) {
350 return errCode;
351 }
352
353 sqlite3_stmt *statement = nullptr;
354 errCode = helper.GetQuerySqlStatement(dbHandle_, false, statement);
355 if (errCode == E_OK) {
356 errCode = StepForResultEntries(true, statement, entries);
357 }
358
359 int ret = E_OK;
360 SQLiteUtils::ResetStatement(statement, true, ret);
361 return CheckCorruptedStatus(errCode);
362 }
363
GetCount(QueryObject & queryObj,int & count) const364 int SQLiteSingleVerStorageExecutor::GetCount(QueryObject &queryObj, int &count) const
365 {
366 if (dbHandle_ == nullptr) {
367 return -E_INVALID_DB;
368 }
369
370 int errCode = E_OK;
371 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
372 if (errCode != E_OK) {
373 return errCode;
374 }
375
376 if (!queryObj.IsCountValid()) {
377 LOGE("GetCount no need limit or orderby");
378 return -E_INVALID_QUERY_FORMAT;
379 }
380
381 std::string countSql;
382 errCode = helper.GetCountQuerySql(countSql);
383 if (errCode != E_OK) {
384 return errCode;
385 }
386
387 sqlite3_stmt *countStatement = nullptr;
388 // get statement for count
389 errCode = helper.GetQuerySqlStatement(dbHandle_, countSql, countStatement);
390 if (errCode != E_OK) {
391 LOGE("Get count bind statement error:%d", errCode);
392 goto END;
393 }
394 // get count value
395 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
396 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
397 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
398 if (readCount > INT32_MAX) {
399 LOGW("total count is beyond the max count");
400 count = 0;
401 errCode = -E_UNEXPECTED_DATA;
402 } else {
403 count = static_cast<int>(readCount);
404 errCode = E_OK;
405 }
406 LOGD("Entry count in this result set is %d", count);
407 } else {
408 errCode = -E_UNEXPECTED_DATA;
409 }
410
411 END:
412 int ret = E_OK;
413 SQLiteUtils::ResetStatement(countStatement, true, ret);
414 return CheckCorruptedStatus(errCode);
415 }
416
InitCurrentMaxStamp(Timestamp & maxStamp)417 void SQLiteSingleVerStorageExecutor::InitCurrentMaxStamp(Timestamp &maxStamp)
418 {
419 if (dbHandle_ == nullptr) {
420 return;
421 }
422 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
423 SELECT_MAX_TIMESTAMP_SQL_FROM_CACHEHANDLE : SELECT_MAX_TIMESTAMP_SQL);
424 sqlite3_stmt *statement = nullptr;
425 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
426 if (errCode != E_OK) {
427 return;
428 }
429
430 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
431 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
432 maxStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
433 }
434 SQLiteUtils::ResetStatement(statement, true, errCode);
435 }
436
PrepareForSyncDataByTime(Timestamp begin,Timestamp end,sqlite3_stmt * & statement,bool getDeletedData) const437 int SQLiteSingleVerStorageExecutor::PrepareForSyncDataByTime(Timestamp begin, Timestamp end,
438 sqlite3_stmt *&statement, bool getDeletedData) const
439 {
440 if (dbHandle_ == nullptr) {
441 return -E_INVALID_DB;
442 }
443
444 const std::string sql = (getDeletedData ? SELECT_SYNC_DELETED_ENTRIES_SQL : SELECT_SYNC_ENTRIES_SQL);
445 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
446 if (errCode != E_OK) {
447 LOGE("Prepare the sync entries statement error:%d", errCode);
448 return errCode;
449 }
450
451 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_BEGIN_STAMP_INDEX, begin);
452 if (errCode != E_OK) {
453 goto ERROR;
454 }
455
456 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_END_STAMP_INDEX, end);
457
458 ERROR:
459 if (errCode != E_OK) {
460 LOGE("Bind the timestamp for getting sync data error:%d", errCode);
461 int ret = E_OK;
462 SQLiteUtils::ResetStatement(statement, true, ret);
463 }
464
465 return CheckCorruptedStatus(errCode);
466 }
467
ReleaseContinueStatement()468 void SQLiteSingleVerStorageExecutor::ReleaseContinueStatement()
469 {
470 if (getSyncStatement_ != nullptr) {
471 int errCode = E_OK;
472 SQLiteUtils::ResetStatement(getSyncStatement_, true, errCode);
473 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
474 SetCorruptedStatus();
475 }
476 }
477 }
478
479 namespace {
GetDataItemForSync(sqlite3_stmt * statement,DataItem & dataItem)480 int GetDataItemForSync(sqlite3_stmt *statement, DataItem &dataItem)
481 {
482 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
483 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
484 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
485 dataItem.flag &= (~DataItem::LOCAL_FLAG);
486 std::vector<uint8_t> devVect;
487 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
488 if (errCode != E_OK) {
489 return errCode;
490 }
491 dataItem.origDev = std::string(devVect.begin(), devVect.end());
492 int keyIndex = SYNC_RES_KEY_INDEX;
493 // If the data has been deleted, just use the hash key for sync.
494 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
495 keyIndex = SYNC_RES_HASH_KEY_INDEX;
496 }
497 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, dataItem.key);
498 if (errCode != E_OK) {
499 return errCode;
500 }
501 return SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
502 }
503 }
504
GetSyncDataItems(std::vector<DataItem> & dataItems,sqlite3_stmt * statement,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo) const505 int SQLiteSingleVerStorageExecutor::GetSyncDataItems(std::vector<DataItem> &dataItems, sqlite3_stmt *statement,
506 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo) const
507 {
508 int errCode;
509 size_t dataTotalSize = 0;
510 do {
511 DataItem dataItem;
512 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
513 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
514 errCode = GetDataItemForSync(statement, dataItem);
515 if (errCode != E_OK) {
516 LOGE("GetDataItemForSync failed:%d", errCode);
517 return errCode;
518 }
519 } else {
520 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
521 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
522 errCode = -E_FINISHED;
523 } else {
524 LOGE("Get sync data error:%d", errCode);
525 }
526 break;
527 }
528
529 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
530 dataTotalSize += GetDataItemSerialSize(dataItem, appendLength);
531 if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) ||
532 dataItems.size() >= dataSizeInfo.packetSize) {
533 errCode = -E_UNFINISHED;
534 break;
535 } else {
536 dataItems.push_back(std::move(dataItem));
537 }
538 } while (true);
539 return errCode;
540 }
541
GetSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const542 int SQLiteSingleVerStorageExecutor::GetSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
543 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
544 {
545 sqlite3_stmt *statement = nullptr;
546 int errCode = PrepareForSyncDataByTime(begin, end, statement);
547 if (errCode != E_OK) {
548 return errCode;
549 }
550
551 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
552 int ret = E_OK;
553 SQLiteUtils::ResetStatement(statement, true, ret);
554 return CheckCorruptedStatus(errCode);
555 }
556
GetDeletedSyncDataByTimestamp(std::vector<DataItem> & dataItems,size_t appendLength,Timestamp begin,Timestamp end,const DataSizeSpecInfo & dataSizeInfo) const557 int SQLiteSingleVerStorageExecutor::GetDeletedSyncDataByTimestamp(std::vector<DataItem> &dataItems, size_t appendLength,
558 Timestamp begin, Timestamp end, const DataSizeSpecInfo &dataSizeInfo) const
559 {
560 sqlite3_stmt *statement = nullptr;
561 int errCode = PrepareForSyncDataByTime(begin, end, statement, true);
562 if (errCode != E_OK) {
563 return errCode;
564 }
565
566 errCode = GetSyncDataItems(dataItems, statement, appendLength, dataSizeInfo);
567 int ret = E_OK;
568 SQLiteUtils::ResetStatement(statement, true, ret);
569 return CheckCorruptedStatus(errCode);
570 }
571
572 namespace {
AppendDataItem(std::vector<DataItem> & dataItems,const DataItem & item,size_t & dataTotalSize,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo)573 int AppendDataItem(std::vector<DataItem> &dataItems, const DataItem &item, size_t &dataTotalSize, size_t appendLength,
574 const DataSizeSpecInfo &dataSizeInfo)
575 {
576 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
577 size_t appendSize = dataTotalSize + SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength);
578 if ((appendSize > dataSizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= dataSizeInfo.packetSize) {
579 return -E_UNFINISHED;
580 }
581 dataItems.push_back(item);
582 dataTotalSize = appendSize;
583 return E_OK;
584 }
585
GetFullDataStatement(sqlite3 * db,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)586 int GetFullDataStatement(sqlite3 *db, const std::pair<Timestamp, Timestamp> &timeRange, sqlite3_stmt *&stmt)
587 {
588 int errCode = SQLiteUtils::GetStatement(db, SELECT_SYNC_MODIFY_SQL, stmt);
589 if (errCode != E_OK) {
590 LOGE("Get statement failed. %d", errCode);
591 return errCode;
592 }
593 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange.first); // 1 : Bind time rang index start
594 if (errCode != E_OK) {
595 LOGE("Bind time range to statement failed. %d", errCode);
596 goto ERR;
597 }
598 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange.second); // 2 : Bind time rang index end
599 if (errCode != E_OK) {
600 LOGE("Bind time range to statement failed. %d", errCode);
601 goto ERR;
602 }
603 return E_OK; // do not release statement when success
604 ERR:
605 int ret = E_OK;
606 SQLiteUtils::ResetStatement(stmt, true, ret);
607 return errCode;
608 }
609
GetQueryDataStatement(sqlite3 * db,QueryObject query,const std::pair<Timestamp,Timestamp> & timeRange,sqlite3_stmt * & stmt)610 int GetQueryDataStatement(sqlite3 *db, QueryObject query, const std::pair<Timestamp, Timestamp> &timeRange,
611 sqlite3_stmt *&stmt)
612 {
613 int errCode = E_OK;
614 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
615 if (errCode != E_OK) {
616 return errCode;
617 }
618 return helper.GetQuerySyncStatement(db, timeRange.first, timeRange.second, stmt);
619 }
620
GetNextDataItem(sqlite3_stmt * stmt,bool isMemDB,DataItem & item)621 int GetNextDataItem(sqlite3_stmt *stmt, bool isMemDB, DataItem &item)
622 {
623 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
624 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
625 errCode = GetDataItemForSync(stmt, item);
626 }
627 return errCode;
628 }
629 }
630
GetSyncDataWithQuery(const QueryObject & query,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,const std::pair<Timestamp,Timestamp> & timeRange,std::vector<DataItem> & dataItems) const631 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(const QueryObject &query, size_t appendLength,
632 const DataSizeSpecInfo &dataSizeInfo, const std::pair<Timestamp, Timestamp> &timeRange,
633 std::vector<DataItem> &dataItems) const
634 {
635 sqlite3_stmt *fullStmt = nullptr; // statement for get all modified data in the time range
636 sqlite3_stmt *queryStmt = nullptr; // statement for get modified data which is matched query in the time range
637 int errCode = GetQueryDataStatement(dbHandle_, query, timeRange, queryStmt);
638 if (errCode != E_OK) {
639 LOGE("Get query matched data statement failed. %d", errCode);
640 goto END;
641 }
642 if (query.IsQueryOnlyByKey()) {
643 // Query sync by prefixKey only should not deal with REMOTE_DEVICE_DATA_MISS_QUERY. Get the data directly.
644 errCode = GetSyncDataItems(dataItems, queryStmt, appendLength, dataSizeInfo);
645 goto END;
646 }
647 errCode = GetFullDataStatement(dbHandle_, timeRange, fullStmt);
648 if (errCode != E_OK) {
649 LOGE("Get full changed data statement failed. %d", errCode);
650 goto END;
651 }
652 errCode = GetSyncDataWithQuery(fullStmt, queryStmt, appendLength, dataSizeInfo, dataItems);
653 if (errCode != E_OK && errCode != -E_UNFINISHED && errCode != -E_FINISHED) {
654 LOGE("Get sync data with query failed. %d", errCode);
655 }
656 END:
657 int ret = E_OK;
658 SQLiteUtils::ResetStatement(fullStmt, true, ret);
659 SQLiteUtils::ResetStatement(queryStmt, true, ret);
660 return CheckCorruptedStatus(errCode);
661 }
662
GetSyncDataWithQuery(sqlite3_stmt * fullStmt,sqlite3_stmt * queryStmt,size_t appendLength,const DataSizeSpecInfo & dataSizeInfo,std::vector<DataItem> & dataItems) const663 int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(sqlite3_stmt *fullStmt, sqlite3_stmt *queryStmt,
664 size_t appendLength, const DataSizeSpecInfo &dataSizeInfo, std::vector<DataItem> &dataItems) const
665 {
666 int errCode = E_OK;
667 size_t dataTotalSize = 0;
668 DataItem fullItem;
669 DataItem matchItem;
670 bool isFullItemFinished = false;
671 bool isMatchItemFinished = false;
672 while (!isFullItemFinished || !isMatchItemFinished) {
673 errCode = GetNextDataItem(queryStmt, isMemDb_, matchItem);
674 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // query finished
675 isMatchItemFinished = true;
676 } else if (errCode != E_OK) { // step failed or get data failed
677 LOGE("Get next query matched data failed. %d", errCode);
678 return errCode;
679 }
680 while (!isFullItemFinished) {
681 errCode = GetNextDataItem(fullStmt, isMemDb_, fullItem);
682 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // queryStmt is a subset of fullStmt
683 isFullItemFinished = true;
684 break;
685 } else if (errCode != E_OK) { // step failed or get data failed
686 LOGE("Get next changed data failed. %d", errCode);
687 return errCode;
688 }
689 bool matchData = true;
690 if (isMatchItemFinished || matchItem.key != fullItem.key) {
691 matchData = false; // got miss query data
692 DBCommon::CalcValueHash(fullItem.key, fullItem.key); // set and send key with hash_key
693 Value().swap(fullItem.value); // not send value when data miss query
694 fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; // mark with miss query flag
695 }
696 errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo);
697 if (errCode == -E_UNFINISHED) {
698 goto END;
699 }
700 if (matchData) {
701 break; // step to next match data
702 }
703 }
704 }
705 END:
706 LOGD("Get sync data finished, size of packet:%zu, number of item:%zu", dataTotalSize, dataItems.size());
707 return (isFullItemFinished && isMatchItemFinished) ? -E_FINISHED : errCode;
708 }
709
OpenResultSet(const Key & keyPrefix,int & count)710 int SQLiteSingleVerStorageExecutor::OpenResultSet(const Key &keyPrefix, int &count)
711 {
712 sqlite3_stmt *countStatement = nullptr;
713 if (InitResultSet(keyPrefix, countStatement) != E_OK) {
714 LOGE("Initialize result set stat failed.");
715 return -E_INVALID_DB;
716 }
717
718 int errCode = StartTransaction(TransactType::DEFERRED);
719 if (errCode != E_OK) {
720 goto END;
721 }
722
723 // get count value
724 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
725 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
726 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
727 if (readCount > INT32_MAX) {
728 LOGW("total count is beyond the max count");
729 count = 0;
730 errCode = -E_UNEXPECTED_DATA;
731 } else {
732 count = static_cast<int>(readCount);
733 errCode = E_OK;
734 }
735 LOGD("Entry count in this result set is %d", count);
736 } else {
737 errCode = -E_UNEXPECTED_DATA;
738 }
739
740 END:
741 int ret = E_OK;
742 SQLiteUtils::ResetStatement(countStatement, true, ret);
743 if (errCode != E_OK) {
744 CloseResultSet();
745 }
746 return CheckCorruptedStatus(errCode);
747 }
748
OpenResultSet(QueryObject & queryObj,int & count)749 int SQLiteSingleVerStorageExecutor::OpenResultSet(QueryObject &queryObj, int &count)
750 {
751 sqlite3_stmt *countStatement = nullptr;
752 int errCode = InitResultSet(queryObj, countStatement);
753 if (errCode != E_OK) {
754 LOGE("Initialize result set stat failed.");
755 return errCode;
756 }
757
758 errCode = StartTransaction(TransactType::DEFERRED);
759 if (errCode != E_OK) {
760 goto END;
761 }
762
763 // get count value
764 errCode = SQLiteUtils::StepWithRetry(countStatement, isMemDb_);
765 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
766 uint64_t readCount = static_cast<uint64_t>(sqlite3_column_int64(countStatement, 0));
767 if (queryObj.HasLimit()) {
768 int limit = 0;
769 int offset = 0;
770 queryObj.GetLimitVal(limit, offset);
771 offset = (offset < 0) ? 0 : offset;
772 limit = (limit < 0) ? 0 : limit;
773 if (readCount <= static_cast<uint64_t>(offset)) {
774 readCount = 0;
775 } else {
776 readCount = std::min(readCount - offset, static_cast<uint64_t>(limit));
777 }
778 }
779
780 if (readCount > INT32_MAX) {
781 LOGW("total count is beyond the max count");
782 count = 0;
783 errCode = -E_UNEXPECTED_DATA;
784 } else {
785 count = static_cast<int>(readCount);
786 errCode = E_OK;
787 }
788 LOGD("Entry count in this result set is %d", count);
789 } else {
790 errCode = -E_UNEXPECTED_DATA;
791 }
792
793 END:
794 int ret = E_OK;
795 SQLiteUtils::ResetStatement(countStatement, true, ret);
796 if (errCode != E_OK) {
797 CloseResultSet();
798 }
799 return CheckCorruptedStatus(errCode);
800 }
801
OpenResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)802 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(const Key &keyPrefix,
803 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
804 {
805 if (dbHandle_ == nullptr) {
806 return -E_INVALID_DB;
807 }
808 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
809 if (errCode != E_OK) {
810 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Get rowId stmt fail, errCode=%d", errCode);
811 return CheckCorruptedStatus(errCode);
812 }
813 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument index is 1
814 int ret = E_OK;
815 if (errCode != E_OK) {
816 LOGE("[SqlSinExe][OpenResSetRowId][PrefixKey] Bind rowid stmt fail, errCode=%d", errCode);
817 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
818 return CheckCorruptedStatus(errCode);
819 }
820 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
821 if (errCode != E_OK) {
822 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
823 }
824 return errCode;
825 }
826
OpenResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)827 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdMode(QueryObject &queryObj,
828 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, int &count)
829 {
830 if (dbHandle_ == nullptr) {
831 return -E_INVALID_DB;
832 }
833
834 int errCode = E_OK;
835 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
836 if (errCode != E_OK) {
837 return errCode;
838 }
839
840 if (!queryObj.IsValid()) {
841 LOGE("[SqlSinExe][OpenResSetRowId][Query] query object not Valid");
842 return -E_INVALID_QUERY_FORMAT;
843 }
844
845 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
846 int ret = E_OK;
847 if (errCode != E_OK) {
848 LOGE("[SqlSinExe][OpenResSetRowId][Query] Get Stmt fail, errCode=%d", errCode);
849 // The GetQuerySqlStatement does not self rollback(BAD...), so we have to reset the stmt here.
850 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
851 return errCode;
852 }
853 errCode = OpenResultSetForCacheRowIdModeCommon(rowIdCache, cacheLimit, count);
854 if (errCode != E_OK) {
855 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
856 }
857 return errCode;
858 }
859
ReloadResultSet(const Key & keyPrefix)860 int SQLiteSingleVerStorageExecutor::ReloadResultSet(const Key &keyPrefix)
861 {
862 int errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, SELECT_SYNC_ROWID_PREFIX_SQL);
863 if (errCode != E_OK) {
864 return CheckCorruptedStatus(errCode);
865 }
866
867 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
868 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
869 if (errCode != E_OK) {
870 LOGE("Rebind result set rowid statement of keyPrefix error:%d", errCode);
871 return CheckCorruptedStatus(errCode);
872 }
873 return E_OK;
874 }
875
ReloadResultSet(QueryObject & queryObj)876 int SQLiteSingleVerStorageExecutor::ReloadResultSet(QueryObject &queryObj)
877 {
878 int errCode = E_OK;
879 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
880 if (errCode != E_OK) {
881 return errCode;
882 }
883
884 if (!queryObj.IsValid()) {
885 return -E_INVALID_QUERY_FORMAT;
886 }
887
888 std::string sql;
889 errCode = helper.GetQuerySql(sql, true); // only rowid sql
890 if (errCode != E_OK) {
891 return errCode;
892 }
893
894 errCode = ResetOrRegetStmt(dbHandle_, getResultRowIdStatement_, sql);
895 if (errCode != E_OK) {
896 return CheckCorruptedStatus(errCode);
897 }
898
899 // No need to reset getResultEntryStatement_. Because the binding of it will be cleared in each get operation
900 // GetQuerySqlStatement will not alter getResultRowIdStatement_ if it is not null
901 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
902 if (errCode != E_OK) {
903 LOGE("Rebind result set rowid statement of query error:%d", errCode);
904 return CheckCorruptedStatus(errCode);
905 }
906 return E_OK;
907 }
908
ReloadResultSetForCacheRowIdMode(const Key & keyPrefix,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)909 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(const Key &keyPrefix,
910 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
911 {
912 int errCode = ReloadResultSet(keyPrefix); // Reuse this function(A convenience)
913 if (errCode != E_OK) {
914 return errCode;
915 }
916 int count = 0; // Ignored
917 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
918 if (errCode != E_OK) {
919 LOGE("[SqlSinExe][ReloadResSet][KeyPrefix] Load fail, errCode=%d", errCode);
920 }
921 // We can just return, no need to reset the statement
922 return errCode;
923 }
924
ReloadResultSetForCacheRowIdMode(QueryObject & queryObj,std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos)925 int SQLiteSingleVerStorageExecutor::ReloadResultSetForCacheRowIdMode(QueryObject &queryObj,
926 std::vector<int64_t> &rowIdCache, uint32_t cacheLimit, uint32_t cacheStartPos)
927 {
928 int errCode = ReloadResultSet(queryObj); // Reuse this function(A convenience)
929 if (errCode != E_OK) {
930 return errCode;
931 }
932 int count = 0; // Ignored
933 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, cacheStartPos, count);
934 if (errCode != E_OK) {
935 LOGE("[SqlSinExe][ReloadResSet][Query] Load fail, errCode=%d", errCode);
936 }
937 // We can just return, no need to reset the statement
938 return errCode;
939 }
940
GetNextEntryFromResultSet(Key & key,Value & value,bool isCopy)941 int SQLiteSingleVerStorageExecutor::GetNextEntryFromResultSet(Key &key, Value &value, bool isCopy)
942 {
943 if (getResultRowIdStatement_ == nullptr || getResultEntryStatement_ == nullptr) {
944 return -E_RESULT_SET_STATUS_INVALID;
945 }
946
947 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
948 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
949 if (!isCopy) {
950 return E_OK;
951 }
952 int64_t rowId = sqlite3_column_int64(getResultRowIdStatement_, 0);
953 errCode = E_OK;
954 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
955 if (errCode != E_OK) {
956 LOGE("[SqlSinExe][GetNext] Reset result set entry statement fail, errCode=%d.", errCode);
957 return CheckCorruptedStatus(errCode);
958 }
959
960 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
961 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
962 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
963 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, key);
964 if (errCode != E_OK) {
965 LOGE("[SqlSinExe][GetNext] Get key failed:%d", errCode);
966 return CheckCorruptedStatus(errCode);
967 }
968 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, value);
969 if (errCode != E_OK) {
970 LOGE("[SqlSinExe][GetNext] Get value failed:%d", errCode);
971 return CheckCorruptedStatus(errCode);
972 }
973 return E_OK;
974 } else {
975 return -E_UNEXPECTED_DATA;
976 }
977 }
978 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
979 return -E_FINISHED;
980 }
981
982 LOGE("SQLite step failed:%d", errCode);
983 return CheckCorruptedStatus(errCode);
984 }
985
GetEntryByRowId(int64_t rowId,Entry & entry)986 int SQLiteSingleVerStorageExecutor::GetEntryByRowId(int64_t rowId, Entry &entry)
987 {
988 if (getResultEntryStatement_ == nullptr) {
989 return -E_RESULT_SET_STATUS_INVALID;
990 }
991 int errCode = E_OK;
992 SQLiteUtils::ResetStatement(getResultEntryStatement_, false, errCode);
993 if (errCode != E_OK) {
994 LOGE("[SqlSinExe][GetEntryByRowid] Reset result set entry statement fail, errCode=%d.", errCode);
995 return CheckCorruptedStatus(errCode);
996 }
997 SQLiteUtils::BindInt64ToStatement(getResultEntryStatement_, 1, rowId);
998 errCode = SQLiteUtils::StepWithRetry(getResultEntryStatement_, isMemDb_);
999 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1000 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 0, entry.key);
1001 if (errCode != E_OK) {
1002 LOGE("[SqlSinExe][GetEntryByRowid] Get key failed, errCode=%d.", errCode);
1003 return CheckCorruptedStatus(errCode);
1004 }
1005 errCode = SQLiteUtils::GetColumnBlobValue(getResultEntryStatement_, 1, entry.value);
1006 if (errCode != E_OK) {
1007 LOGE("[SqlSinExe][GetEntryByRowid] Get value failed, errCode=%d.", errCode);
1008 return CheckCorruptedStatus(errCode);
1009 }
1010 return E_OK;
1011 } else {
1012 LOGE("[SqlSinExe][GetEntryByRowid] Step failed, errCode=%d.", errCode);
1013 return -E_UNEXPECTED_DATA;
1014 }
1015 }
1016
CloseResultSet()1017 void SQLiteSingleVerStorageExecutor::CloseResultSet()
1018 {
1019 int errCode = E_OK;
1020 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
1021 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1022 SetCorruptedStatus();
1023 }
1024 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
1025 if (errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) {
1026 SetCorruptedStatus();
1027 }
1028 if (isTransactionOpen_) {
1029 SQLiteUtils::RollbackTransaction(dbHandle_);
1030 isTransactionOpen_ = false;
1031 }
1032 }
1033
StartTransaction(TransactType type)1034 int SQLiteSingleVerStorageExecutor::StartTransaction(TransactType type)
1035 {
1036 if (dbHandle_ == nullptr) {
1037 LOGE("Begin transaction failed, dbHandle is null.");
1038 return -E_INVALID_DB;
1039 }
1040 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
1041 if (errCode == E_OK) {
1042 isTransactionOpen_ = true;
1043 } else {
1044 LOGE("Begin transaction failed, errCode = %d", errCode);
1045 }
1046 return CheckCorruptedStatus(errCode);
1047 }
1048
Commit()1049 int SQLiteSingleVerStorageExecutor::Commit()
1050 {
1051 if (dbHandle_ == nullptr) {
1052 return -E_INVALID_DB;
1053 }
1054 int errCode = SQLiteUtils::CommitTransaction(dbHandle_);
1055 if (errCode != E_OK) {
1056 LOGE("sqlite single ver storage executor commit fail! errCode = [%d]", errCode);
1057 return CheckCorruptedStatus(errCode);
1058 }
1059 isTransactionOpen_ = false;
1060 return E_OK;
1061 }
1062
Rollback()1063 int SQLiteSingleVerStorageExecutor::Rollback()
1064 {
1065 if (dbHandle_ == nullptr) {
1066 return -E_INVALID_DB;
1067 }
1068 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
1069 if (errCode != E_OK) {
1070 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
1071 return CheckCorruptedStatus(errCode);
1072 }
1073 isTransactionOpen_ = false;
1074 return E_OK;
1075 }
1076
CheckIfKeyExisted(const Key & key,bool isLocal,Value & value,Timestamp & timestamp) const1077 bool SQLiteSingleVerStorageExecutor::CheckIfKeyExisted(const Key &key, bool isLocal,
1078 Value &value, Timestamp ×tamp) const
1079 {
1080 // not local value, no need to get the value.
1081 if (!isLocal) {
1082 return false;
1083 }
1084
1085 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1086 if (errCode != E_OK) {
1087 return false;
1088 }
1089 return true;
1090 }
1091
GetDeviceIdentifier(PragmaEntryDeviceIdentifier * identifier)1092 int SQLiteSingleVerStorageExecutor::GetDeviceIdentifier(PragmaEntryDeviceIdentifier *identifier)
1093 {
1094 if (identifier == nullptr) {
1095 return -E_INVALID_ARGS;
1096 }
1097
1098 if (dbHandle_ == nullptr) {
1099 return -E_INVALID_DB;
1100 }
1101
1102 sqlite3_stmt *statement = nullptr;
1103 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ENTRY_DEVICE, statement);
1104 if (errCode != E_OK) {
1105 return errCode;
1106 }
1107
1108 int keyIndex = identifier->origDevice ? BIND_ORI_DEVICE_ID : BIND_PRE_DEVICE_ID;
1109 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_KV_KEY_INDEX, identifier->key, false);
1110 if (errCode != E_OK) {
1111 goto END;
1112 }
1113
1114 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1115 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1116 std::vector<uint8_t> deviceId;
1117 errCode = SQLiteUtils::GetColumnBlobValue(statement, keyIndex, deviceId);
1118 identifier->deviceIdentifier.assign(deviceId.begin(), deviceId.end());
1119 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1120 errCode = -E_NOT_FOUND;
1121 }
1122
1123 END:
1124 int ret = E_OK;
1125 SQLiteUtils::ResetStatement(statement, true, ret);
1126 return CheckCorruptedStatus(errCode);
1127 }
1128
PutIntoCommittedData(const DataItem & itemPut,const DataItem & itemGet,const DataOperStatus & status,SingleVerNaturalStoreCommitNotifyData * committedData)1129 void SQLiteSingleVerStorageExecutor::PutIntoCommittedData(const DataItem &itemPut, const DataItem &itemGet,
1130 const DataOperStatus &status, SingleVerNaturalStoreCommitNotifyData *committedData)
1131 {
1132 if (committedData == nullptr) {
1133 return;
1134 }
1135
1136 Entry entry;
1137 int errCode;
1138 if (!status.isDeleted) {
1139 entry.key = itemPut.key;
1140 entry.value = itemPut.value;
1141 DataType dataType = (status.preStatus == DataStatus::EXISTED) ? DataType::UPDATE : DataType::INSERT;
1142 errCode = committedData->InsertCommittedData(std::move(entry), dataType, true);
1143 } else {
1144 entry.key = itemGet.key;
1145 entry.value = itemGet.value;
1146 errCode = committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1147 }
1148
1149 if (errCode != E_OK) {
1150 LOGE("[SingleVerExe][PutCommitData]Insert failed:%d", errCode);
1151 }
1152 }
1153
PrepareForSavingData(const std::string & readSql,const std::string & insertSql,const std::string & updateSql,SaveRecordStatements & statements) const1154 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(const std::string &readSql, const std::string &insertSql,
1155 const std::string &updateSql, SaveRecordStatements &statements) const
1156 {
1157 int errCode = SQLiteUtils::GetStatement(dbHandle_, readSql, statements.queryStatement);
1158 if (errCode != E_OK) {
1159 LOGE("Get query statement failed. errCode = [%d]", errCode);
1160 goto ERR;
1161 }
1162
1163 errCode = SQLiteUtils::GetStatement(dbHandle_, insertSql, statements.insertStatement);
1164 if (errCode != E_OK) {
1165 LOGE("Get insert statement failed. errCode = [%d]", errCode);
1166 goto ERR;
1167 }
1168
1169 errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, statements.updateStatement);
1170 if (errCode != E_OK) {
1171 LOGE("Get update statement failed. errCode = [%d]", errCode);
1172 goto ERR;
1173 }
1174 return E_OK;
1175 ERR:
1176 (void)statements.ResetStatement();
1177 return errCode;
1178 }
1179
PrepareForSavingData(SingleVerDataType type)1180 int SQLiteSingleVerStorageExecutor::PrepareForSavingData(SingleVerDataType type)
1181 {
1182 int errCode = -E_NOT_SUPPORT;
1183 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1184 // currently, Local type has not been optimized, so pass updateSql parameter with INSERT_LOCAL_SQL
1185 errCode = PrepareForSavingData(SELECT_LOCAL_HASH_SQL, INSERT_LOCAL_SQL, INSERT_LOCAL_SQL, saveLocalStatements_);
1186 } else if (type == SingleVerDataType::SYNC_TYPE) {
1187 errCode = PrepareForSavingData(SELECT_SYNC_HASH_SQL, INSERT_SYNC_SQL, UPDATE_SYNC_SQL, saveSyncStatements_);
1188 }
1189 return CheckCorruptedStatus(errCode);
1190 }
1191
ResetForSavingData(SingleVerDataType type)1192 int SQLiteSingleVerStorageExecutor::ResetForSavingData(SingleVerDataType type)
1193 {
1194 int errCode = E_OK;
1195 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
1196 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
1197 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
1198 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
1199 } else if (type == SingleVerDataType::SYNC_TYPE) {
1200 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1201 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1202 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1203 }
1204 return CheckCorruptedStatus(errCode);
1205 }
1206
GetOriginDevName(const DataItem & dataItem,const std::string & origDevGet)1207 std::string SQLiteSingleVerStorageExecutor::GetOriginDevName(const DataItem &dataItem,
1208 const std::string &origDevGet)
1209 {
1210 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
1211 return origDevGet;
1212 }
1213 return dataItem.origDev;
1214 }
1215
SaveSyncDataToDatabase(const DataItem & dataItem,const Key & hashKey,const std::string & origDev,const std::string & deviceName,bool isUpdate)1216 int SQLiteSingleVerStorageExecutor::SaveSyncDataToDatabase(const DataItem &dataItem, const Key &hashKey,
1217 const std::string &origDev, const std::string &deviceName, bool isUpdate)
1218 {
1219 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1220 LOGD("Find query data missing, erase local data.");
1221 return EraseSyncData(hashKey);
1222 }
1223 auto statement = saveSyncStatements_.GetDataSaveStatement(isUpdate);
1224 if (statement == nullptr) {
1225 return -E_INVALID_ARGS;
1226 }
1227
1228 std::string devName = DBCommon::TransferHashString(deviceName);
1229 int errCode = BindSavedSyncData(statement, dataItem, hashKey, {origDev, devName}, isUpdate);
1230 if (errCode != E_OK) {
1231 return errCode;
1232 }
1233
1234 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1235 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1236 errCode = E_OK;
1237 }
1238 if (errCode == E_OK) {
1239 errCode = RemoveCloudUploadFlag(hashKey);
1240 }
1241 return errCode;
1242 }
1243
JudgeSyncSaveType(DataItem & dataItem,const DataItem & itemGet,const DeviceInfo & deviceInfo,bool isHashKeyExisted,bool isPermitForceWrite)1244 DataOperStatus SQLiteSingleVerStorageExecutor::JudgeSyncSaveType(DataItem &dataItem,
1245 const DataItem &itemGet, const DeviceInfo &deviceInfo, bool isHashKeyExisted, bool isPermitForceWrite)
1246 {
1247 DataOperStatus status;
1248 status.isDeleted = ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1249 (dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY);
1250 if (isHashKeyExisted) {
1251 if ((itemGet.flag & DataItem::DELETE_FLAG) != 0) {
1252 status.preStatus = DataStatus::DELETED;
1253 } else {
1254 status.preStatus = DataStatus::EXISTED;
1255 }
1256 std::string deviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1257 if (itemGet.writeTimestamp >= dataItem.writeTimestamp) {
1258 // for multi user mode, no permit to forcewrite
1259 if (((!deviceName.empty()) && IsFromDataOwner(itemGet, deviceName) && isPermitForceWrite) ||
1260 deviceInfo.isLocal) {
1261 LOGI("Force overwrite the data:%" PRIu64 " vs %" PRIu64 " isLocal %d",
1262 itemGet.writeTimestamp, dataItem.writeTimestamp, static_cast<int>(deviceInfo.isLocal));
1263 status.isDefeated = false;
1264 dataItem.writeTimestamp = itemGet.writeTimestamp + 1;
1265 dataItem.timestamp = itemGet.timestamp;
1266 } else {
1267 status.isDefeated = true;
1268 }
1269 }
1270 }
1271 return status;
1272 }
1273
GetSyncDataItemExt(const DataItem & dataItem,DataItem & itemGet,const DataOperStatus & dataStatus) const1274 int SQLiteSingleVerStorageExecutor::GetSyncDataItemExt(const DataItem &dataItem, DataItem &itemGet,
1275 const DataOperStatus &dataStatus) const
1276 {
1277 if (dataStatus.preStatus != DataStatus::EXISTED) {
1278 return E_OK;
1279 }
1280 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1281 // only deleted item need origin value.
1282 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1283 if (errCode != E_OK) {
1284 return errCode;
1285 }
1286
1287 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, itemGet.value);
1288 if (errCode != E_OK) {
1289 LOGE("Get column value data failed:%d", errCode);
1290 }
1291
1292 return errCode;
1293 }
1294
ResetSaveSyncStatements(int errCode)1295 int SQLiteSingleVerStorageExecutor::ResetSaveSyncStatements(int errCode)
1296 {
1297 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
1298 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
1299 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
1300 return CheckCorruptedStatus(errCode);
1301 }
1302
1303 namespace {
IsNeedIgnoredData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & devInfo,bool isHashKeyExisted,int policy)1304 inline bool IsNeedIgnoredData(const DataItem &itemPut, const DataItem &itemGet,
1305 const DeviceInfo &devInfo, bool isHashKeyExisted, int policy)
1306 {
1307 // deny the data synced from other dev which the origin dev is current or the existed value is current dev data.
1308 return (((itemGet.origDev.empty() && isHashKeyExisted) || itemPut.origDev.empty()) &&
1309 (!devInfo.isLocal && policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA));
1310 }
1311 }
1312
PrepareForNotifyConflictAndObserver(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)1313 int SQLiteSingleVerStorageExecutor::PrepareForNotifyConflictAndObserver(DataItem &dataItem,
1314 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
1315 {
1316 // Check sava data existed info
1317 int errCode = GetSyncDataItemPre(dataItem, notify.getData, notify.hashKey);
1318 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1319 LOGD("[SingleVerExe][PrepareForNotifyConflictAndObserver] failed:%d", errCode);
1320 if (isSyncMigrating_) {
1321 ResetForMigrateCacheData();
1322 return errCode;
1323 }
1324 return ResetSaveSyncStatements(errCode);
1325 }
1326
1327 bool isHashKeyExisted = (errCode != -E_NOT_FOUND);
1328 if (IsNeedIgnoredData(dataItem, notify.getData, deviceInfo, isHashKeyExisted, conflictResolvePolicy_)) {
1329 LOGD("[SingleVerExe] Ignore the sync data.");
1330 if (isSyncMigrating_) {
1331 ResetForMigrateCacheData();
1332 return -E_IGNORE_DATA;
1333 }
1334 return ResetSaveSyncStatements(-E_IGNORE_DATA);
1335 }
1336
1337 notify.dataStatus = JudgeSyncSaveType(dataItem, notify.getData, deviceInfo, isHashKeyExisted,
1338 isPermitForceWrite);
1339 InitCommitNotifyDataKeyStatus(notify.committedData, notify.hashKey, notify.dataStatus);
1340
1341 // Nonexistent data, but deleted by local.
1342 if ((notify.dataStatus.preStatus == DataStatus::DELETED || notify.dataStatus.preStatus == DataStatus::NOEXISTED) &&
1343 (dataItem.flag & DataItem::DELETE_FLAG) != 0 &&
1344 (dataItem.flag & DataItem::LOCAL_FLAG) != 0) {
1345 // For delete item in cacheDB, which not in mainDB. Cannot notify, but this is not error.
1346 errCode = -E_NOT_FOUND;
1347 LOGD("Nonexistent data, but deleted by local");
1348 if (isSyncMigrating_) {
1349 ResetForMigrateCacheData();
1350 return errCode;
1351 }
1352 return ResetSaveSyncStatements(errCode);
1353 }
1354
1355 // get key and value from ori database
1356 errCode = GetSyncDataItemExt(dataItem, notify.getData, notify.dataStatus);
1357 if (errCode != E_OK) {
1358 LOGD("GetSyncDataItemExt failed:%d", errCode);
1359 if (isSyncMigrating_) {
1360 ResetForMigrateCacheData();
1361 return errCode;
1362 }
1363 return ResetSaveSyncStatements(errCode);
1364 }
1365
1366 return E_OK;
1367 }
1368
SaveSyncDataItem(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,SingleVerNaturalStoreCommitNotifyData * committedData,bool isPermitForceWrite)1369 int SQLiteSingleVerStorageExecutor::SaveSyncDataItem(DataItem &dataItem, const DeviceInfo &deviceInfo,
1370 Timestamp &maxStamp, SingleVerNaturalStoreCommitNotifyData *committedData, bool isPermitForceWrite)
1371 {
1372 NotifyConflictAndObserverData notify = {
1373 .committedData = committedData
1374 };
1375
1376 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
1377 if (errCode != E_OK) {
1378 if (errCode == -E_IGNORE_DATA) {
1379 errCode = E_OK;
1380 }
1381 return errCode;
1382 }
1383
1384 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, committedData);
1385 if (notify.dataStatus.isDefeated) {
1386 LOGE("Data status is defeated:%d", errCode);
1387 return ResetSaveSyncStatements(errCode);
1388 }
1389
1390 bool isUpdate = (notify.dataStatus.preStatus != DataStatus::NOEXISTED);
1391 std::string origDev = GetOriginDevName(dataItem, notify.getData.origDev);
1392 errCode = SaveSyncDataToDatabase(dataItem, notify.hashKey, origDev, deviceInfo.deviceName, isUpdate);
1393 if (errCode == E_OK) {
1394 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, committedData);
1395 maxStamp = std::max(dataItem.timestamp, maxStamp);
1396 } else {
1397 LOGE("Save sync data to db failed:%d", errCode);
1398 }
1399 return ResetSaveSyncStatements(errCode);
1400 }
1401
GetAllMetaKeys(std::vector<Key> & keys) const1402 int SQLiteSingleVerStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
1403 {
1404 sqlite3_stmt *statement = nullptr;
1405 const std::string &sqlStr = (attachMetaMode_ ? SELECT_ATTACH_ALL_META_KEYS : SELECT_ALL_META_KEYS);
1406 int errCode = SQLiteUtils::GetStatement(dbHandle_, sqlStr, statement);
1407 if (errCode != E_OK) {
1408 LOGE("[SingleVerExe][GetAllKey] Get statement failed:%d", errCode);
1409 return errCode;
1410 }
1411
1412 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
1413 int ret = E_OK;
1414 SQLiteUtils::ResetStatement(statement, true, ret);
1415 return errCode != E_OK ? errCode : ret;
1416 }
1417
GetAllSyncedEntries(const std::string & hashDev,std::vector<Entry> & entries) const1418 int SQLiteSingleVerStorageExecutor::GetAllSyncedEntries(const std::string &hashDev,
1419 std::vector<Entry> &entries) const
1420 {
1421 int errCode = E_OK;
1422 sqlite3_stmt *statement = nullptr;
1423 if (hashDev.empty()) {
1424 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1425 SELECT_ALL_SYNC_ENTRIES_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES);
1426 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1427 if (errCode != E_OK) {
1428 LOGE("Get all entries statement failed:%d", errCode);
1429 return errCode;
1430 }
1431 } else {
1432 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN ?
1433 SELECT_ALL_SYNC_ENTRIES_BY_DEV_FROM_CACHEHANDLE : SELECT_ALL_SYNC_ENTRIES_BY_DEV);
1434 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1435 if (errCode != E_OK) {
1436 LOGE("Get all entries statement failed:%d", errCode);
1437 return errCode;
1438 }
1439
1440 // deviceName always hash string
1441 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
1442 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, devVect, true); // bind the 1st to device.
1443 if (errCode != E_OK) {
1444 LOGE("Failed to bind the synced device for all entries:%d", errCode);
1445 goto END;
1446 }
1447 }
1448
1449 errCode = GetAllEntries(statement, entries);
1450 END:
1451 int ret = E_OK;
1452 SQLiteUtils::ResetStatement(statement, true, ret);
1453 return errCode != E_OK ? errCode : ret;
1454 }
1455
GetAllEntries(sqlite3_stmt * statement,std::vector<Entry> & entries) const1456 int SQLiteSingleVerStorageExecutor::GetAllEntries(sqlite3_stmt *statement, std::vector<Entry> &entries) const
1457 {
1458 if (statement == nullptr) {
1459 return -E_INVALID_DB;
1460 }
1461 int errCode;
1462 do {
1463 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1464 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1465 Entry entry;
1466 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, entry.key); // No.0 is the key
1467 if (errCode != E_OK) {
1468 break;
1469 }
1470 errCode = SQLiteUtils::GetColumnBlobValue(statement, 1, entry.value); // No.1 is the value
1471 if (errCode != E_OK) {
1472 break;
1473 }
1474
1475 entries.push_back(std::move(entry));
1476 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1477 errCode = E_OK;
1478 break;
1479 } else {
1480 LOGE("SQLite step for all entries failed:%d", errCode);
1481 break;
1482 }
1483 } while (true);
1484
1485 return errCode;
1486 }
1487
BindSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,const SyncDataDevices & devices,bool isUpdate)1488 int SQLiteSingleVerStorageExecutor::BindSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1489 const Key &hashKey, const SyncDataDevices &devices, bool isUpdate)
1490 {
1491 const int hashKeyIndex = isUpdate ? BIND_SYNC_UPDATE_HASH_KEY_INDEX : BIND_SYNC_HASH_KEY_INDEX;
1492 int errCode = SQLiteUtils::BindBlobToStatement(statement, hashKeyIndex, hashKey, false);
1493 if (errCode != E_OK) {
1494 LOGE("Bind saved sync data hash key failed:%d", errCode);
1495 return errCode;
1496 }
1497
1498 // if delete flag is set, just use the hash key instead of the key
1499 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1500 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_SYNC_KEY_INDEX, -1));
1501 } else {
1502 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_KEY_INDEX, dataItem.key, false);
1503 }
1504
1505 if (errCode != E_OK) {
1506 LOGE("Bind saved sync data key failed:%d", errCode);
1507 return errCode;
1508 }
1509
1510 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_VAL_INDEX, dataItem.value, true);
1511 if (errCode != E_OK) {
1512 LOGE("Bind saved sync data value failed:%d", errCode);
1513 return errCode;
1514 }
1515
1516 errCode = BindSyncDataTime(statement, dataItem, isUpdate);
1517 if (errCode != E_OK) {
1518 return errCode;
1519 }
1520 return BindDevForSavedSyncData(statement, dataItem, devices.origDev, devices.dev);
1521 }
1522
PutConflictData(const DataItem & itemPut,const DataItem & itemGet,const DeviceInfo & deviceInfo,const DataOperStatus & dataStatus,SingleVerNaturalStoreCommitNotifyData * commitData)1523 void SQLiteSingleVerStorageExecutor::PutConflictData(const DataItem &itemPut, const DataItem &itemGet,
1524 const DeviceInfo &deviceInfo, const DataOperStatus &dataStatus,
1525 SingleVerNaturalStoreCommitNotifyData *commitData)
1526 {
1527 if (commitData == nullptr) {
1528 return;
1529 }
1530
1531 bool conflictNotifyMatch = commitData->IsConflictedNotifyMatched(itemPut, itemGet);
1532 if (!conflictNotifyMatch) {
1533 return;
1534 }
1535
1536 if (dataStatus.preStatus == DataStatus::NOEXISTED ||
1537 ((dataStatus.preStatus == DataStatus::DELETED) && dataStatus.isDeleted)) {
1538 return;
1539 }
1540
1541 Key origKey;
1542 if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1543 (itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
1544 origKey = itemGet.key;
1545 } else {
1546 origKey = itemPut.key;
1547 }
1548
1549 // insert db original entry
1550 std::vector<uint8_t> getDevVect(itemGet.dev.begin(), itemGet.dev.end());
1551 DataItemInfo orgItemInfo = {itemGet, true, getDevVect};
1552 orgItemInfo.dataItem.key = origKey;
1553 commitData->InsertConflictedItem(orgItemInfo, true);
1554
1555 // insert conflict entry
1556 std::string putDeviceName = DBCommon::TransferHashString(deviceInfo.deviceName);
1557 std::vector<uint8_t> putDevVect(putDeviceName.begin(), putDeviceName.end());
1558
1559 DataItemInfo newItemInfo = {itemPut, deviceInfo.isLocal, putDevVect};
1560 newItemInfo.dataItem.key = origKey;
1561 commitData->InsertConflictedItem(newItemInfo, false);
1562 }
1563
Reset()1564 int SQLiteSingleVerStorageExecutor::Reset()
1565 {
1566 if (isTransactionOpen_) {
1567 Rollback();
1568 }
1569
1570 int errCode = ResetForSavingData(SingleVerDataType::SYNC_TYPE);
1571 if (errCode != E_OK) {
1572 LOGE("Finalize the sync resources for saving sync data failed: %d", errCode);
1573 }
1574
1575 errCode = ResetForSavingData(SingleVerDataType::LOCAL_TYPE_SQLITE);
1576 if (errCode != E_OK) {
1577 LOGE("Finalize the local resources for saving sync data failed: %d", errCode);
1578 }
1579 return SQLiteStorageExecutor::Reset();
1580 }
1581
GetSyncDataItemPre(const DataItem & itemPut,DataItem & itemGet,Key & hashKey) const1582 int SQLiteSingleVerStorageExecutor::GetSyncDataItemPre(const DataItem &itemPut, DataItem &itemGet,
1583 Key &hashKey) const
1584 {
1585 if (isSyncMigrating_) {
1586 hashKey = itemPut.hashKey;
1587 } else if ((itemPut.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
1588 ((itemPut.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY)) {
1589 hashKey = itemPut.key;
1590 } else {
1591 int errCode = DBCommon::CalcValueHash(itemPut.key, hashKey);
1592 if (errCode != E_OK) {
1593 return errCode;
1594 }
1595 }
1596
1597 return GetSyncDataPreByHashKey(hashKey, itemGet);
1598 }
1599
GetSyncDataPreByHashKey(const Key & hashKey,DataItem & itemGet) const1600 int SQLiteSingleVerStorageExecutor::GetSyncDataPreByHashKey(const Key &hashKey, DataItem &itemGet) const
1601 {
1602 auto statement = isSyncMigrating_ ? migrateSyncStatements_.queryStatement : saveSyncStatements_.queryStatement;
1603 int errCode = SQLiteUtils::BindBlobToStatement(statement, 1, hashKey, false); // 1st arg.
1604 if (errCode != E_OK) {
1605 return errCode;
1606 }
1607
1608 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1609 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // no find the key
1610 errCode = -E_NOT_FOUND;
1611 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1612 itemGet.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1613 itemGet.writeTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1614 itemGet.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1615 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, itemGet.key);
1616 if (errCode != E_OK) {
1617 return errCode;
1618 }
1619 std::vector<uint8_t> devVect;
1620 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1621 if (errCode != E_OK) {
1622 return errCode;
1623 }
1624
1625 std::vector<uint8_t> origDevVect;
1626 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, origDevVect);
1627 if (errCode != E_OK) {
1628 return errCode;
1629 }
1630 itemGet.dev.assign(devVect.begin(), devVect.end());
1631 itemGet.origDev.assign(origDevVect.begin(), origDevVect.end());
1632 }
1633 return errCode;
1634 }
1635
DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData * committedData,const Key & key,const Value & value)1636 int SQLiteSingleVerStorageExecutor::DeleteLocalDataInner(SingleVerNaturalStoreCommitNotifyData *committedData,
1637 const Key &key, const Value &value)
1638 {
1639 if (committedData != nullptr) {
1640 Key hashKey;
1641 int innerErrCode = DBCommon::CalcValueHash(key, hashKey);
1642 if (innerErrCode != E_OK) {
1643 return innerErrCode;
1644 }
1645 committedData->InitKeyPropRecord(hashKey, ExistStatus::EXIST);
1646 }
1647
1648 std::string sql = DELETE_LOCAL_SQL;
1649 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
1650 sql = DELETE_LOCAL_SQL_FROM_CACHEHANDLE;
1651 }
1652 sqlite3_stmt *statement = nullptr;
1653 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
1654 if (errCode != E_OK) {
1655 goto ERROR;
1656 }
1657
1658 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);
1659 if (errCode != E_OK) {
1660 LOGE("Bind the key error(%d) when delete kv data.", errCode);
1661 goto ERROR;
1662 }
1663
1664 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1665 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1666 if (sqlite3_changes(dbHandle_) > 0) {
1667 if (committedData != nullptr) {
1668 Entry entry = {key, value};
1669 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, true);
1670 } else {
1671 LOGE("DeleteLocalKvData failed to do commit notify because of OOM.");
1672 }
1673 errCode = E_OK;
1674 }
1675 }
1676
1677 ERROR:
1678 int ret = E_OK;
1679 SQLiteUtils::ResetStatement(statement, true, ret);
1680 return CheckCorruptedStatus(errCode);
1681 }
1682
DeleteLocalKvData(const Key & key,SingleVerNaturalStoreCommitNotifyData * committedData,Value & value,Timestamp & timestamp)1683 int SQLiteSingleVerStorageExecutor::DeleteLocalKvData(const Key &key,
1684 SingleVerNaturalStoreCommitNotifyData *committedData, Value &value, Timestamp ×tamp)
1685 {
1686 int errCode = GetKvData(SingleVerDataType::LOCAL_TYPE_SQLITE, key, value, timestamp);
1687 if (errCode != E_OK) {
1688 return CheckCorruptedStatus(errCode);
1689 }
1690
1691 return DeleteLocalDataInner(committedData, key, value);
1692 }
1693
EraseSyncData(const Key & hashKey)1694 int SQLiteSingleVerStorageExecutor::EraseSyncData(const Key &hashKey)
1695 {
1696 sqlite3_stmt *stmt = nullptr;
1697 std::string sql = (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
1698 DELETE_SYNC_DATA_WITH_HASHKEY_FROM_CACHEHANDLE : DELETE_SYNC_DATA_WITH_HASHKEY;
1699 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1700 if (errCode != E_OK) {
1701 LOGE("get erase statement failed:%d", errCode);
1702 return errCode;
1703 }
1704
1705 errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, hashKey, false);
1706 if (errCode != E_OK) {
1707 LOGE("bind hashKey failed:%d", errCode);
1708 goto END;
1709 }
1710
1711 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1712 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1713 errCode = E_OK;
1714 } else {
1715 LOGE("erase data failed:%d", errCode);
1716 }
1717 END:
1718 int ret = E_OK;
1719 SQLiteUtils::ResetStatement(stmt, true, ret);
1720 return CheckCorruptedStatus(errCode);
1721 }
1722
RemoveDeviceData(const std::string & deviceName)1723 int SQLiteSingleVerStorageExecutor::RemoveDeviceData(const std::string &deviceName)
1724 {
1725 int errCode = E_OK;
1726 bool isCreate = false;
1727 int ret = SQLiteUtils::CheckTableExists(dbHandle_, NATURALBASE_KV_AUX_SYNC_DATA_LOG_TABLE_NAME, isCreate);
1728 bool isTableExists = (ret == E_OK && isCreate);
1729 if (deviceName.empty()) {
1730 if (isTableExists) {
1731 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_ALL_LOG_DATA_SQL, "", "");
1732 }
1733 errCode = CloudExcuteRemoveOrUpdate(REMOVE_ALL_DEV_DATA_SQL, "", "");
1734 } else {
1735 if (isTableExists) {
1736 CloudExcuteRemoveOrUpdate(REMOVE_CLOUD_LOG_DATA_BY_DEVID_SQL, deviceName, "");
1737 }
1738 errCode = CloudExcuteRemoveOrUpdate(REMOVE_DEV_DATA_SQL, deviceName, "");
1739 }
1740 return CheckCorruptedStatus(errCode);
1741 }
1742
StepForResultEntries(bool isGetValue,sqlite3_stmt * statement,std::vector<Entry> & entries) const1743 int SQLiteSingleVerStorageExecutor::StepForResultEntries(bool isGetValue, sqlite3_stmt *statement,
1744 std::vector<Entry> &entries) const
1745 {
1746 entries.clear();
1747 entries.shrink_to_fit();
1748 int errCode = E_OK;
1749 do {
1750 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1751 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1752 errCode = GetEntryFromStatement(isGetValue, statement, entries);
1753 if (errCode != E_OK) {
1754 return errCode;
1755 }
1756 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1757 errCode = E_OK;
1758 break;
1759 } else {
1760 LOGE("SQLite step failed:%d", errCode);
1761 return errCode;
1762 }
1763 } while (true);
1764
1765 // if select no result, return the -E_NOT_FOUND.
1766 if (entries.empty()) {
1767 errCode = -E_NOT_FOUND;
1768 }
1769
1770 return errCode;
1771 }
1772
BindDevForSavedSyncData(sqlite3_stmt * statement,const DataItem & dataItem,const std::string & origDev,const std::string & deviceName)1773 int SQLiteSingleVerStorageExecutor::BindDevForSavedSyncData(sqlite3_stmt *statement, const DataItem &dataItem,
1774 const std::string &origDev, const std::string &deviceName)
1775 {
1776 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_SYNC_FLAG_INDEX,
1777 static_cast<int64_t>(dataItem.flag));
1778 if (errCode != E_OK) {
1779 LOGE("Bind saved sync data flag failed:%d", errCode);
1780 return errCode;
1781 }
1782
1783 std::vector<uint8_t> devVect(deviceName.begin(), deviceName.end());
1784 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_DEV_INDEX, devVect, true);
1785 if (errCode != E_OK) {
1786 LOGE("Bind dev for sync data failed:%d", errCode);
1787 return errCode;
1788 }
1789
1790 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
1791 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_SYNC_ORI_DEV_INDEX, origDevVect, true);
1792 if (errCode != E_OK) {
1793 LOGE("Bind orig dev for sync data failed:%d", errCode);
1794 }
1795
1796 return errCode;
1797 }
1798
GetDataItemSerialSize(const DataItem & item,size_t appendLen)1799 size_t SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(const DataItem &item, size_t appendLen)
1800 {
1801 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
1802 // the size would not be very large.
1803 static const size_t maxOrigDevLength = 40;
1804 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
1805 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
1806 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
1807 return dataSize;
1808 }
1809
InitResultSet(const Key & keyPrefix,sqlite3_stmt * & countStmt)1810 int SQLiteSingleVerStorageExecutor::InitResultSet(const Key &keyPrefix, sqlite3_stmt *&countStmt)
1811 {
1812 if (dbHandle_ == nullptr) {
1813 return -E_INVALID_DB;
1814 }
1815 // bind statement for count
1816 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_COUNT_SYNC_PREFIX_SQL, countStmt);
1817 if (errCode != E_OK) {
1818 LOGE("Get count statement for resultset error:%d", errCode);
1819 return errCode;
1820 }
1821
1822 errCode = SQLiteUtils::BindPrefixKey(countStmt, 1, keyPrefix); // first argument is key
1823 if (errCode != E_OK) {
1824 LOGE("Bind count key error:%d", errCode);
1825 goto ERROR;
1826 }
1827 // bind statement for result set
1828 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_ROWID_PREFIX_SQL, getResultRowIdStatement_);
1829 if (errCode != E_OK) {
1830 LOGE("Get result set rowid statement error:%d", errCode);
1831 goto ERROR;
1832 }
1833
1834 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1835 if (errCode != E_OK) {
1836 LOGE("Get result set entry statement error:%d", errCode);
1837 goto ERROR;
1838 }
1839
1840 errCode = SQLiteUtils::BindPrefixKey(getResultRowIdStatement_, 1, keyPrefix); // first argument is key
1841 if (errCode != E_OK) {
1842 LOGE("Bind result set rowid statement error:%d", errCode);
1843 goto ERROR;
1844 }
1845 return E_OK;
1846
1847 ERROR:
1848 int ret = E_OK;
1849 SQLiteUtils::ResetStatement(countStmt, true, ret);
1850 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
1851 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, ret);
1852 return CheckCorruptedStatus(errCode);
1853 }
1854
InitResultSetCount(QueryObject & queryObj,sqlite3_stmt * & countStmt)1855 int SQLiteSingleVerStorageExecutor::InitResultSetCount(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1856 {
1857 if (dbHandle_ == nullptr) {
1858 return -E_INVALID_DB;
1859 }
1860
1861 int errCode = E_OK;
1862 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1863 if (errCode != E_OK) {
1864 return errCode;
1865 }
1866
1867 errCode = helper.GetCountSqlStatement(dbHandle_, countStmt);
1868 if (errCode != E_OK) {
1869 LOGE("Get count bind statement error:%d", errCode);
1870 int ret = E_OK;
1871 SQLiteUtils::ResetStatement(countStmt, true, ret);
1872 }
1873 return errCode;
1874 }
1875
InitResultSetContent(QueryObject & queryObj)1876 int SQLiteSingleVerStorageExecutor::InitResultSetContent(QueryObject &queryObj)
1877 {
1878 int errCode = E_OK;
1879 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1880 if (errCode != E_OK) {
1881 return errCode;
1882 }
1883
1884 // bind statement for result set
1885 errCode = helper.GetQuerySqlStatement(dbHandle_, true, getResultRowIdStatement_);
1886 if (errCode != E_OK) {
1887 LOGE("[SqlSinExe][InitResSetContent] Bind result set rowid statement of query error:%d", errCode);
1888 int ret = E_OK;
1889 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, ret);
1890 return errCode;
1891 }
1892 errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
1893 if (errCode != E_OK) {
1894 LOGE("[SqlSinExe][InitResSetContent] Get result set entry statement of query error:%d", errCode);
1895 return CheckCorruptedStatus(errCode);
1896 }
1897 return errCode;
1898 }
1899
InitResultSet(QueryObject & queryObj,sqlite3_stmt * & countStmt)1900 int SQLiteSingleVerStorageExecutor::InitResultSet(QueryObject &queryObj, sqlite3_stmt *&countStmt)
1901 {
1902 if (dbHandle_ == nullptr) {
1903 return -E_INVALID_DB;
1904 }
1905
1906 int errCode = E_OK;
1907 SqliteQueryHelper helper = queryObj.GetQueryHelper(errCode);
1908 if (errCode != E_OK) {
1909 return errCode;
1910 }
1911
1912 if (!queryObj.IsValid()) {
1913 return -E_INVALID_QUERY_FORMAT;
1914 }
1915
1916 errCode = InitResultSetCount(queryObj, countStmt);
1917 if (errCode != E_OK) {
1918 return CheckCorruptedStatus(errCode);
1919 }
1920
1921 errCode = InitResultSetContent(queryObj);
1922 if (errCode != E_OK) {
1923 SQLiteUtils::ResetStatement(countStmt, true, errCode);
1924 }
1925 return CheckCorruptedStatus(errCode);
1926 }
1927
UpdateLocalDataTimestamp(Timestamp timestamp)1928 int SQLiteSingleVerStorageExecutor::UpdateLocalDataTimestamp(Timestamp timestamp)
1929 {
1930 const std::string updateSql = "UPDATE local_data SET timestamp=";
1931 std::string sql = updateSql + std::to_string(timestamp) + " WHERE timestamp=0;";
1932 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1933 return CheckCorruptedStatus(errCode);
1934 }
1935
SetAttachMetaMode(bool attachMetaMode)1936 void SQLiteSingleVerStorageExecutor::SetAttachMetaMode(bool attachMetaMode)
1937 {
1938 attachMetaMode_ = attachMetaMode;
1939 }
1940
GetOneRawDataItem(sqlite3_stmt * statement,DataItem & dataItem,uint64_t & verInCurCacheDb,bool isCacheDb) const1941 int SQLiteSingleVerStorageExecutor::GetOneRawDataItem(sqlite3_stmt *statement, DataItem &dataItem,
1942 uint64_t &verInCurCacheDb, bool isCacheDb) const
1943 {
1944 int errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_KEY_INDEX, dataItem.key);
1945 if (errCode != E_OK) {
1946 return errCode;
1947 }
1948
1949 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_VAL_INDEX, dataItem.value);
1950 if (errCode != E_OK) {
1951 return errCode;
1952 }
1953
1954 dataItem.timestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_TIME_INDEX));
1955 dataItem.flag = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_FLAG_INDEX));
1956
1957 std::vector<uint8_t> devVect;
1958 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_DEVICE_INDEX, devVect);
1959 if (errCode != E_OK) {
1960 return errCode;
1961 }
1962 dataItem.dev = std::string(devVect.begin(), devVect.end());
1963
1964 devVect.clear();
1965 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_ORI_DEV_INDEX, devVect);
1966 if (errCode != E_OK) {
1967 return errCode;
1968 }
1969 dataItem.origDev = std::string(devVect.begin(), devVect.end());
1970
1971 errCode = SQLiteUtils::GetColumnBlobValue(statement, SYNC_RES_HASH_KEY_INDEX, dataItem.hashKey);
1972 if (errCode != E_OK) {
1973 return errCode;
1974 }
1975 dataItem.writeTimestamp = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_W_TIME_INDEX));
1976 if (errCode != E_OK) {
1977 return errCode;
1978 }
1979 if (isCacheDb) {
1980 verInCurCacheDb = static_cast<uint64_t>(sqlite3_column_int64(statement, SYNC_RES_VERSION_INDEX));
1981 }
1982 return E_OK;
1983 }
1984
GetAllDataItems(sqlite3_stmt * statement,std::vector<DataItem> & dataItems,uint64_t & verInCurCacheDb,bool isCacheDb) const1985 int SQLiteSingleVerStorageExecutor::GetAllDataItems(sqlite3_stmt *statement, std::vector<DataItem> &dataItems,
1986 uint64_t &verInCurCacheDb, bool isCacheDb) const
1987 {
1988 dataItems.clear();
1989 dataItems.shrink_to_fit();
1990 DataItem dataItem;
1991 int errCode;
1992 do {
1993 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
1994 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1995 errCode = GetOneRawDataItem(statement, dataItem, verInCurCacheDb, isCacheDb);
1996 if (errCode != E_OK) {
1997 return errCode;
1998 }
1999 dataItems.push_back(std::move(dataItem));
2000 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2001 errCode = E_OK;
2002 break;
2003 } else {
2004 LOGE("SQLite step failed:%d", errCode);
2005 break;
2006 }
2007 } while (true);
2008
2009 return CheckCorruptedStatus(errCode);
2010 }
2011
OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,int & count)2012 int SQLiteSingleVerStorageExecutor::OpenResultSetForCacheRowIdModeCommon(std::vector<int64_t> &rowIdCache,
2013 uint32_t cacheLimit, int &count)
2014 {
2015 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_SYNC_DATA_BY_ROWID_SQL, getResultEntryStatement_);
2016 if (errCode != E_OK) {
2017 LOGE("[SqlSinExe][OpenResSetRowId][Common] Get entry stmt fail, errCode=%d", errCode);
2018 return CheckCorruptedStatus(errCode);
2019 }
2020 errCode = StartTransaction(TransactType::DEFERRED);
2021 if (errCode != E_OK) {
2022 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2023 return CheckCorruptedStatus(errCode);
2024 }
2025 // Now Ready To Execute
2026 errCode = ResultSetLoadRowIdCache(rowIdCache, cacheLimit, 0, count);
2027 if (errCode != E_OK) {
2028 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2029 Rollback();
2030 return CheckCorruptedStatus(errCode);
2031 }
2032 // Consider finalize getResultRowIdStatement_ here if count equal to size of rowIdCache.
2033 return E_OK;
2034 }
2035
ResultSetLoadRowIdCache(std::vector<int64_t> & rowIdCache,uint32_t cacheLimit,uint32_t cacheStartPos,int & count)2036 int SQLiteSingleVerStorageExecutor::ResultSetLoadRowIdCache(std::vector<int64_t> &rowIdCache, uint32_t cacheLimit,
2037 uint32_t cacheStartPos, int &count)
2038 {
2039 rowIdCache.clear();
2040 count = 0;
2041 while (true) {
2042 int errCode = SQLiteUtils::StepWithRetry(getResultRowIdStatement_, isMemDb_);
2043 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2044 if (count >= static_cast<int>(cacheStartPos) && rowIdCache.size() < cacheLimit) {
2045 // If we can start cache, and, if we can still cache
2046 int64_t rowid = sqlite3_column_int64(getResultRowIdStatement_, 0);
2047 rowIdCache.push_back(rowid);
2048 }
2049 // Always increase the count
2050 count++;
2051 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2052 break;
2053 } else {
2054 LOGE("[SqlSinExe][ResSetLoadCache] Step fail, errCode=%d", errCode);
2055 rowIdCache.clear();
2056 count = 0;
2057 return CheckCorruptedStatus(errCode);
2058 }
2059 }
2060 return E_OK;
2061 }
2062
ResetStatement()2063 int SQLiteSingleVerStorageExecutor::SaveRecordStatements::ResetStatement()
2064 {
2065 int errCode = E_OK;
2066 SQLiteUtils::ResetStatement(insertStatement, true, errCode);
2067 if (errCode != E_OK) {
2068 LOGE("Finalize insert statements failed, error: %d", errCode);
2069 }
2070
2071 SQLiteUtils::ResetStatement(updateStatement, true, errCode);
2072 if (errCode != E_OK) {
2073 LOGE("Finalize update statements failed, error: %d", errCode);
2074 }
2075
2076 SQLiteUtils::ResetStatement(queryStatement, true, errCode);
2077 if (errCode != E_OK) {
2078 LOGE("Finalize query statement failed, error: %d", errCode);
2079 }
2080 return errCode;
2081 }
2082
FinalizeAllStatements()2083 void SQLiteSingleVerStorageExecutor::FinalizeAllStatements()
2084 {
2085 int errCode = saveLocalStatements_.ResetStatement();
2086 if (errCode != E_OK) {
2087 LOGE("Finalize saveLocal statements failed, error: %d", errCode);
2088 }
2089
2090 errCode = saveSyncStatements_.ResetStatement();
2091 if (errCode != E_OK) {
2092 LOGE("Finalize saveSync statement failed, error: %d", errCode);
2093 }
2094
2095 SQLiteUtils::ResetStatement(getResultRowIdStatement_, true, errCode);
2096 if (errCode != E_OK) {
2097 LOGE("Finalize getResultRowIdStatement_ failed, error: %d", errCode);
2098 }
2099
2100 SQLiteUtils::ResetStatement(getResultEntryStatement_, true, errCode);
2101 if (errCode != E_OK) {
2102 LOGE("Finalize getResultEntryStatement_ failed, error: %d", errCode);
2103 }
2104
2105 errCode = migrateSyncStatements_.ResetStatement();
2106 if (errCode != E_OK) {
2107 LOGE("Finalize migrateSync statements failed, error: %d", errCode);
2108 }
2109
2110 ReleaseContinueStatement();
2111 }
2112
SetConflictResolvePolicy(int policy)2113 void SQLiteSingleVerStorageExecutor::SetConflictResolvePolicy(int policy)
2114 {
2115 if (policy == DENY_OTHER_DEV_AMEND_CUR_DEV_DATA || policy == DEFAULT_LAST_WIN) {
2116 conflictResolvePolicy_ = policy;
2117 }
2118 }
2119
CheckIntegrity() const2120 int SQLiteSingleVerStorageExecutor::CheckIntegrity() const
2121 {
2122 if (dbHandle_ == nullptr) {
2123 return -E_INVALID_DB;
2124 }
2125
2126 return SQLiteUtils::CheckIntegrity(dbHandle_, CHECK_DB_INTEGRITY_SQL);
2127 }
2128
ForceCheckPoint() const2129 int SQLiteSingleVerStorageExecutor::ForceCheckPoint() const
2130 {
2131 if (dbHandle_ == nullptr) {
2132 return -E_INVALID_DB;
2133 }
2134 SQLiteUtils::ExecuteCheckPoint(dbHandle_);
2135 return E_OK;
2136 }
2137
GetLogFileSize() const2138 uint64_t SQLiteSingleVerStorageExecutor::GetLogFileSize() const
2139 {
2140 if (isMemDb_) {
2141 return 0;
2142 }
2143
2144 const char *fileName = sqlite3_db_filename(dbHandle_, "main");
2145 if (fileName == nullptr) {
2146 return 0;
2147 }
2148 std::string walName = std::string(fileName) + "-wal";
2149 uint64_t fileSize = 0;
2150 int result = OS::CalFileSize(std::string(walName), fileSize);
2151 if (result != E_OK) {
2152 return 0;
2153 }
2154 return fileSize;
2155 }
2156
GetExistsDevicesFromMeta(std::set<std::string> & devices)2157 int SQLiteSingleVerStorageExecutor::GetExistsDevicesFromMeta(std::set<std::string> &devices)
2158 {
2159 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_,
2160 attachMetaMode_ ? SqliteMetaExecutor::MetaMode::KV_ATTACH : SqliteMetaExecutor::MetaMode::KV,
2161 isMemDb_, devices);
2162 }
2163
UpdateKey(const UpdateKeyCallback & callback)2164 int SQLiteSingleVerStorageExecutor::UpdateKey(const UpdateKeyCallback &callback)
2165 {
2166 if (dbHandle_ == nullptr) {
2167 return -E_INVALID_DB;
2168 }
2169 UpdateContext context;
2170 context.callback = callback;
2171 int errCode = CreateFuncUpdateKey(context, &Translate, &CalHashKey);
2172 if (errCode != E_OK) {
2173 return errCode;
2174 }
2175 int executeErrCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, UPDATE_SYNC_DATA_KEY_SQL);
2176 context.callback = nullptr;
2177 errCode = CreateFuncUpdateKey(context, nullptr, nullptr);
2178 if (context.errCode != E_OK) {
2179 return context.errCode;
2180 }
2181 if (executeErrCode != E_OK) {
2182 return executeErrCode;
2183 }
2184 if (errCode != E_OK) {
2185 return errCode;
2186 }
2187 return E_OK;
2188 }
2189
CreateFuncUpdateKey(UpdateContext & context,void (* translateFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv),void (* calHashFunc)(sqlite3_context * ctx,int argc,sqlite3_value ** argv)) const2190 int SQLiteSingleVerStorageExecutor::CreateFuncUpdateKey(UpdateContext &context,
2191 void(*translateFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv),
2192 void(*calHashFunc)(sqlite3_context *ctx, int argc, sqlite3_value **argv)) const
2193 {
2194 int errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_TRANSLATE_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2195 &context, translateFunc, nullptr, nullptr, nullptr);
2196 if (errCode != SQLITE_OK) {
2197 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2198 return SQLiteUtils::MapSQLiteErrno(errCode);
2199 }
2200 errCode = sqlite3_create_function_v2(dbHandle_, FUNC_NAME_CAL_HASH_KEY, 1, SQLITE_UTF8 | SQLITE_DETERMINISTIC,
2201 &context, calHashFunc, nullptr, nullptr, nullptr);
2202 if (errCode != SQLITE_OK) {
2203 LOGE("[SqlSinExe][UpdateKey] Create func=translate_key failed=%d", errCode);
2204 return SQLiteUtils::MapSQLiteErrno(errCode);
2205 }
2206 return E_OK;
2207 }
2208
Translate(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2209 void SQLiteSingleVerStorageExecutor::Translate(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2210 {
2211 if (ctx == nullptr || argc != 1 || argv == nullptr) { // i parameters, which are key
2212 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2213 return;
2214 }
2215 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2216 auto keyBlob = static_cast<const uint8_t *>(sqlite3_value_blob(argv[0]));
2217 int keyBlobLen = sqlite3_value_bytes(argv[0]);
2218 Key oldKey;
2219 if (keyBlob != nullptr && keyBlobLen > 0) {
2220 oldKey = Key(keyBlob, keyBlob + keyBlobLen);
2221 }
2222 Key newKey;
2223 context->callback(oldKey, newKey);
2224 if (newKey.size() >= DBConstant::MAX_KEY_SIZE || newKey.empty()) {
2225 LOGE("[SqlSinExe][Translate] invalid key len=%zu", newKey.size());
2226 context->errCode = -E_INVALID_ARGS;
2227 sqlite3_result_error(ctx, "Update key is invalid", -1);
2228 return;
2229 }
2230 context->newKey = newKey;
2231 sqlite3_result_blob(ctx, newKey.data(), static_cast<int>(newKey.size()), SQLITE_TRANSIENT);
2232 }
2233
CalHashKey(sqlite3_context * ctx,int argc,sqlite3_value ** argv)2234 void SQLiteSingleVerStorageExecutor::CalHashKey(sqlite3_context *ctx, int argc, sqlite3_value **argv)
2235 {
2236 if (ctx == nullptr || argc != 1 || argv == nullptr) {
2237 LOGW("[SqlSinExe][Translate] invalid param=%d", argc);
2238 return;
2239 }
2240 auto context = static_cast<UpdateContext *>(sqlite3_user_data(ctx));
2241 Key hashKey;
2242 DBCommon::CalcValueHash(context->newKey, hashKey);
2243 sqlite3_result_blob(ctx, hashKey.data(), static_cast<int>(hashKey.size()), SQLITE_TRANSIENT);
2244 }
2245
IsPrintTimestamp()2246 bool SQLiteSingleVerStorageExecutor::IsPrintTimestamp()
2247 {
2248 // Print only maxLogTimesPerSecond times a second.
2249 Timestamp curTime = TimeHelper::GetSysCurrentTime();
2250 if (curTime - startTime_ > printIntervalSeconds) {
2251 logCount_ = 0;
2252 startTime_ = curTime;
2253 if (droppedCount_ > 0) {
2254 LOGI("Dropped number of timestamp log:%" PRIu64, droppedCount_);
2255 }
2256 droppedCount_ = 0;
2257 } else {
2258 logCount_++;
2259 }
2260
2261 if (logCount_ < maxLogTimesPerSecond) {
2262 return true;
2263 } else {
2264 droppedCount_++;
2265 return false;
2266 }
2267 }
2268 } // namespace DistributedDB
2269