• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_storage_utils.h"
20 #include "db_common.h"
21 
22 namespace DistributedDB {
// Offsets added to the number of bound asset columns to obtain the positions of the
// trailing rowid and timestamp placeholders in the fill-upload-asset UPDATE statement.
static constexpr int ROW_ID_INDEX = 1;
static constexpr int TIMESTAMP_INDEX = 2;
25 
GetQueryInfoSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::vector<Field> & assetFields,std::string & querySql)26 int SQLiteSingleVerRelationalStorageExecutor::GetQueryInfoSql(const std::string &tableName, const VBucket &vBucket,
27     std::set<std::string> &pkSet, std::vector<Field> &assetFields, std::string &querySql)
28 {
29     if (assetFields.empty() && pkSet.empty()) {
30         return GetQueryLogSql(tableName, vBucket, pkSet, querySql);
31     }
32     std::string gid;
33     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gid);
34     if (errCode != E_OK) {
35         LOGE("Get cloud gid fail when query log table.");
36         return errCode;
37     }
38 
39     if (pkSet.empty() && gid.empty()) {
40         LOGE("query log table failed because of both primary key and gid are empty.");
41         return -E_CLOUD_ERROR;
42     }
43     std::string sql = "select a.data_key, a.device, a.ori_device, a.timestamp, a.wtimestamp, a.flag, a.hash_key,"
44         " a.cloud_gid";
45     for (const auto &field : assetFields) {
46         sql += ", b." + field.colName;
47     }
48     for (const auto &pk : pkSet) {
49         sql += ", b." + pk;
50     }
51     sql += " from '" + DBCommon::GetLogTableName(tableName) + "' AS a LEFT JOIN '" + tableName + "' AS b ";
52     sql += " ON (a.data_key = b.rowid) WHERE ";
53     if (!gid.empty()) {
54         sql += " a.cloud_gid = ? or ";
55     }
56     sql += "a.hash_key = ?";
57     querySql = sql;
58     return E_OK;
59 }
60 
GetFillDownloadAssetStatement(const std::string & tableName,const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * & statement)61 int SQLiteSingleVerRelationalStorageExecutor::GetFillDownloadAssetStatement(const std::string &tableName,
62     const VBucket &vBucket, const std::vector<Field> &fields, sqlite3_stmt *&statement)
63 {
64     std::string sql = "UPDATE " + tableName + " SET ";
65     for (const auto &field: fields) {
66         sql += field.colName + " = ?,";
67     }
68     sql.pop_back();
69     sql += " WHERE rowid = (";
70     sql += "SELECT data_key FROM " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = ?);";
71     sqlite3_stmt *stmt = nullptr;
72     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
73     if (errCode != E_OK) {
74         LOGE("Get fill asset statement failed, %d", errCode);
75         return errCode;
76     }
77     for (size_t i = 0; i < fields.size(); ++i) {
78         errCode = BindOneField(i + 1, vBucket, fields[i], stmt);
79         if (errCode != E_OK) {
80             SQLiteUtils::ResetStatement(stmt, true, errCode);
81             return errCode;
82         }
83     }
84     statement = stmt;
85     return errCode;
86 }
87 
FillCloudAssetForDownload(const TableSchema & tableSchema,VBucket & vBucket,bool isDownloadSuccess)88 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForDownload(const TableSchema &tableSchema,
89     VBucket &vBucket, bool isDownloadSuccess)
90 {
91     std::string cloudGid;
92     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
93     if (errCode != E_OK) {
94         LOGE("Miss gid when fill Asset");
95         return errCode;
96     }
97     sqlite3_stmt *stmt = nullptr;
98     std::vector<Field> assetsField;
99     errCode = CloudStorageUtils::GetAssetFieldsFromSchema(tableSchema, vBucket, assetsField);
100     if (errCode != E_OK) {
101         LOGE("No assets need to be filled.");
102         return errCode;
103     }
104     CloudStorageUtils::ChangeAssetsOnVBucketToAsset(vBucket, assetsField);
105 
106     if (isDownloadSuccess) {
107         CloudStorageUtils::FillAssetFromVBucketFinish(vBucket, CloudStorageUtils::FillAssetAfterDownload,
108             CloudStorageUtils::FillAssetsAfterDownload);
109     } else {
110         CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetAfterDownloadFail);
111     }
112     errCode = GetFillDownloadAssetStatement(tableSchema.name, vBucket, assetsField, stmt);
113     if (errCode != E_OK) {
114         return errCode;
115     }
116     errCode = SQLiteUtils::BindTextToStatement(stmt, assetsField.size() + 1, cloudGid);
117     if (errCode != E_OK) {
118         LOGE("Bind cloud gid to statement failed. %d", errCode);
119         int ret = E_OK;
120         SQLiteUtils::ResetStatement(stmt, true, ret);
121         return errCode;
122     }
123     errCode = SQLiteUtils::StepWithRetry(stmt);
124     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
125         errCode = E_OK;
126     } else {
127         LOGE("Fill cloud asset failed:%d", errCode);
128     }
129     int ret = E_OK;
130     SQLiteUtils::ResetStatement(stmt, true, ret);
131     return errCode != E_OK ? errCode : ret;
132 }
133 
FillCloudAssetForUpload(const std::string & tableName,const CloudSyncBatch & data)134 int SQLiteSingleVerRelationalStorageExecutor::FillCloudAssetForUpload(const std::string &tableName,
135     const CloudSyncBatch &data)
136 {
137     if (data.rowid.empty() || data.timestamp.empty()) {
138         return -E_INVALID_ARGS;
139     }
140     if (data.assets.size() != data.rowid.size() || data.assets.size() != data.timestamp.size()) {
141         return -E_INVALID_ARGS;
142     }
143     int errCode = SetLogTriggerStatus(false);
144     if (errCode != E_OK) {
145         LOGE("Fail to set log trigger off, %d", errCode);
146         return errCode;
147     }
148     sqlite3_stmt *stmt = nullptr;
149     for (size_t i = 0; i < data.assets.size(); ++i) {
150         if (data.assets.at(i).empty()) {
151             continue;
152         }
153         errCode = InitFillUploadAssetStatement(tableName, data, i, stmt);
154         if (errCode != E_OK) {
155             break;
156         }
157         errCode = SQLiteUtils::StepWithRetry(stmt, false);
158         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
159             LOGE("Fill upload asset failed:%d", errCode);
160             break;
161         }
162         errCode = E_OK;
163         SQLiteUtils::ResetStatement(stmt, true, errCode);
164         stmt = nullptr;
165         if (errCode != E_OK) {
166             break;
167         }
168     }
169     int ret = E_OK;
170     SQLiteUtils::ResetStatement(stmt, true, ret);
171     int endCode = SetLogTriggerStatus(true);
172     if (endCode != E_OK) {
173         LOGE("Fail to set log trigger off, %d", endCode);
174         return endCode;
175     }
176     return errCode != E_OK ? errCode : ret;
177 }
178 
InitFillUploadAssetStatement(const std::string & tableName,const CloudSyncBatch & data,const int & index,sqlite3_stmt * & statement)179 int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(const std::string &tableName,
180     const CloudSyncBatch &data, const int &index, sqlite3_stmt *&statement)
181 {
182     VBucket vBucket = data.assets.at(index);
183     CloudStorageUtils::FillAssetFromVBucketFinish(vBucket, CloudStorageUtils::FillAssetForUpload,
184         CloudStorageUtils::FillAssetsForUpload);
185     std::string sql = "UPDATE " + tableName + " SET ";
186     for (const auto &item: vBucket) {
187         sql += item.first + " = ?,";
188     }
189     sql.pop_back();
190     sql += " WHERE rowid = ? and (select 1 from " + DBCommon::GetLogTableName(tableName) +
191         " WHERE timestamp = ?);";
192     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
193     if (errCode != E_OK) {
194         return errCode;
195     }
196     int batchIndex = 1;
197     for (const auto &item: vBucket) {
198         Field field = {
199             .colName = item.first, .type = static_cast<int32_t>(item.second.index())
200         };
201         errCode = bindCloudFieldFuncMap_[TYPE_INDEX<Assets>](batchIndex++, vBucket, field, statement);
202         if (errCode != E_OK) {
203             return errCode;
204         }
205     }
206     int64_t rowid = data.rowid[index];
207     errCode = SQLiteUtils::BindInt64ToStatement(statement, vBucket.size() + ROW_ID_INDEX, rowid);
208     if (errCode != E_OK) {
209         return errCode;
210     }
211     int64_t timeStamp = data.timestamp[index];
212     return SQLiteUtils::BindInt64ToStatement(statement, vBucket.size() + TIMESTAMP_INDEX, timeStamp);
213 }
214 
IsGetCloudDataContinue(uint32_t curNum,uint32_t curSize,uint32_t maxSize)215 bool SQLiteSingleVerRelationalStorageExecutor::IsGetCloudDataContinue(uint32_t curNum, uint32_t curSize,
216     uint32_t maxSize)
217 {
218     if (curNum == 0) {
219         return true;
220     }
221 #ifdef MAX_UPLOAD_COUNT
222     if (curSize < maxSize && curNum < MAX_UPLOAD_COUNT) {
223         return true;
224     }
225 #else
226     if (curSize < maxSize) {
227         return true;
228     }
229 #endif
230     return false;
231 }
232 } // namespace DistributedDB
233 #endif