1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_continue_token.h"
17 #include "sqlite_utils.h"
18
19 namespace DistributedDB {
SQLiteSingleVerRelationalContinueToken(const SyncTimeRange & timeRange,const QueryObject & queryObject)20 SQLiteSingleVerRelationalContinueToken::SQLiteSingleVerRelationalContinueToken(
21 const SyncTimeRange &timeRange, const QueryObject &queryObject)
22 : isGettingDeletedData_(false), queryObj_(queryObject), tableName_(queryObj_.GetTableName()), timeRange_(timeRange)
23 {
24 }
25
CheckValid() const26 bool SQLiteSingleVerRelationalContinueToken::CheckValid() const
27 {
28 bool isValid = (magicBegin_ == MAGIC_BEGIN && magicEnd_ == MAGIC_END);
29 if (!isValid) {
30 LOGE("Invalid continue token.");
31 }
32 return isValid;
33 }
34
GetStatement(sqlite3 * db,sqlite3_stmt * & queryStmt,sqlite3_stmt * & fullStmt,bool & isGettingDeletedData)35 int SQLiteSingleVerRelationalContinueToken::GetStatement(sqlite3 *db, sqlite3_stmt *&queryStmt, sqlite3_stmt *&fullStmt,
36 bool &isGettingDeletedData)
37 {
38 isGettingDeletedData = isGettingDeletedData_;
39 if (isGettingDeletedData) {
40 return GetDeletedDataStmt(db, queryStmt);
41 }
42
43 int errCode = GetQuerySyncStatement(db, queryStmt);
44 if (errCode != E_OK) {
45 return errCode;
46 }
47
48 // if lastQueryTime equals 0, that means never sync before, need not to send miss query data.
49 // if queryObj is empty, that means to send all data now, need not to send miss query data.
50 if (timeRange_.lastQueryTime != 0 && !queryObj_.Empty()) {
51 errCode = GetMissQueryStatement(db, fullStmt);
52 }
53 if (errCode != E_OK) {
54 int ret = E_OK;
55 SQLiteUtils::ResetStatement(queryStmt, true, ret);
56 }
57 return errCode;
58 }
59
SetNextBeginTime(const DataItem & theLastItem)60 void SQLiteSingleVerRelationalContinueToken::SetNextBeginTime(const DataItem &theLastItem)
61 {
62 Timestamp nextBeginTime = theLastItem.timestamp + 1;
63 if (nextBeginTime > INT64_MAX) {
64 nextBeginTime = INT64_MAX;
65 }
66 if (!isGettingDeletedData_) {
67 timeRange_.beginTime = nextBeginTime;
68 timeRange_.lastQueryTime = std::max(nextBeginTime, timeRange_.lastQueryTime);
69 return;
70 }
71 if ((theLastItem.flag & DataItem::DELETE_FLAG) != 0) { // The last one could be non-deleted.
72 timeRange_.deleteBeginTime = nextBeginTime;
73 }
74 }
75
FinishGetData()76 void SQLiteSingleVerRelationalContinueToken::FinishGetData()
77 {
78 if (isGettingDeletedData_) {
79 timeRange_.deleteEndTime = 0;
80 return;
81 }
82 isGettingDeletedData_ = true;
83 timeRange_.endTime = 0;
84 return;
85 }
86
IsGetAllDataFinished() const87 bool SQLiteSingleVerRelationalContinueToken::IsGetAllDataFinished() const
88 {
89 return timeRange_.beginTime >= timeRange_.endTime && timeRange_.deleteBeginTime >= timeRange_.deleteEndTime;
90 }
91
GetQuerySyncStatement(sqlite3 * db,sqlite3_stmt * & stmt)92 int SQLiteSingleVerRelationalContinueToken::GetQuerySyncStatement(sqlite3 *db, sqlite3_stmt *&stmt)
93 {
94 int errCode = E_OK;
95 SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
96 if (errCode != E_OK) {
97 return errCode;
98 }
99 if (fieldNames_.empty()) {
100 LOGE("field names cannot be empty.");
101 return -E_INTERNAL_ERROR;
102 }
103 return helper.GetRelationalQueryStatement(db, timeRange_.beginTime, timeRange_.endTime, fieldNames_, stmt);
104 }
105
GetMissQueryStatement(sqlite3 * db,sqlite3_stmt * & stmt)106 int SQLiteSingleVerRelationalContinueToken::GetMissQueryStatement(sqlite3 *db, sqlite3_stmt *&stmt)
107 {
108 int errCode = E_OK;
109 SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
110 if (errCode != E_OK) {
111 return errCode;
112 }
113 return helper.GetRelationalMissQueryStatement(db, timeRange_.lastQueryTime + 1, INT64_MAX, fieldNames_, stmt);
114 }
115
GetDeletedDataStmt(sqlite3 * db,sqlite3_stmt * & stmt) const116 int SQLiteSingleVerRelationalContinueToken::GetDeletedDataStmt(sqlite3 *db, sqlite3_stmt *&stmt) const
117 {
118 // get stmt
119 const std::string sql = GetDeletedDataSQL();
120 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
121 if (errCode != E_OK) {
122 return errCode;
123 }
124
125 // bind stmt
126 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timeRange_.deleteBeginTime); // 1 means begin time
127 if (errCode != E_OK) {
128 goto ERROR;
129 }
130 errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, timeRange_.deleteEndTime); // 2 means end time
131 if (errCode != E_OK) {
132 goto ERROR;
133 }
134 return errCode;
135
136 ERROR:
137 int ret = E_OK;
138 SQLiteUtils::ResetStatement(stmt, true, ret);
139 return errCode;
140 }
141
GetQuery() const142 const QueryObject &SQLiteSingleVerRelationalContinueToken::GetQuery() const
143 {
144 return queryObj_;
145 }
146
GetDeletedDataSQL() const147 std::string SQLiteSingleVerRelationalContinueToken::GetDeletedDataSQL() const
148 {
149 std::string tableName = DBConstant::RELATIONAL_PREFIX + tableName_ + "_log";
150 return "SELECT * FROM " + tableName +
151 " WHERE timestamp >= ? AND timestamp < ? AND (flag&0x03 = 0x03) ORDER BY timestamp ASC;";
152 }
153
SetFieldNames(const std::vector<std::string> & fieldNames)154 void SQLiteSingleVerRelationalContinueToken::SetFieldNames(const std::vector<std::string> &fieldNames)
155 {
156 fieldNames_ = fieldNames;
157 }
158
UpdateNextSyncOffset(int addOffset)159 void SQLiteSingleVerRelationalContinueToken::UpdateNextSyncOffset(int addOffset)
160 {
161 if (!queryObj_.HasLimit() || queryObj_.HasOrderBy()) {
162 return;
163 }
164 int limit;
165 int offset;
166 queryObj_.GetLimitVal(limit, offset);
167 if (limit < addOffset) {
168 LOGW("Sync data is over limit.");
169 return;
170 }
171 queryObj_.SetLimit(limit - addOffset, offset + addOffset);
172 }
173
SetCloudTableSchema(const TableSchema & schema)174 void SQLiteSingleVerRelationalContinueToken::SetCloudTableSchema(const TableSchema &schema)
175 {
176 tableSchema_ = schema;
177 }
178
GetCloudStatement(sqlite3 * db,CloudSyncData & cloudDataResult,sqlite3_stmt * & queryStmt,bool & isFirstTime)179 int SQLiteSingleVerRelationalContinueToken::GetCloudStatement(sqlite3 *db, CloudSyncData &cloudDataResult,
180 sqlite3_stmt *&queryStmt, bool &isFirstTime)
181 {
182 if (queryStmt_ != nullptr) {
183 queryStmt = queryStmt_;
184 isFirstTime = false;
185 return E_OK;
186 }
187 int errCode;
188 SqliteQueryHelper helper = queryObj_.GetQueryHelper(errCode);
189 if (errCode != E_OK) {
190 return errCode;
191 }
192 std::string sql = helper.GetRelationalCloudQuerySql(tableSchema_.fields, cloudDataResult.isCloudForcePushStrategy,
193 cloudDataResult.isCompensatedTask, cloudDataResult.mode);
194 errCode = helper.GetCloudQueryStatement(true, db, sql, queryStmt_);
195 if (errCode == E_OK) {
196 queryStmt = queryStmt_;
197 }
198 isFirstTime = true;
199 return errCode;
200 }
201
GetCloudTableSchema(TableSchema & tableSchema) const202 void SQLiteSingleVerRelationalContinueToken::GetCloudTableSchema(TableSchema &tableSchema) const
203 {
204 tableSchema = tableSchema_;
205 }
206
ReleaseCloudStatement()207 int SQLiteSingleVerRelationalContinueToken::ReleaseCloudStatement()
208 {
209 if (queryStmt_ == nullptr) {
210 return E_OK;
211 }
212 int errCode = E_OK;
213 SQLiteUtils::ResetStatement(queryStmt_, true, errCode);
214 queryStmt_ = nullptr;
215 return errCode;
216 }
217
IsUseLocalSchema() const218 bool SQLiteSingleVerRelationalContinueToken::IsUseLocalSchema() const
219 {
220 return queryObj_.IsUseLocalSchema();
221 }
222
GetRemoteDev() const223 std::string SQLiteSingleVerRelationalContinueToken::GetRemoteDev() const
224 {
225 return queryObj_.GetRemoteDev();
226 }
227 } // namespace DistributedDB
228 #endif