• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "RdbCloud"
17 #include "rdb_cloud.h"
18 
19 #include "cloud/schema_meta.h"
20 #include "log_print.h"
21 #include "rdb_query.h"
22 #include "relational_store_manager.h"
23 #include "utils/anonymous.h"
24 #include "value_proxy.h"
25 
26 namespace OHOS::DistributedRdb {
27 using namespace DistributedDB;
28 using namespace DistributedData;
RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB,BindAssets * bindAssets)29 RdbCloud::RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB, BindAssets* bindAssets)
30     : cloudDB_(std::move(cloudDB)), snapshots_(bindAssets)
31 {
32 }
33 
BatchInsert(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)34 DBStatus RdbCloud::BatchInsert(
35     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
36 {
37     extend.resize(record.size());
38     VBuckets extends = ValueProxy::Convert(std::move(extend));
39     VBuckets records = ValueProxy::Convert(std::move(record));
40     std::set<std::string> skipAssets;
41     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
42     VBuckets temp = records;
43     auto error = cloudDB_->BatchInsert(tableName, std::move(records), extends);
44     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
45     ConvertErrorField(extends);
46     extend = ValueProxy::Convert(std::move(extends));
47     return ConvertStatus(static_cast<GeneralError>(error));
48 }
49 
BatchUpdate(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)50 DBStatus RdbCloud::BatchUpdate(
51     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
52 {
53     extend.resize(record.size());
54     VBuckets extends = ValueProxy::Convert(std::move(extend));
55     VBuckets records = ValueProxy::Convert(std::move(record));
56     std::set<std::string> skipAssets;
57     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
58     VBuckets temp = records;
59     auto error = cloudDB_->BatchUpdate(tableName, std::move(records), extends);
60     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
61     ConvertErrorField(extends);
62     extend = ValueProxy::Convert(std::move(extends));
63     return ConvertStatus(static_cast<GeneralError>(error));
64 }
65 
BatchDelete(const std::string & tableName,std::vector<DBVBucket> & extend)66 DBStatus RdbCloud::BatchDelete(const std::string &tableName, std::vector<DBVBucket> &extend)
67 {
68     VBuckets extends = ValueProxy::Convert(std::move(extend));
69     auto error = cloudDB_->BatchDelete(tableName, extends);
70     ConvertErrorField(extends);
71     extend = ValueProxy::Convert(std::move(extends));
72     return ConvertStatus(static_cast<GeneralError>(error));
73 }
74 
Query(const std::string & tableName,DBVBucket & extend,std::vector<DBVBucket> & data)75 DBStatus RdbCloud::Query(const std::string &tableName, DBVBucket &extend, std::vector<DBVBucket> &data)
76 {
77     auto [nodes, status] = ConvertQuery(extend);
78     std::shared_ptr<Cursor> cursor = nullptr;
79     if (status == GeneralError::E_OK && !nodes.empty()) {
80         RdbQuery query;
81         query.SetQueryNodes(tableName, std::move(nodes));
82         cursor = cloudDB_->Query(query, ValueProxy::Convert(std::move(extend)));
83     } else {
84         cursor = cloudDB_->Query(tableName, ValueProxy::Convert(std::move(extend)));
85     }
86     if (cursor == nullptr) {
87         ZLOGE("cursor is null, table:%{public}s, extend:%{public}zu",
88             Anonymous::Change(tableName).c_str(), extend.size());
89         return ConvertStatus(static_cast<GeneralError>(E_ERROR));
90     }
91     int32_t count = cursor->GetCount();
92     data.reserve(count);
93     auto err = cursor->MoveToFirst();
94     while (err == E_OK && count > 0) {
95         DistributedData::VBucket entry;
96         err = cursor->GetEntry(entry);
97         if (err != E_OK) {
98             break;
99         }
100         data.emplace_back(ValueProxy::Convert(std::move(entry)));
101         err = cursor->MoveToNext();
102         count--;
103     }
104     DistributedData::Value cursorFlag;
105     cursor->Get(SchemaMeta::CURSOR_FIELD, cursorFlag);
106     extend[SchemaMeta::CURSOR_FIELD] = ValueProxy::Convert(std::move(cursorFlag));
107     if (cursor->IsEnd()) {
108         ZLOGD("query end, table:%{public}s", Anonymous::Change(tableName).c_str());
109         return DBStatus::QUERY_END;
110     }
111     return ConvertStatus(static_cast<GeneralError>(err));
112 }
113 
PreSharing(const std::string & tableName,VBuckets & extend)114 DistributedData::GeneralError RdbCloud::PreSharing(const std::string& tableName, VBuckets& extend)
115 {
116     return static_cast<GeneralError>(cloudDB_->PreSharing(tableName, extend));
117 }
118 
Lock()119 std::pair<DBStatus, uint32_t> RdbCloud::Lock()
120 {
121     auto result = InnerLock(FLAG::SYSTEM_ABILITY);
122     return { ConvertStatus(result.first), result.second };
123 }
124 
UnLock()125 DBStatus RdbCloud::UnLock()
126 {
127     return ConvertStatus(InnerUnLock(FLAG::SYSTEM_ABILITY));
128 }
129 
HeartBeat()130 DBStatus RdbCloud::HeartBeat()
131 {
132     auto error = cloudDB_->Heartbeat();
133     return ConvertStatus(static_cast<GeneralError>(error));
134 }
135 
Close()136 DBStatus RdbCloud::Close()
137 {
138     auto error = cloudDB_->Close();
139     return ConvertStatus(static_cast<GeneralError>(error));
140 }
141 
InnerLock(FLAG flag)142 std::pair<GeneralError, uint32_t> RdbCloud::InnerLock(FLAG flag)
143 {
144     std::lock_guard<decltype(mutex_)> lock(mutex_);
145     flag_ |= flag;
146     // int64_t <-> uint32_t, s <-> ms
147     return std::make_pair(static_cast<GeneralError>(cloudDB_->Lock()), cloudDB_->AliveTime() * TO_MS);
148 }
149 
InnerUnLock(FLAG flag)150 GeneralError RdbCloud::InnerUnLock(FLAG flag)
151 {
152     std::lock_guard<decltype(mutex_)> lock(mutex_);
153     flag_ &= ~flag;
154     if (flag_ == 0) {
155         return static_cast<GeneralError>(cloudDB_->Unlock());
156     }
157     return GeneralError::E_OK;
158 }
159 
LockCloudDB(FLAG flag)160 std::pair<GeneralError, uint32_t> RdbCloud::LockCloudDB(FLAG flag)
161 {
162     return InnerLock(flag);
163 }
164 
UnLockCloudDB(FLAG flag)165 GeneralError RdbCloud::UnLockCloudDB(FLAG flag)
166 {
167     return InnerUnLock(flag);
168 }
169 
GetEmptyCursor(const std::string & tableName)170 std::pair<DBStatus, std::string> RdbCloud::GetEmptyCursor(const std::string &tableName)
171 {
172     auto [error, cursor] = cloudDB_->GetEmptyCursor(tableName);
173     return { ConvertStatus(static_cast<GeneralError>(error)), cursor };
174 }
175 
ConvertStatus(DistributedData::GeneralError error)176 DBStatus RdbCloud::ConvertStatus(DistributedData::GeneralError error)
177 {
178     switch (error) {
179         case GeneralError::E_OK:
180             return DBStatus::OK;
181         case GeneralError::E_NETWORK_ERROR:
182             return DBStatus::CLOUD_NETWORK_ERROR;
183         case GeneralError::E_LOCKED_BY_OTHERS:
184             return DBStatus::CLOUD_LOCK_ERROR;
185         case GeneralError::E_RECODE_LIMIT_EXCEEDED:
186             return DBStatus::CLOUD_FULL_RECORDS;
187         case GeneralError::E_NO_SPACE_FOR_ASSET:
188             return DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT;
189         case GeneralError::E_VERSION_CONFLICT:
190             return DBStatus::CLOUD_VERSION_CONFLICT;
191         case GeneralError::E_RECORD_EXIST_CONFLICT:
192             return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
193         case GeneralError::E_RECORD_NOT_FOUND:
194             return DBStatus::CLOUD_RECORD_NOT_FOUND;
195         case GeneralError::E_RECORD_ALREADY_EXISTED:
196             return DBStatus::CLOUD_RECORD_ALREADY_EXISTED;
197         case GeneralError::E_FILE_NOT_EXIST:
198             return DBStatus::LOCAL_ASSET_NOT_FOUND;
199         case GeneralError::E_TIME_OUT:
200             return DBStatus::TIME_OUT;
201         case GeneralError::E_CLOUD_DISABLED:
202             return DBStatus::CLOUD_DISABLED;
203         case GeneralError::E_SKIP_ASSET:
204             return DBStatus::SKIP_ASSET;
205         default:
206             ZLOGI("error:0x%{public}x", error);
207             break;
208     }
209     return DBStatus::CLOUD_ERROR;
210 }
211 
ConvertQuery(RdbCloud::DBVBucket & extend)212 std::pair<RdbCloud::QueryNodes, DistributedData::GeneralError> RdbCloud::ConvertQuery(RdbCloud::DBVBucket& extend)
213 {
214     auto it = extend.find(TYPE_FIELD);
215     if (it == extend.end() || std::get<int64_t>(it->second) != static_cast<int64_t>(CloudQueryType::QUERY_FIELD)) {
216         return { {}, GeneralError::E_ERROR };
217     }
218     it = extend.find(QUERY_FIELD);
219     if (it == extend.end()) {
220         ZLOGE("error, no QUERY_FIELD!");
221         return { {}, GeneralError::E_ERROR };
222     }
223 
224     auto bytes = std::get_if<DistributedDB::Bytes>(&it->second);
225     std::vector<DistributedDB::QueryNode> nodes;
226     DBStatus status = DB_ERROR;
227     if (bytes != nullptr) {
228         nodes = DistributedDB::RelationalStoreManager::ParserQueryNodes(*bytes, status);
229     }
230     if (status != OK) {
231         ZLOGE("error, ParserQueryNodes failed, status:%{public}d", status);
232         return { {}, GeneralError::E_ERROR };
233     }
234     return { ConvertQuery(std::move(nodes)), GeneralError::E_OK };
235 }
236 
ConvertQuery(RdbCloud::DBQueryNodes && nodes)237 RdbCloud::QueryNodes RdbCloud::ConvertQuery(RdbCloud::DBQueryNodes&& nodes)
238 {
239     QueryNodes queryNodes;
240     queryNodes.reserve(nodes.size());
241     for (auto& node : nodes) {
242         QueryNode queryNode;
243         queryNode.fieldName = std::move(node.fieldName);
244         queryNode.fieldValue = ValueProxy::Convert(std::move(node.fieldValue));
245         switch (node.type) {
246             case QueryNodeType::IN:
247                 queryNode.op = QueryOperation::IN;
248                 break;
249             case QueryNodeType::OR:
250                 queryNode.op = QueryOperation::OR;
251                 break;
252             case QueryNodeType::AND:
253                 queryNode.op = QueryOperation::AND;
254                 break;
255             case QueryNodeType::EQUAL_TO:
256                 queryNode.op = QueryOperation::EQUAL_TO;
257                 break;
258             case QueryNodeType::BEGIN_GROUP:
259                 queryNode.op = QueryOperation::BEGIN_GROUP;
260                 break;
261             case QueryNodeType::END_GROUP:
262                 queryNode.op = QueryOperation::END_GROUP;
263                 break;
264             default:
265                 ZLOGE("invalid operation:0x%{public}d", node.type);
266                 return {};
267         }
268         queryNodes.emplace_back(std::move(queryNode));
269     }
270     return queryNodes;
271 }
272 
PostEvent(VBuckets & records,std::set<std::string> & skipAssets,VBuckets & extend,DistributedData::AssetEvent eventId)273 void RdbCloud::PostEvent(VBuckets& records, std::set<std::string>& skipAssets, VBuckets& extend,
274     DistributedData::AssetEvent eventId)
275 {
276     int32_t index = 0;
277     for (auto& record : records) {
278         DataBucket& ext = extend[index++];
279         for (auto& [key, value] : record) {
280             PostEvent(value, ext, skipAssets, eventId);
281         }
282     }
283 }
284 
PostEvent(DistributedData::Value & value,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)285 void RdbCloud::PostEvent(DistributedData::Value& value, DataBucket& extend, std::set<std::string>& skipAssets,
286     DistributedData::AssetEvent eventId)
287 {
288     if (value.index() != TYPE_INDEX<DistributedData::Asset> && value.index() != TYPE_INDEX<DistributedData::Assets>) {
289         return;
290     }
291 
292     if (value.index() == TYPE_INDEX<DistributedData::Asset>) {
293         auto* asset = Traits::get_if<DistributedData::Asset>(&value);
294         PostEventAsset(*asset, extend, skipAssets, eventId);
295     }
296 
297     if (value.index() == TYPE_INDEX<DistributedData::Assets>) {
298         auto* assets = Traits::get_if<DistributedData::Assets>(&value);
299         for (auto& asset : *assets) {
300             PostEventAsset(asset, extend, skipAssets, eventId);
301         }
302     }
303 }
304 
PostEventAsset(DistributedData::Asset & asset,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)305 void RdbCloud::PostEventAsset(DistributedData::Asset& asset, DataBucket& extend, std::set<std::string>& skipAssets,
306     DistributedData::AssetEvent eventId)
307 {
308     if (snapshots_->bindAssets == nullptr) {
309         return;
310     }
311     auto it = snapshots_->bindAssets->find(asset.uri);
312     if (it == snapshots_->bindAssets->end() || it->second == nullptr) {
313         return;
314     }
315 
316     if (eventId == DistributedData::UPLOAD) {
317         it->second->Upload(asset);
318         if (it->second->GetAssetStatus(asset) == TransferStatus::STATUS_WAIT_UPLOAD) {
319             skipAssets.insert(asset.uri);
320             extend[SchemaMeta::ERROR_FIELD] = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
321         }
322     }
323 
324     if (eventId == DistributedData::UPLOAD_FINISHED) {
325         auto skip = skipAssets.find(asset.uri);
326         if (skip != skipAssets.end()) {
327             return;
328         }
329         it->second->Uploaded(asset);
330     }
331 }
332 
GetLockFlag() const333 uint8_t RdbCloud::GetLockFlag() const
334 {
335     return flag_;
336 }
337 
ConvertErrorField(DistributedData::VBuckets & extends)338 void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
339 {
340     for (auto& extend : extends) {
341         auto errorField = extend.find(SchemaMeta::ERROR_FIELD);
342         if (errorField == extend.end()) {
343             continue;
344         }
345         auto errCode = Traits::get_if<int64_t>(&(errorField->second));
346         if (errCode == nullptr) {
347             continue;
348         }
349         errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
350     }
351 }
352 
SetPrepareTraceId(const std::string & traceId)353 void RdbCloud::SetPrepareTraceId(const std::string &traceId)
354 {
355     if (cloudDB_ == nullptr) {
356         return;
357     }
358     cloudDB_->SetPrepareTraceId(traceId);
359 }
360 } // namespace OHOS::DistributedRdb