• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #define LOG_TAG "RdbCloud"
17 #include "rdb_cloud.h"
18 
19 #include "cloud/schema_meta.h"
20 #include "log_print.h"
21 #include "rdb_query.h"
22 #include "relational_store_manager.h"
23 #include "utils/anonymous.h"
24 #include "value_proxy.h"
25 
26 namespace OHOS::DistributedRdb {
27 using namespace DistributedDB;
28 using namespace DistributedData;
RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB,BindAssets bindAssets)29 RdbCloud::RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB, BindAssets bindAssets)
30     : cloudDB_(std::move(cloudDB)), snapshots_(std::move(bindAssets))
31 {
32 }
33 
BatchInsert(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)34 DBStatus RdbCloud::BatchInsert(
35     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
36 {
37     extend.resize(record.size());
38     VBuckets extends = ValueProxy::Convert(std::move(extend));
39     VBuckets records = ValueProxy::Convert(std::move(record));
40     std::set<std::string> skipAssets;
41     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
42     VBuckets temp = records;
43     auto error = cloudDB_->BatchInsert(tableName, std::move(records), extends);
44     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
45     ConvertErrorField(extends);
46     extend = ValueProxy::Convert(std::move(extends));
47     return ConvertStatus(static_cast<GeneralError>(error));
48 }
49 
BatchUpdate(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)50 DBStatus RdbCloud::BatchUpdate(
51     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
52 {
53     extend.resize(record.size());
54     VBuckets extends = ValueProxy::Convert(std::move(extend));
55     VBuckets records = ValueProxy::Convert(std::move(record));
56     std::set<std::string> skipAssets;
57     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
58     VBuckets temp = records;
59     auto error = cloudDB_->BatchUpdate(tableName, std::move(records), extends);
60     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
61     ConvertErrorField(extends);
62     extend = ValueProxy::Convert(std::move(extends));
63     return ConvertStatus(static_cast<GeneralError>(error));
64 }
65 
BatchDelete(const std::string & tableName,std::vector<DBVBucket> & extend)66 DBStatus RdbCloud::BatchDelete(const std::string &tableName, std::vector<DBVBucket> &extend)
67 {
68     VBuckets extends = ValueProxy::Convert(std::move(extend));
69     auto error = cloudDB_->BatchDelete(tableName, extends);
70     ConvertErrorField(extends);
71     extend = ValueProxy::Convert(std::move(extends));
72     return ConvertStatus(static_cast<GeneralError>(error));
73 }
74 
Query(const std::string & tableName,DBVBucket & extend,std::vector<DBVBucket> & data)75 DBStatus RdbCloud::Query(const std::string &tableName, DBVBucket &extend, std::vector<DBVBucket> &data)
76 {
77     auto [nodes, status] = ConvertQuery(extend);
78     std::shared_ptr<Cursor> cursor = nullptr;
79     int32_t code = E_OK;
80     if (status == GeneralError::E_OK && !nodes.empty()) {
81         RdbQuery query;
82         query.SetQueryNodes(tableName, std::move(nodes));
83         std::tie(code, cursor) = cloudDB_->Query(query, ValueProxy::Convert(std::move(extend)));
84     } else {
85         std::tie(code, cursor) = cloudDB_->Query(tableName, ValueProxy::Convert(std::move(extend)));
86     }
87     if (cursor == nullptr || code != E_OK) {
88         ZLOGE("cursor is null, table:%{public}s, extend:%{public}zu",
89             Anonymous::Change(tableName).c_str(), extend.size());
90         return ConvertStatus(static_cast<GeneralError>(code != E_OK ? code : E_ERROR));
91     }
92     int32_t count = cursor->GetCount();
93     data.reserve(count);
94     auto err = cursor->MoveToFirst();
95     while (err == E_OK && count > 0) {
96         DistributedData::VBucket entry;
97         err = cursor->GetEntry(entry);
98         if (err != E_OK) {
99             break;
100         }
101         data.emplace_back(ValueProxy::Convert(std::move(entry)));
102         err = cursor->MoveToNext();
103         count--;
104     }
105     DistributedData::Value cursorFlag;
106     cursor->Get(SchemaMeta::CURSOR_FIELD, cursorFlag);
107     extend[SchemaMeta::CURSOR_FIELD] = ValueProxy::Convert(std::move(cursorFlag));
108     if (cursor->IsEnd()) {
109         ZLOGD("query end, table:%{public}s", Anonymous::Change(tableName).c_str());
110         return DBStatus::QUERY_END;
111     }
112     return ConvertStatus(static_cast<GeneralError>(err));
113 }
114 
PreSharing(const std::string & tableName,VBuckets & extend)115 DistributedData::GeneralError RdbCloud::PreSharing(const std::string& tableName, VBuckets& extend)
116 {
117     return static_cast<GeneralError>(cloudDB_->PreSharing(tableName, extend));
118 }
119 
Lock()120 std::pair<DBStatus, uint32_t> RdbCloud::Lock()
121 {
122     auto result = InnerLock(FLAG::SYSTEM_ABILITY);
123     return { ConvertStatus(result.first), result.second };
124 }
125 
UnLock()126 DBStatus RdbCloud::UnLock()
127 {
128     return ConvertStatus(InnerUnLock(FLAG::SYSTEM_ABILITY));
129 }
130 
HeartBeat()131 DBStatus RdbCloud::HeartBeat()
132 {
133     auto error = cloudDB_->Heartbeat();
134     return ConvertStatus(static_cast<GeneralError>(error));
135 }
136 
Close()137 DBStatus RdbCloud::Close()
138 {
139     auto error = cloudDB_->Close();
140     return ConvertStatus(static_cast<GeneralError>(error));
141 }
142 
InnerLock(FLAG flag)143 std::pair<GeneralError, uint32_t> RdbCloud::InnerLock(FLAG flag)
144 {
145     std::lock_guard<decltype(mutex_)> lock(mutex_);
146     flag_ |= flag;
147     // int64_t <-> uint32_t, s <-> ms
148     return std::make_pair(static_cast<GeneralError>(cloudDB_->Lock()), cloudDB_->AliveTime() * TO_MS);
149 }
150 
InnerUnLock(FLAG flag)151 GeneralError RdbCloud::InnerUnLock(FLAG flag)
152 {
153     std::lock_guard<decltype(mutex_)> lock(mutex_);
154     flag_ &= ~flag;
155     if (flag_ == 0) {
156         return static_cast<GeneralError>(cloudDB_->Unlock());
157     }
158     return GeneralError::E_OK;
159 }
160 
LockCloudDB(FLAG flag)161 std::pair<GeneralError, uint32_t> RdbCloud::LockCloudDB(FLAG flag)
162 {
163     return InnerLock(flag);
164 }
165 
UnLockCloudDB(FLAG flag)166 GeneralError RdbCloud::UnLockCloudDB(FLAG flag)
167 {
168     return InnerUnLock(flag);
169 }
170 
GetEmptyCursor(const std::string & tableName)171 std::pair<DBStatus, std::string> RdbCloud::GetEmptyCursor(const std::string &tableName)
172 {
173     auto [error, cursor] = cloudDB_->GetEmptyCursor(tableName);
174     return { ConvertStatus(static_cast<GeneralError>(error)), cursor };
175 }
176 
ConvertStatus(DistributedData::GeneralError error)177 DBStatus RdbCloud::ConvertStatus(DistributedData::GeneralError error)
178 {
179     switch (error) {
180         case GeneralError::E_OK:
181             return DBStatus::OK;
182         case GeneralError::E_NETWORK_ERROR:
183             return DBStatus::CLOUD_NETWORK_ERROR;
184         case GeneralError::E_LOCKED_BY_OTHERS:
185             return DBStatus::CLOUD_LOCK_ERROR;
186         case GeneralError::E_RECODE_LIMIT_EXCEEDED:
187             return DBStatus::CLOUD_FULL_RECORDS;
188         case GeneralError::E_NO_SPACE_FOR_ASSET:
189             return DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT;
190         case GeneralError::E_VERSION_CONFLICT:
191             return DBStatus::CLOUD_VERSION_CONFLICT;
192         case GeneralError::E_RECORD_EXIST_CONFLICT:
193             return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
194         case GeneralError::E_RECORD_NOT_FOUND:
195             return DBStatus::CLOUD_RECORD_NOT_FOUND;
196         case GeneralError::E_RECORD_ALREADY_EXISTED:
197             return DBStatus::CLOUD_RECORD_ALREADY_EXISTED;
198         case GeneralError::E_FILE_NOT_EXIST:
199             return DBStatus::LOCAL_ASSET_NOT_FOUND;
200         case GeneralError::E_TIME_OUT:
201             return DBStatus::TIME_OUT;
202         case GeneralError::E_CLOUD_DISABLED:
203             return DBStatus::CLOUD_DISABLED;
204         case GeneralError::E_SKIP_ASSET:
205             return DBStatus::SKIP_ASSET;
206         default:
207             ZLOGI("error:0x%{public}x", error);
208             break;
209     }
210     return DBStatus::CLOUD_ERROR;
211 }
212 
ConvertQuery(RdbCloud::DBVBucket & extend)213 std::pair<RdbCloud::QueryNodes, DistributedData::GeneralError> RdbCloud::ConvertQuery(RdbCloud::DBVBucket& extend)
214 {
215     auto it = extend.find(TYPE_FIELD);
216     if (it == extend.end() || std::get<int64_t>(it->second) != static_cast<int64_t>(CloudQueryType::QUERY_FIELD)) {
217         return { {}, GeneralError::E_ERROR };
218     }
219     it = extend.find(QUERY_FIELD);
220     if (it == extend.end()) {
221         ZLOGE("error, no QUERY_FIELD!");
222         return { {}, GeneralError::E_ERROR };
223     }
224 
225     auto bytes = std::get_if<DistributedDB::Bytes>(&it->second);
226     std::vector<DistributedDB::QueryNode> nodes;
227     DBStatus status = DB_ERROR;
228     if (bytes != nullptr) {
229         nodes = DistributedDB::RelationalStoreManager::ParserQueryNodes(*bytes, status);
230     }
231     if (status != OK) {
232         ZLOGE("error, ParserQueryNodes failed, status:%{public}d", status);
233         return { {}, GeneralError::E_ERROR };
234     }
235     return { ConvertQuery(std::move(nodes)), GeneralError::E_OK };
236 }
237 
ConvertQuery(RdbCloud::DBQueryNodes && nodes)238 RdbCloud::QueryNodes RdbCloud::ConvertQuery(RdbCloud::DBQueryNodes&& nodes)
239 {
240     QueryNodes queryNodes;
241     queryNodes.reserve(nodes.size());
242     for (auto& node : nodes) {
243         QueryNode queryNode;
244         queryNode.fieldName = std::move(node.fieldName);
245         queryNode.fieldValue = ValueProxy::Convert(std::move(node.fieldValue));
246         switch (node.type) {
247             case QueryNodeType::IN:
248                 queryNode.op = QueryOperation::IN;
249                 break;
250             case QueryNodeType::OR:
251                 queryNode.op = QueryOperation::OR;
252                 break;
253             case QueryNodeType::AND:
254                 queryNode.op = QueryOperation::AND;
255                 break;
256             case QueryNodeType::EQUAL_TO:
257                 queryNode.op = QueryOperation::EQUAL_TO;
258                 break;
259             case QueryNodeType::BEGIN_GROUP:
260                 queryNode.op = QueryOperation::BEGIN_GROUP;
261                 break;
262             case QueryNodeType::END_GROUP:
263                 queryNode.op = QueryOperation::END_GROUP;
264                 break;
265             default:
266                 ZLOGE("invalid operation:0x%{public}d", node.type);
267                 return {};
268         }
269         queryNodes.emplace_back(std::move(queryNode));
270     }
271     return queryNodes;
272 }
273 
PostEvent(VBuckets & records,std::set<std::string> & skipAssets,VBuckets & extend,DistributedData::AssetEvent eventId)274 void RdbCloud::PostEvent(VBuckets& records, std::set<std::string>& skipAssets, VBuckets& extend,
275     DistributedData::AssetEvent eventId)
276 {
277     int32_t index = 0;
278     for (auto& record : records) {
279         DataBucket& ext = extend[index++];
280         for (auto& [key, value] : record) {
281             PostEvent(value, ext, skipAssets, eventId);
282         }
283     }
284 }
285 
PostEvent(DistributedData::Value & value,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)286 void RdbCloud::PostEvent(DistributedData::Value& value, DataBucket& extend, std::set<std::string>& skipAssets,
287     DistributedData::AssetEvent eventId)
288 {
289     if (value.index() != TYPE_INDEX<DistributedData::Asset> && value.index() != TYPE_INDEX<DistributedData::Assets>) {
290         return;
291     }
292 
293     if (value.index() == TYPE_INDEX<DistributedData::Asset>) {
294         auto* asset = Traits::get_if<DistributedData::Asset>(&value);
295         PostEventAsset(*asset, extend, skipAssets, eventId);
296     }
297 
298     if (value.index() == TYPE_INDEX<DistributedData::Assets>) {
299         auto* assets = Traits::get_if<DistributedData::Assets>(&value);
300         for (auto& asset : *assets) {
301             PostEventAsset(asset, extend, skipAssets, eventId);
302         }
303     }
304 }
305 
PostEventAsset(DistributedData::Asset & asset,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)306 void RdbCloud::PostEventAsset(DistributedData::Asset& asset, DataBucket& extend, std::set<std::string>& skipAssets,
307     DistributedData::AssetEvent eventId)
308 {
309     if (snapshots_ == nullptr) {
310         return;
311     }
312     auto it = snapshots_->find(asset.uri);
313     if (it == snapshots_->end() || it->second == nullptr) {
314         return;
315     }
316 
317     if (eventId == DistributedData::UPLOAD) {
318         it->second->Upload(asset);
319         if (it->second->GetAssetStatus(asset) == TransferStatus::STATUS_WAIT_UPLOAD) {
320             skipAssets.insert(asset.uri);
321             extend[SchemaMeta::ERROR_FIELD] = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
322         }
323     }
324 
325     if (eventId == DistributedData::UPLOAD_FINISHED) {
326         auto skip = skipAssets.find(asset.uri);
327         if (skip != skipAssets.end()) {
328             return;
329         }
330         it->second->Uploaded(asset);
331     }
332 }
333 
GetLockFlag() const334 uint8_t RdbCloud::GetLockFlag() const
335 {
336     return flag_;
337 }
338 
ConvertErrorField(DistributedData::VBuckets & extends)339 void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
340 {
341     for (auto& extend : extends) {
342         auto errorField = extend.find(SchemaMeta::ERROR_FIELD);
343         if (errorField == extend.end()) {
344             continue;
345         }
346         auto errCode = Traits::get_if<int64_t>(&(errorField->second));
347         if (errCode == nullptr) {
348             continue;
349         }
350         errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
351     }
352 }
353 
SetPrepareTraceId(const std::string & traceId)354 void RdbCloud::SetPrepareTraceId(const std::string &traceId)
355 {
356     if (cloudDB_ == nullptr) {
357         return;
358     }
359     cloudDB_->SetPrepareTraceId(traceId);
360 }
361 } // namespace OHOS::DistributedRdb