1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21
22 namespace DistributedDB {
23 static constexpr const int ROW_ID_INDEX = 1;
24 static constexpr const int TIMESTAMP_INDEX = 2;
25
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)26 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
27 std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
28 {
29 if (assetFields.empty() && pkSet.empty()) {
30 return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
31 }
32 std::string gid;
33 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
34 if (errCode != E_OK) {
35 LOGE("Get cloud gid fail when query log table.");
36 return errCode;
37 }
38
39 if (pkSet.empty() && gid.empty()) {
40 LOGE("query log table failed because of both primary key and gid are empty.");
41 return -E_CLOUD_ERROR;
42 }
43 std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
44 " a.cloud_gid";
45 for (const auto &field : assetFields) {
46 sql += ", b." + field.colName;
47 }
48 for (const auto &pk : pkSet) {
49 sql += ", b." + pk;
50 }
51 sql += " from '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b ";
52 sql += " ON (a.data_key = b.rowid) WHERE ";
53 if (!gid.empty()) {
54 sql += " a.cloud_gid = ? or ";
55 }
56 sql += "a.hash_key = ?";
57 querySql = sql;
58 return E_OK;
59 }
60
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)61 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
62 const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
63 {
64 std::string sql = "UPDATE " + tableName + " SET ";
65 for (const auto &field: fields) {
66 sql += field.colName + " = ?,";
67 }
68 sql.pop_back();
69 sql += " WHERE rowid = (";
70 sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
71 sqlite3_stmt *stmt = nullptr;
72 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
73 if (errCode != E_OK) {
74 LOGE("Get fill asset statement failed, %d", errCode);
75 return errCode;
76 }
77 for (size_t i = 0; i < fields.size(); ++i) {
78 errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
79 if (errCode != E_OK) {
80 SQLiteUtils::ResetStatement(stmt, true, errCode);
81 return errCode;
82 }
83 }
84 statement = stmt;
85 return errCode;
86 }
87
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)88 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
89 VBucket &vBucket, bool isDownloadSuccess)
90 {
91 std::string cloudGid;
92 int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
93 if (errCode != E_OK) {
94 LOGE("Miss gid when fill Asset");
95 return errCode;
96 }
97 sqlite3_stmt *stmt = nullptr;
98 std::vector<Field> assetsField;
99 errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
100 if (errCode != E_OK) {
101 LOGE("No assets need to be filled.");
102 return errCode;
103 }
104 CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
105
106 if (isDownloadSuccess) {
107 CloudStorageUtils::FillAssetFromVBucketFinish(vBucket, CloudStorageUtils::FillAssetAfterDownload,
108 CloudStorageUtils::FillAssetsAfterDownload);
109 } else {
110 CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetAfterDownloadFail);
111 }
112 errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
113 if (errCode != E_OK) {
114 return errCode;
115 }
116 errCode = SQLiteUtils::BindTextToStatement(stmt, assetsField.size() + 1, cloudGid);
117 if (errCode != E_OK) {
118 LOGE("Bind cloud gid to statement failed. %d", errCode);
119 int ret = E_OK;
120 SQLiteUtils::ResetStatement(stmt, true, ret);
121 return errCode;
122 }
123 errCode = SQLiteUtils::StepWithRetry(stmt);
124 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
125 errCode = E_OK;
126 } else {
127 LOGE("Fill cloud asset failed:%d", errCode);
128 }
129 int ret = E_OK;
130 SQLiteUtils::ResetStatement(stmt, true, ret);
131 return errCode != E_OK ? errCode : ret;
132 }
133
FillCloudAssetForUpload(const std::string & tableName,const CloudSyncBatch & data)134 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(const std::string &tableName,
135 const CloudSyncBatch &data)
136 {
137 if (data.rowid.empty() || data.timestamp.empty()) {
138 return -E_INVALID_ARGS;
139 }
140 if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size()) {
141 return -E_INVALID_ARGS;
142 }
143 int errCode = SetLogTriggerStatus(false);
144 if (errCode != E_OK) {
145 LOGE("Fail to set log trigger off, %d", errCode);
146 return errCode;
147 }
148 sqlite3_stmt *stmt = nullptr;
149 for (size_t i = 0; i < data.assets.size(); ++i) {
150 if (data.assets.at(i).empty()) {
151 continue;
152 }
153 errCode = InitFillUploadAssetStatement(tableName, data, i, stmt);
154 if (errCode != E_OK) {
155 break;
156 }
157 errCode = SQLiteUtils::StepWithRetry(stmt, false);
158 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
159 LOGE("Fill upload asset failed:%d", errCode);
160 break;
161 }
162 errCode = E_OK;
163 SQLiteUtils::ResetStatement(stmt, true, errCode);
164 stmt = nullptr;
165 if (errCode != E_OK) {
166 break;
167 }
168 }
169 int ret = E_OK;
170 SQLiteUtils::ResetStatement(stmt, true, ret);
171 int endCode = SetLogTriggerStatus(true);
172 if (endCode != E_OK) {
173 LOGE("Fail to set log trigger off, %d", endCode);
174 return endCode;
175 }
176 return errCode != E_OK ? errCode : ret;
177 }
178
InitFillUploadAssetStatement(const std::string & tableName,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)179 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(const std::string &tableName,
180 const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
181 {
182 VBucket vBucket = data.assets.at(index);
183 CloudStorageUtils::FillAssetFromVBucketFinish(vBucket, CloudStorageUtils::FillAssetForUpload,
184 CloudStorageUtils::FillAssetsForUpload);
185 std::string sql = "UPDATE " + tableName + " SET ";
186 for (const auto &item: vBucket) {
187 sql += item.first + " = ?,";
188 }
189 sql.pop_back();
190 sql += " WHERE rowid = ? and (select 1 from " + DBCommon::GetLogTableName(tableName) +
191 " WHERE timestamp = ?);";
192 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
193 if (errCode != E_OK) {
194 return errCode;
195 }
196 int batchIndex = 1;
197 for (const auto &item: vBucket) {
198 Field field = {
199 .colName = item.first, .type = static_cast<int32_t>(item.second.index())
200 };
201 errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](batchIndex++, vBucket, field, statement);
202 if (errCode != E_OK) {
203 return errCode;
204 }
205 }
206 int64_t rowid = data.rowid[index];
207 errCode = SQLiteUtils::BindInt64ToStatement(statement, vBucket.size() + ROW_ID_INDEX, rowid);
208 if (errCode != E_OK) {
209 return errCode;
210 }
211 int64_t timeStamp = data.timestamp[index];
212 return SQLiteUtils::BindInt64ToStatement(statement, vBucket.size() + TIMESTAMP_INDEX, timeStamp);
213 }
214
IsGetCloudDataContinue(uint32_t curNum,uint32_t curSize,uint32_t maxSize)215 bool SQLiteSingleVerRelationalStorageExecutor::IsGetCloudDataContinue(uint32_t curNum, uint32_t curSize,
216 uint32_t maxSize)
217 {
218 if (curNum == 0) {
219 return true;
220 }
221 #ifdef MAX_UPLOAD_COUNT
222 if (curSize < maxSize && curNum < MAX_UPLOAD_COUNT) {
223 return true;
224 }
225 #else
226 if (curSize < maxSize) {
227 return true;
228 }
229 #endif
230 return false;
231 }
232 } // namespace DistributedDB
233 #endif