1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "RdbCloud"
17 #include "rdb_cloud.h"
18
19 #include "cloud/schema_meta.h"
20 #include "log_print.h"
21 #include "rdb_query.h"
22 #include "relational_store_manager.h"
23 #include "utils/anonymous.h"
24 #include "value_proxy.h"
25
26 namespace OHOS::DistributedRdb {
27 using namespace DistributedDB;
28 using namespace DistributedData;
RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB,BindAssets bindAssets)29 RdbCloud::RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB, BindAssets bindAssets)
30 : cloudDB_(std::move(cloudDB)), snapshots_(std::move(bindAssets))
31 {
32 }
33
BatchInsert(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)34 DBStatus RdbCloud::BatchInsert(
35 const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
36 {
37 extend.resize(record.size());
38 VBuckets extends = ValueProxy::Convert(std::move(extend));
39 VBuckets records = ValueProxy::Convert(std::move(record));
40 std::set<std::string> skipAssets;
41 PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
42 VBuckets temp = records;
43 auto error = cloudDB_->BatchInsert(tableName, std::move(records), extends);
44 PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
45 ConvertErrorField(extends);
46 extend = ValueProxy::Convert(std::move(extends));
47 return ConvertStatus(static_cast<GeneralError>(error));
48 }
49
BatchUpdate(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)50 DBStatus RdbCloud::BatchUpdate(
51 const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
52 {
53 extend.resize(record.size());
54 VBuckets extends = ValueProxy::Convert(std::move(extend));
55 VBuckets records = ValueProxy::Convert(std::move(record));
56 std::set<std::string> skipAssets;
57 PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
58 VBuckets temp = records;
59 auto error = cloudDB_->BatchUpdate(tableName, std::move(records), extends);
60 PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
61 ConvertErrorField(extends);
62 extend = ValueProxy::Convert(std::move(extends));
63 return ConvertStatus(static_cast<GeneralError>(error));
64 }
65
BatchDelete(const std::string & tableName,std::vector<DBVBucket> & extend)66 DBStatus RdbCloud::BatchDelete(const std::string &tableName, std::vector<DBVBucket> &extend)
67 {
68 VBuckets extends = ValueProxy::Convert(std::move(extend));
69 auto error = cloudDB_->BatchDelete(tableName, extends);
70 ConvertErrorField(extends);
71 extend = ValueProxy::Convert(std::move(extends));
72 return ConvertStatus(static_cast<GeneralError>(error));
73 }
74
Query(const std::string & tableName,DBVBucket & extend,std::vector<DBVBucket> & data)75 DBStatus RdbCloud::Query(const std::string &tableName, DBVBucket &extend, std::vector<DBVBucket> &data)
76 {
77 auto [nodes, status] = ConvertQuery(extend);
78 std::shared_ptr<Cursor> cursor = nullptr;
79 int32_t code = E_OK;
80 if (status == GeneralError::E_OK && !nodes.empty()) {
81 RdbQuery query;
82 query.SetQueryNodes(tableName, std::move(nodes));
83 std::tie(code, cursor) = cloudDB_->Query(query, ValueProxy::Convert(std::move(extend)));
84 } else {
85 std::tie(code, cursor) = cloudDB_->Query(tableName, ValueProxy::Convert(std::move(extend)));
86 }
87 if (cursor == nullptr || code != E_OK) {
88 ZLOGE("cursor is null, table:%{public}s, extend:%{public}zu",
89 Anonymous::Change(tableName).c_str(), extend.size());
90 return ConvertStatus(static_cast<GeneralError>(code != E_OK ? code : E_ERROR));
91 }
92 int32_t count = cursor->GetCount();
93 data.reserve(count);
94 auto err = cursor->MoveToFirst();
95 while (err == E_OK && count > 0) {
96 DistributedData::VBucket entry;
97 err = cursor->GetEntry(entry);
98 if (err != E_OK) {
99 break;
100 }
101 data.emplace_back(ValueProxy::Convert(std::move(entry)));
102 err = cursor->MoveToNext();
103 count--;
104 }
105 DistributedData::Value cursorFlag;
106 cursor->Get(SchemaMeta::CURSOR_FIELD, cursorFlag);
107 extend[SchemaMeta::CURSOR_FIELD] = ValueProxy::Convert(std::move(cursorFlag));
108 if (cursor->IsEnd()) {
109 ZLOGD("query end, table:%{public}s", Anonymous::Change(tableName).c_str());
110 return DBStatus::QUERY_END;
111 }
112 return ConvertStatus(static_cast<GeneralError>(err));
113 }
114
PreSharing(const std::string & tableName,VBuckets & extend)115 DistributedData::GeneralError RdbCloud::PreSharing(const std::string& tableName, VBuckets& extend)
116 {
117 return static_cast<GeneralError>(cloudDB_->PreSharing(tableName, extend));
118 }
119
Lock()120 std::pair<DBStatus, uint32_t> RdbCloud::Lock()
121 {
122 auto result = InnerLock(FLAG::SYSTEM_ABILITY);
123 return { ConvertStatus(result.first), result.second };
124 }
125
UnLock()126 DBStatus RdbCloud::UnLock()
127 {
128 return ConvertStatus(InnerUnLock(FLAG::SYSTEM_ABILITY));
129 }
130
HeartBeat()131 DBStatus RdbCloud::HeartBeat()
132 {
133 auto error = cloudDB_->Heartbeat();
134 return ConvertStatus(static_cast<GeneralError>(error));
135 }
136
Close()137 DBStatus RdbCloud::Close()
138 {
139 auto error = cloudDB_->Close();
140 return ConvertStatus(static_cast<GeneralError>(error));
141 }
142
InnerLock(FLAG flag)143 std::pair<GeneralError, uint32_t> RdbCloud::InnerLock(FLAG flag)
144 {
145 std::lock_guard<decltype(mutex_)> lock(mutex_);
146 flag_ |= flag;
147 // int64_t <-> uint32_t, s <-> ms
148 return std::make_pair(static_cast<GeneralError>(cloudDB_->Lock()), cloudDB_->AliveTime() * TO_MS);
149 }
150
InnerUnLock(FLAG flag)151 GeneralError RdbCloud::InnerUnLock(FLAG flag)
152 {
153 std::lock_guard<decltype(mutex_)> lock(mutex_);
154 flag_ &= ~flag;
155 if (flag_ == 0) {
156 return static_cast<GeneralError>(cloudDB_->Unlock());
157 }
158 return GeneralError::E_OK;
159 }
160
LockCloudDB(FLAG flag)161 std::pair<GeneralError, uint32_t> RdbCloud::LockCloudDB(FLAG flag)
162 {
163 return InnerLock(flag);
164 }
165
UnLockCloudDB(FLAG flag)166 GeneralError RdbCloud::UnLockCloudDB(FLAG flag)
167 {
168 return InnerUnLock(flag);
169 }
170
GetEmptyCursor(const std::string & tableName)171 std::pair<DBStatus, std::string> RdbCloud::GetEmptyCursor(const std::string &tableName)
172 {
173 auto [error, cursor] = cloudDB_->GetEmptyCursor(tableName);
174 return { ConvertStatus(static_cast<GeneralError>(error)), cursor };
175 }
176
ConvertStatus(DistributedData::GeneralError error)177 DBStatus RdbCloud::ConvertStatus(DistributedData::GeneralError error)
178 {
179 switch (error) {
180 case GeneralError::E_OK:
181 return DBStatus::OK;
182 case GeneralError::E_NETWORK_ERROR:
183 return DBStatus::CLOUD_NETWORK_ERROR;
184 case GeneralError::E_LOCKED_BY_OTHERS:
185 return DBStatus::CLOUD_LOCK_ERROR;
186 case GeneralError::E_RECODE_LIMIT_EXCEEDED:
187 return DBStatus::CLOUD_FULL_RECORDS;
188 case GeneralError::E_NO_SPACE_FOR_ASSET:
189 return DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT;
190 case GeneralError::E_VERSION_CONFLICT:
191 return DBStatus::CLOUD_VERSION_CONFLICT;
192 case GeneralError::E_RECORD_EXIST_CONFLICT:
193 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
194 case GeneralError::E_RECORD_NOT_FOUND:
195 return DBStatus::CLOUD_RECORD_NOT_FOUND;
196 case GeneralError::E_RECORD_ALREADY_EXISTED:
197 return DBStatus::CLOUD_RECORD_ALREADY_EXISTED;
198 case GeneralError::E_FILE_NOT_EXIST:
199 return DBStatus::LOCAL_ASSET_NOT_FOUND;
200 case GeneralError::E_TIME_OUT:
201 return DBStatus::TIME_OUT;
202 case GeneralError::E_CLOUD_DISABLED:
203 return DBStatus::CLOUD_DISABLED;
204 case GeneralError::E_SKIP_ASSET:
205 return DBStatus::SKIP_ASSET;
206 default:
207 ZLOGI("error:0x%{public}x", error);
208 break;
209 }
210 return DBStatus::CLOUD_ERROR;
211 }
212
ConvertQuery(RdbCloud::DBVBucket & extend)213 std::pair<RdbCloud::QueryNodes, DistributedData::GeneralError> RdbCloud::ConvertQuery(RdbCloud::DBVBucket& extend)
214 {
215 auto it = extend.find(TYPE_FIELD);
216 if (it == extend.end() || std::get<int64_t>(it->second) != static_cast<int64_t>(CloudQueryType::QUERY_FIELD)) {
217 return { {}, GeneralError::E_ERROR };
218 }
219 it = extend.find(QUERY_FIELD);
220 if (it == extend.end()) {
221 ZLOGE("error, no QUERY_FIELD!");
222 return { {}, GeneralError::E_ERROR };
223 }
224
225 auto bytes = std::get_if<DistributedDB::Bytes>(&it->second);
226 std::vector<DistributedDB::QueryNode> nodes;
227 DBStatus status = DB_ERROR;
228 if (bytes != nullptr) {
229 nodes = DistributedDB::RelationalStoreManager::ParserQueryNodes(*bytes, status);
230 }
231 if (status != OK) {
232 ZLOGE("error, ParserQueryNodes failed, status:%{public}d", status);
233 return { {}, GeneralError::E_ERROR };
234 }
235 return { ConvertQuery(std::move(nodes)), GeneralError::E_OK };
236 }
237
ConvertQuery(RdbCloud::DBQueryNodes && nodes)238 RdbCloud::QueryNodes RdbCloud::ConvertQuery(RdbCloud::DBQueryNodes&& nodes)
239 {
240 QueryNodes queryNodes;
241 queryNodes.reserve(nodes.size());
242 for (auto& node : nodes) {
243 QueryNode queryNode;
244 queryNode.fieldName = std::move(node.fieldName);
245 queryNode.fieldValue = ValueProxy::Convert(std::move(node.fieldValue));
246 switch (node.type) {
247 case QueryNodeType::IN:
248 queryNode.op = QueryOperation::IN;
249 break;
250 case QueryNodeType::OR:
251 queryNode.op = QueryOperation::OR;
252 break;
253 case QueryNodeType::AND:
254 queryNode.op = QueryOperation::AND;
255 break;
256 case QueryNodeType::EQUAL_TO:
257 queryNode.op = QueryOperation::EQUAL_TO;
258 break;
259 case QueryNodeType::BEGIN_GROUP:
260 queryNode.op = QueryOperation::BEGIN_GROUP;
261 break;
262 case QueryNodeType::END_GROUP:
263 queryNode.op = QueryOperation::END_GROUP;
264 break;
265 default:
266 ZLOGE("invalid operation:0x%{public}d", node.type);
267 return {};
268 }
269 queryNodes.emplace_back(std::move(queryNode));
270 }
271 return queryNodes;
272 }
273
PostEvent(VBuckets & records,std::set<std::string> & skipAssets,VBuckets & extend,DistributedData::AssetEvent eventId)274 void RdbCloud::PostEvent(VBuckets& records, std::set<std::string>& skipAssets, VBuckets& extend,
275 DistributedData::AssetEvent eventId)
276 {
277 int32_t index = 0;
278 for (auto& record : records) {
279 DataBucket& ext = extend[index++];
280 for (auto& [key, value] : record) {
281 PostEvent(value, ext, skipAssets, eventId);
282 }
283 }
284 }
285
PostEvent(DistributedData::Value & value,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)286 void RdbCloud::PostEvent(DistributedData::Value& value, DataBucket& extend, std::set<std::string>& skipAssets,
287 DistributedData::AssetEvent eventId)
288 {
289 if (value.index() != TYPE_INDEX<DistributedData::Asset> && value.index() != TYPE_INDEX<DistributedData::Assets>) {
290 return;
291 }
292
293 if (value.index() == TYPE_INDEX<DistributedData::Asset>) {
294 auto* asset = Traits::get_if<DistributedData::Asset>(&value);
295 PostEventAsset(*asset, extend, skipAssets, eventId);
296 }
297
298 if (value.index() == TYPE_INDEX<DistributedData::Assets>) {
299 auto* assets = Traits::get_if<DistributedData::Assets>(&value);
300 for (auto& asset : *assets) {
301 PostEventAsset(asset, extend, skipAssets, eventId);
302 }
303 }
304 }
305
PostEventAsset(DistributedData::Asset & asset,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)306 void RdbCloud::PostEventAsset(DistributedData::Asset& asset, DataBucket& extend, std::set<std::string>& skipAssets,
307 DistributedData::AssetEvent eventId)
308 {
309 if (snapshots_ == nullptr) {
310 return;
311 }
312 auto it = snapshots_->find(asset.uri);
313 if (it == snapshots_->end() || it->second == nullptr) {
314 return;
315 }
316
317 if (eventId == DistributedData::UPLOAD) {
318 it->second->Upload(asset);
319 if (it->second->GetAssetStatus(asset) == TransferStatus::STATUS_WAIT_UPLOAD) {
320 skipAssets.insert(asset.uri);
321 extend[SchemaMeta::ERROR_FIELD] = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
322 }
323 }
324
325 if (eventId == DistributedData::UPLOAD_FINISHED) {
326 auto skip = skipAssets.find(asset.uri);
327 if (skip != skipAssets.end()) {
328 return;
329 }
330 it->second->Uploaded(asset);
331 }
332 }
333
GetLockFlag() const334 uint8_t RdbCloud::GetLockFlag() const
335 {
336 return flag_;
337 }
338
ConvertErrorField(DistributedData::VBuckets & extends)339 void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
340 {
341 for (auto& extend : extends) {
342 auto errorField = extend.find(SchemaMeta::ERROR_FIELD);
343 if (errorField == extend.end()) {
344 continue;
345 }
346 auto errCode = Traits::get_if<int64_t>(&(errorField->second));
347 if (errCode == nullptr) {
348 continue;
349 }
350 errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
351 }
352 }
353
SetPrepareTraceId(const std::string & traceId)354 void RdbCloud::SetPrepareTraceId(const std::string &traceId)
355 {
356 if (cloudDB_ == nullptr) {
357 return;
358 }
359 cloudDB_->SetPrepareTraceId(traceId);
360 }
361 } // namespace OHOS::DistributedRdb