• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_proxy.h"
17 
18 #include "cloud/schema_mgr.h"
19 #include "store_types.h"
20 
21 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)22 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
23     :store_(iCloud),
24     transactionExeFlag_(false),
25     isWrite_(false)
26 {
27 }
28 
GetCloudDb(ICloudSyncStorageInterface * iCloud)29 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
30 {
31     std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
32     proxy->Init();
33     return proxy;
34 }
35 
Init()36 void StorageProxy::Init()
37 {
38     cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
39 }
40 
Close()41 int StorageProxy::Close()
42 {
43     std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
44     if (transactionExeFlag_.load()) {
45         LOGE("the transaction has been started, storage proxy can not closed");
46         return -E_BUSY;
47     }
48     store_ = nullptr;
49     cloudMetaData_ = nullptr;
50     return E_OK;
51 }
52 
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)53 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
54 {
55     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
56     if (cloudMetaData_ == nullptr) {
57         return -E_INVALID_DB;
58     }
59     if (transactionExeFlag_.load() && isWrite_.load()) {
60         LOGE("the write transaction has been started, can not get meta");
61         return -E_BUSY;
62     }
63     return cloudMetaData_->GetLocalWaterMark(tableName, localMark);
64 }
65 
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)66 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
67 {
68     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
69     if (cloudMetaData_ == nullptr) {
70         return -E_INVALID_DB;
71     }
72     if (transactionExeFlag_.load() && isWrite_.load()) {
73         LOGE("the write transaction has been started, can not put meta");
74         return -E_BUSY;
75     }
76     return cloudMetaData_->SetLocalWaterMark(tableName, localMark);
77 }
78 
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)79 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
80 {
81     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
82     if (cloudMetaData_ == nullptr) {
83         return -E_INVALID_DB;
84     }
85     return cloudMetaData_->GetCloudWaterMark(tableName, cloudMark);
86 }
87 
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)88 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
89 {
90     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
91     if (cloudMetaData_ == nullptr) {
92         return -E_INVALID_DB;
93     }
94     return cloudMetaData_->SetCloudWaterMark(tableName, cloudMark);
95 }
96 
StartTransaction(TransactType type)97 int StorageProxy::StartTransaction(TransactType type)
98 {
99     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
100     if (store_ == nullptr) {
101         return -E_INVALID_DB;
102     }
103     int errCode = store_->StartTransaction(type);
104     if (errCode == E_OK) {
105         transactionExeFlag_.store(true);
106         isWrite_.store(type == TransactType::IMMEDIATE);
107     }
108     return errCode;
109 }
110 
Commit()111 int StorageProxy::Commit()
112 {
113     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
114     if (store_ == nullptr) {
115         return -E_INVALID_DB;
116     }
117     int errCode = store_->Commit();
118     if (errCode == E_OK) {
119         transactionExeFlag_.store(false);
120     }
121     return errCode;
122 }
123 
Rollback()124 int StorageProxy::Rollback()
125 {
126     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
127     if (store_ == nullptr) {
128         return -E_INVALID_DB;
129     }
130     int errCode = store_->Rollback();
131     if (errCode == E_OK) {
132         transactionExeFlag_.store(false);
133     }
134     return errCode;
135 }
136 
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)137 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
138     const bool isCloudForcePush, int64_t &count)
139 {
140     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
141     if (store_ == nullptr) {
142         return -E_INVALID_DB;
143     }
144     if (!transactionExeFlag_.load()) {
145         LOGE("the transaction has not been started");
146         return -E_TRANSACT_STATE;
147     }
148     return store_->GetUploadCount(tableName, localMark, isCloudForcePush, count);
149 }
150 
FillCloudGid(const CloudSyncData & data)151 int StorageProxy::FillCloudGid(const CloudSyncData &data)
152 {
153     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
154     if (store_ == nullptr) {
155         return -E_INVALID_DB;
156     }
157     if (!transactionExeFlag_.load()) {
158         LOGE("the transaction has not been started");
159         return -E_TRANSACT_STATE;
160     }
161     return store_->FillCloudGid(data);
162 }
163 
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)164 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
165     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
166 {
167     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
168     if (store_ == nullptr) {
169         return -E_INVALID_DB;
170     }
171     if (!transactionExeFlag_.load()) {
172         LOGE("the transaction has not been started");
173         return -E_TRANSACT_STATE;
174     }
175     TableSchema tableSchema;
176     int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
177     if (errCode != E_OK) {
178         return errCode;
179     }
180     return store_->GetCloudData(tableSchema, timeRange, continueStmtToken, cloudDataResult);
181 }
182 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const183 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
184 {
185     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
186     if (store_ == nullptr) {
187         return -E_INVALID_DB;
188     }
189     if (!transactionExeFlag_.load()) {
190         LOGE("the transaction has not been started");
191         return -E_TRANSACT_STATE;
192     }
193     return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
194 }
195 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)196 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
197     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
198 {
199     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
200     if (store_ == nullptr) {
201         return -E_INVALID_DB;
202     }
203     if (!transactionExeFlag_.load()) {
204         LOGE("the transaction has not been started");
205         return -E_TRANSACT_STATE;
206     }
207 
208     int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
209     if (errCode == E_OK) {
210         dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
211         dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
212     }
213     return errCode;
214 }
215 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)216 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
217 {
218     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
219     if (store_ == nullptr) {
220         return -E_INVALID_DB;
221     }
222     if (!transactionExeFlag_.load()) {
223         LOGE("the transaction has not been started");
224         return -E_TRANSACT_STATE;
225     }
226 
227     return store_->PutCloudSyncData(tableName, downloadData);
228 }
229 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)230 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
231     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
232 {
233     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234     if (store_ == nullptr) {
235         return -E_INVALID_DB;
236     }
237     if (!transactionExeFlag_.load()) {
238         LOGE("the transaction has not been started");
239         return -E_TRANSACT_STATE;
240     }
241     return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
242 }
243 
ReleaseContinueToken(ContinueToken & continueStmtToken)244 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
245 {
246     return store_->ReleaseCloudDataToken(continueStmtToken);
247 }
248 
CheckSchema(const TableName & tableName) const249 int StorageProxy::CheckSchema(const TableName &tableName) const
250 {
251     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252     if (store_ == nullptr) {
253         return -E_INVALID_DB;
254     }
255     return store_->ChkSchema(tableName);
256 }
257 
CheckSchema(std::vector<std::string> & tables)258 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
259 {
260     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
261     if (store_ == nullptr) {
262         return -E_INVALID_DB;
263     }
264     if (tables.empty()) {
265         return -E_INVALID_ARGS;
266     }
267     for (const auto &table : tables) {
268         int ret = store_->ChkSchema(table);
269         if (ret != E_OK) {
270             return ret;
271         }
272     }
273     return E_OK;
274 }
275 
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)276 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
277     std::vector<Field> &assetFields)
278 {
279     if (!colNames.empty()) {
280         // output parameter should be empty
281         return -E_INVALID_ARGS;
282     }
283 
284     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
285     if (store_ == nullptr) {
286         return -E_INVALID_DB;
287     }
288     // GetTableInfo
289     TableSchema tableSchema;
290     int ret = store_->GetCloudTableSchema(tableName, tableSchema);
291     if (ret != E_OK) {
292         LOGE("Cannot get cloud table schema: %d", ret);
293         return ret;
294     }
295     for (const auto &field : tableSchema.fields) {
296         if (field.primary) {
297             colNames.push_back(field.colName);
298         }
299         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
300             assetFields.push_back(field);
301         }
302     }
303     if (colNames.empty() || colNames.size() > 1) {
304         (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
305     }
306     return E_OK;
307 }
308 
NotifyChangedData(const std::string deviceName,ChangedData && changedData)309 int StorageProxy::NotifyChangedData(const std::string deviceName, ChangedData &&changedData)
310 {
311     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
312     if (store_ == nullptr) {
313         return -E_INVALID_DB;
314     }
315     store_->TriggerObserverAction(deviceName, std::move(changedData), true);
316     return E_OK;
317 }
318 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)319 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
320 {
321     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
322     if (store_ == nullptr) {
323         return -E_INVALID_DB;
324     }
325     if (!transactionExeFlag_.load() || !isWrite_.load()) {
326         LOGE("the write transaction has not started before fill download assets");
327         return -E_TRANSACT_STATE;
328     }
329     return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
330 }
331 
SetLogTriggerStatus(bool status)332 int StorageProxy::SetLogTriggerStatus(bool status)
333 {
334     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
335     if (store_ == nullptr) {
336         return -E_INVALID_DB;
337     }
338     return store_->SetLogTriggerStatus(status);
339 }
340 
FillCloudGidAndAsset(OpType opType,const CloudSyncData & data)341 int StorageProxy::FillCloudGidAndAsset(OpType opType, const CloudSyncData &data)
342 {
343     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
344     if (store_ == nullptr) {
345         return -E_INVALID_DB;
346     }
347     if (!transactionExeFlag_.load()) {
348         LOGE("the transaction has not been started");
349         return -E_TRANSACT_STATE;
350     }
351     return store_->FillCloudGidAndAsset(opType, data);
352 }
353 
GetIdentify() const354 std::string StorageProxy::GetIdentify() const
355 {
356     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
357     if (store_ == nullptr) {
358         LOGW("[StorageProxy] store is nullptr return default");
359         return "";
360     }
361     return store_->GetIdentify();
362 }
363 
EraseNanoTime(DistributedDB::Timestamp localTime)364 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
365 {
366     return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
367 }
368 
CleanWaterMark(const DistributedDB::TableName & tableName)369 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
370 {
371     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
372     if (cloudMetaData_ == nullptr) {
373         LOGW("[StorageProxy] meta is nullptr return default");
374         return -E_INVALID_DB;
375     }
376     return cloudMetaData_->CleanWaterMark(tableName);
377 }
378 }
379