• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RelationalStore"
16 #include "relational_store.h"
17 
18 #include "logger.h"
19 #include "modify_time_cursor.h"
20 #include "raw_data_parser.h"
21 #include "rdb_errno.h"
22 #include "rdb_helper.h"
23 #include "rdb_predicates.h"
24 #include "rdb_sql_utils.h"
25 #include "relational_cursor.h"
26 #include "relational_predicates.h"
27 #include "relational_predicates_objects.h"
28 #include "relational_store_error_code.h"
29 #include "relational_store_impl.h"
30 #include "relational_types_v0.h"
31 #include "relational_values_bucket.h"
32 #include "securec.h"
33 #include "sqlite_global_config.h"
34 #include "convertor_error_code.h"
35 
36 using namespace OHOS::RdbNdk;
37 using namespace OHOS::DistributedRdb;
38 constexpr int RDB_STORE_CID = 1234560; // The class id used to uniquely identify the OH_Rdb_Store class.
39 constexpr int RDB_CONFIG_SIZE_V0 = 41;
40 constexpr int RDB_CONFIG_SIZE_V1 = 45;
OH_Rdb_CreateValueObject()41 OH_VObject *OH_Rdb_CreateValueObject()
42 {
43     return new (std::nothrow) RelationalPredicatesObjects();
44 }
45 
OH_Rdb_CreateValuesBucket()46 OH_VBucket *OH_Rdb_CreateValuesBucket()
47 {
48     return new (std::nothrow) RelationalValuesBucket();
49 }
50 
OH_Rdb_CreatePredicates(const char * table)51 OH_Predicates *OH_Rdb_CreatePredicates(const char *table)
52 {
53     if (table == nullptr) {
54         return nullptr;
55     }
56     return new (std::nothrow) RelationalPredicate(table);
57 }
58 
RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store)59 OHOS::RdbNdk::RelationalStore::RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store) : store_(store)
60 {
61     id = RDB_STORE_CID;
62 }
63 
SubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)64 int RelationalStore::SubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
65 {
66     std::lock_guard<decltype(mutex_)> lock(mutex_) ;
67     bool result = std::any_of(callbacks_.begin(), callbacks_.end(), [callback](const auto &observer) {
68         return *observer == callback;
69     });
70     if (result) {
71         LOG_INFO("duplicate subscribe.");
72         return OH_Rdb_ErrCode::RDB_OK;
73     }
74     auto obs = std::make_shared<NDKDetailProgressObserver>(callback);
75     int errCode = store_->RegisterAutoSyncCallback(obs);
76     if (errCode == NativeRdb::E_OK) {
77         LOG_ERROR("subscribe failed.");
78         return ConvertorErrorCode::NativeToNdk(errCode);
79     }
80     callbacks_.push_back(std::move(obs));
81     return OH_Rdb_ErrCode::RDB_OK;
82 }
83 
UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)84 int RelationalStore::UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
85 {
86     std::lock_guard<decltype(mutex_)> lock(mutex_) ;
87     for (auto it = callbacks_.begin(); it != callbacks_.end();) {
88         if (callback != nullptr && !(**it == callback)) {
89             ++it;
90             continue;
91         }
92 
93         int errCode = store_->UnregisterAutoSyncCallback(*it);
94         if (errCode != NativeRdb::E_OK) {
95             LOG_ERROR("unsubscribe failed.");
96             return ConvertorErrorCode::NativeToNdk(errCode);
97         }
98         it = callbacks_.erase(it);
99         LOG_DEBUG("progress unsubscribe success.");
100     }
101     return OH_Rdb_ErrCode::RDB_OK;
102 }
103 
~RelationalStore()104 RelationalStore::~RelationalStore()
105 {
106     if (store_ == nullptr || callbacks_.empty()) {
107         return;
108     }
109     for (auto &callback : callbacks_) {
110         store_->UnregisterAutoSyncCallback(callback);
111     }
112 }
113 
TransformMode(Rdb_SyncMode & mode)114 SyncMode NDKUtils::TransformMode(Rdb_SyncMode &mode)
115 {
116     switch (mode) {
117         case RDB_SYNC_MODE_TIME_FIRST:
118             return TIME_FIRST;
119         case RDB_SYNC_MODE_NATIVE_FIRST:
120             return NATIVE_FIRST;
121         case RDB_SYNC_MODE_CLOUD_FIRST:
122             return CLOUD_FIRST;
123         default:
124             return static_cast<SyncMode>(-1);
125     }
126 }
127 
GetSubscribeType(Rdb_SubscribeType & type)128 OHOS::DistributedRdb::SubscribeMode NDKUtils::GetSubscribeType(Rdb_SubscribeType &type)
129 {
130     switch (type) {
131         case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD:
132             return SubscribeMode::CLOUD;
133         case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS:
134             return SubscribeMode::CLOUD_DETAIL;
135         case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS:
136             return SubscribeMode::LOCAL_DETAIL;
137         default:
138             return SubscribeMode::SUBSCRIBE_MODE_MAX;
139     }
140 }
141 
142 class MainOpenCallback : public OHOS::NativeRdb::RdbOpenCallback {
143 public:
144     int OnCreate(OHOS::NativeRdb::RdbStore &rdbStore) override;
145     int OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion) override;
146 };
147 
OnCreate(OHOS::NativeRdb::RdbStore & rdbStore)148 int MainOpenCallback::OnCreate(OHOS::NativeRdb::RdbStore &rdbStore)
149 {
150     return OH_Rdb_ErrCode::RDB_OK;
151 }
152 
OnUpgrade(OHOS::NativeRdb::RdbStore & rdbStore,int oldVersion,int newVersion)153 int MainOpenCallback::OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion)
154 {
155     return OH_Rdb_ErrCode::RDB_OK;
156 }
157 
GetRelationalStore(OH_Rdb_Store * store)158 RelationalStore *GetRelationalStore(OH_Rdb_Store *store)
159 {
160     if (store == nullptr || store->id != RDB_STORE_CID) {
161         LOG_ERROR("store is invalid. is null %{public}d", (store == nullptr));
162         return nullptr;
163     }
164     return static_cast<RelationalStore *>(store);
165 }
166 
OH_Rdb_GetOrOpen(const OH_Rdb_Config * config,int * errCode)167 OH_Rdb_Store *OH_Rdb_GetOrOpen(const OH_Rdb_Config *config, int *errCode)
168 {
169     if (config == nullptr || config->selfSize > RDB_CONFIG_SIZE_V1 || errCode == nullptr) {
170         LOG_ERROR("Parameters set error:config is NULL ? %{public}d and config size is %{public}zu or "
171                   "errCode is NULL ? %{public}d ",
172                   (config == nullptr), sizeof(OH_Rdb_Config), (errCode == nullptr));
173         return nullptr;
174     }
175 
176     std::string realPath =
177         OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, *errCode);
178     if (*errCode != 0) {
179         *errCode = ConvertorErrorCode::NativeToNdk(*errCode);
180         LOG_ERROR("Get database path failed, ret %{public}d ", *errCode);
181         return nullptr;
182     }
183     OHOS::NativeRdb::RdbStoreConfig rdbStoreConfig(realPath);
184     rdbStoreConfig.SetSecurityLevel(OHOS::NativeRdb::SecurityLevel(config->securityLevel));
185     rdbStoreConfig.SetEncryptStatus(config->isEncrypt);
186     if (config->selfSize > RDB_CONFIG_SIZE_V0) {
187         rdbStoreConfig.SetArea(config->area);
188     }
189     if (config->bundleName != nullptr) {
190         rdbStoreConfig.SetBundleName(config->bundleName);
191     }
192     rdbStoreConfig.SetName(config->storeName);
193 
194     MainOpenCallback callback;
195     std::shared_ptr<OHOS::NativeRdb::RdbStore> store =
196         OHOS::NativeRdb::RdbHelper::GetRdbStore(rdbStoreConfig, -1, callback, *errCode);
197     *errCode = ConvertorErrorCode::NativeToNdk(*errCode);
198     if (store == nullptr) {
199         LOG_ERROR("Get RDB Store fail %{public}s", realPath.c_str());
200         return nullptr;
201     }
202     return new (std::nothrow) RelationalStore(store);
203 }
204 
OH_Rdb_CloseStore(OH_Rdb_Store * store)205 int OH_Rdb_CloseStore(OH_Rdb_Store *store)
206 {
207     auto rdbStore = GetRelationalStore(store);
208     if (rdbStore == nullptr) {
209         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
210     }
211     delete rdbStore;
212     return OH_Rdb_ErrCode::RDB_OK;
213 }
214 
OH_Rdb_DeleteStore(const OH_Rdb_Config * config)215 int OH_Rdb_DeleteStore(const OH_Rdb_Config *config)
216 {
217     if (config == nullptr || config->dataBaseDir == nullptr || config->storeName == nullptr) {
218         LOG_ERROR("Parameters set error:path is NULL ? %{public}d", (config == nullptr));
219         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
220     }
221     int errCode = OHOS::NativeRdb::E_OK;
222     std::string realPath =
223         OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, errCode);
224     if (errCode != OHOS::NativeRdb::E_OK) {
225         return ConvertorErrorCode::NativeToNdk(errCode);
226     }
227     return ConvertorErrorCode::NativeToNdk(OHOS::NativeRdb::RdbHelper::DeleteRdbStore(realPath));
228 }
229 
OH_Rdb_Insert(OH_Rdb_Store * store,const char * table,OH_VBucket * valuesBucket)230 int OH_Rdb_Insert(OH_Rdb_Store *store, const char *table, OH_VBucket *valuesBucket)
231 {
232     auto rdbStore = GetRelationalStore(store);
233     auto bucket = RelationalValuesBucket::GetSelf(valuesBucket);
234     if (rdbStore == nullptr || table == nullptr || bucket == nullptr) {
235         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
236     }
237     int64_t rowId = -1;
238     rdbStore->GetStore()->Insert(rowId, table, bucket->Get());
239     return rowId >= 0 ? rowId : OH_Rdb_ErrCode::RDB_ERR;
240 }
241 
OH_Rdb_Update(OH_Rdb_Store * store,OH_VBucket * valueBucket,OH_Predicates * predicates)242 int OH_Rdb_Update(OH_Rdb_Store *store, OH_VBucket *valueBucket, OH_Predicates *predicates)
243 {
244     auto rdbStore = GetRelationalStore(store);
245     auto predicate = RelationalPredicate::GetSelf(predicates);
246     auto bucket = RelationalValuesBucket::GetSelf(valueBucket);
247     if (rdbStore == nullptr || predicate == nullptr || bucket == nullptr) {
248         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
249     }
250     int updatedRows = -1;
251     rdbStore->GetStore()->Update(updatedRows, bucket->Get(), predicate->Get());
252     return updatedRows >= 0 ? updatedRows : OH_Rdb_ErrCode::RDB_ERR;
253 }
254 
OH_Rdb_Delete(OH_Rdb_Store * store,OH_Predicates * predicates)255 int OH_Rdb_Delete(OH_Rdb_Store *store, OH_Predicates *predicates)
256 {
257     auto rdbStore = GetRelationalStore(store);
258     auto predicate = RelationalPredicate::GetSelf(predicates);
259     if (rdbStore == nullptr || predicate == nullptr) {
260         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
261     }
262     int deletedRows = -1;
263     rdbStore->GetStore()->Delete(deletedRows, predicate->Get());
264     return deletedRows >= 0 ? deletedRows : OH_Rdb_ErrCode::RDB_ERR;
265 }
266 
OH_Rdb_Query(OH_Rdb_Store * store,OH_Predicates * predicates,const char * const * columnNames,int length)267 OH_Cursor *OH_Rdb_Query(OH_Rdb_Store *store, OH_Predicates *predicates, const char *const *columnNames, int length)
268 {
269     auto rdbStore = GetRelationalStore(store);
270     auto predicate = RelationalPredicate::GetSelf(predicates);
271     if (rdbStore == nullptr || predicate == nullptr) {
272         return nullptr;
273     }
274     std::vector<std::string> columns;
275     if (columnNames != nullptr && length > 0) {
276         columns.reserve(length);
277         for (int i = 0; i < length; i++) {
278             columns.push_back(columnNames[i]);
279         }
280     }
281 
282     std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
283         rdbStore->GetStore()->QueryByStep(predicate->Get(), columns);
284     if (resultSet == nullptr) {
285         return nullptr;
286     }
287     return new (std::nothrow) RelationalCursor(std::move(resultSet));
288 }
289 
OH_Rdb_ExecuteQuery(OH_Rdb_Store * store,const char * sql)290 OH_Cursor *OH_Rdb_ExecuteQuery(OH_Rdb_Store *store, const char *sql)
291 {
292     auto rdbStore = GetRelationalStore(store);
293     if (rdbStore == nullptr || sql == nullptr) {
294         return nullptr;
295     }
296     std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
297         rdbStore->GetStore()->QuerySql(sql, std::vector<std::string>{});
298     if (resultSet == nullptr) {
299         return nullptr;
300     }
301     return new OHOS::RdbNdk::RelationalCursor(std::move(resultSet));
302 }
303 
OH_Rdb_Execute(OH_Rdb_Store * store,const char * sql)304 int OH_Rdb_Execute(OH_Rdb_Store *store, const char *sql)
305 {
306     auto rdbStore = GetRelationalStore(store);
307     if (rdbStore == nullptr || sql == nullptr) {
308         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
309     }
310     return ConvertorErrorCode::NativeToNdk(
311         rdbStore->GetStore()->ExecuteSql(sql, std::vector<OHOS::NativeRdb::ValueObject>{}));
312 }
313 
OH_Rdb_BeginTransaction(OH_Rdb_Store * store)314 int OH_Rdb_BeginTransaction(OH_Rdb_Store *store)
315 {
316     auto rdbStore = GetRelationalStore(store);
317     if (rdbStore == nullptr) {
318         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
319     }
320     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->BeginTransaction());
321 }
322 
OH_Rdb_RollBack(OH_Rdb_Store * store)323 int OH_Rdb_RollBack(OH_Rdb_Store *store)
324 {
325     auto rdbStore = GetRelationalStore(store);
326     if (rdbStore == nullptr) {
327         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
328     }
329     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->RollBack());
330 }
331 
OH_Rdb_Commit(OH_Rdb_Store * store)332 int OH_Rdb_Commit(OH_Rdb_Store *store)
333 {
334     auto rdbStore = GetRelationalStore(store);
335     if (rdbStore == nullptr) {
336         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
337     }
338     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Commit());
339 }
340 
OH_Rdb_Backup(OH_Rdb_Store * store,const char * databasePath)341 int OH_Rdb_Backup(OH_Rdb_Store *store, const char *databasePath)
342 {
343     auto rdbStore = GetRelationalStore(store);
344     if (rdbStore == nullptr || databasePath == nullptr) {
345         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
346     }
347     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Backup(databasePath));
348 }
349 
OH_Rdb_Restore(OH_Rdb_Store * store,const char * databasePath)350 int OH_Rdb_Restore(OH_Rdb_Store *store, const char *databasePath)
351 {
352     auto rdbStore = GetRelationalStore(store);
353     if (rdbStore == nullptr || databasePath == nullptr) {
354         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
355     }
356     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Restore(databasePath));
357 }
358 
OH_Rdb_GetVersion(OH_Rdb_Store * store,int * version)359 int OH_Rdb_GetVersion(OH_Rdb_Store *store, int *version)
360 {
361     auto rdbStore = GetRelationalStore(store);
362     if (rdbStore == nullptr || version == nullptr) {
363         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
364     }
365     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->GetVersion(*version));
366 }
367 
OH_Rdb_SetVersion(OH_Rdb_Store * store,int version)368 int OH_Rdb_SetVersion(OH_Rdb_Store *store, int version)
369 {
370     auto rdbStore = GetRelationalStore(store);
371     if (rdbStore == nullptr) {
372         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
373     }
374     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->SetVersion(version));
375 }
376 
Convert(const Rdb_DistributedConfig * config)377 static std::pair<int32_t, Rdb_DistributedConfig> Convert(const Rdb_DistributedConfig *config)
378 {
379     std::pair<int32_t, Rdb_DistributedConfig> result = { OH_Rdb_ErrCode::RDB_E_INVALID_ARGS, {} };
380     auto &[errCode, cfg] = result;
381     switch (config->version) {
382         case DISTRIBUTED_CONFIG_V0: {
383             const auto *realCfg = reinterpret_cast<const DistributedConfigV0 *>(config);
384             cfg.version = realCfg->version;
385             cfg.isAutoSync = realCfg->isAutoSync;
386             errCode = OH_Rdb_ErrCode::RDB_OK;
387             break;
388         }
389         default:
390             break;
391     }
392     return result;
393 }
394 
OH_Rdb_SetDistributedTables(OH_Rdb_Store * store,const char * tables[],uint32_t count,Rdb_DistributedType type,const Rdb_DistributedConfig * config)395 int OH_Rdb_SetDistributedTables(OH_Rdb_Store *store, const char *tables[], uint32_t count, Rdb_DistributedType type,
396                                 const Rdb_DistributedConfig *config)
397 {
398     auto rdbStore = GetRelationalStore(store);
399     if (rdbStore == nullptr || type != Rdb_DistributedType::RDB_DISTRIBUTED_CLOUD || (count > 0 && tables == nullptr)) {
400         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
401     }
402 
403     auto [errCode, cfg] = Convert(config);
404     if (errCode != OH_Rdb_ErrCode::RDB_OK) {
405         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
406     }
407     std::vector<std::string> tableNames;
408     tableNames.reserve(count);
409     for (uint32_t i = 0; i < count; i++) {
410         if (tables[i] == nullptr) {
411             return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
412         }
413         tableNames.emplace_back(tables[i]);
414     }
415     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->SetDistributedTables(tableNames,
416         DistributedTableType::DISTRIBUTED_CLOUD, { cfg.isAutoSync }));
417 }
418 
OH_Rdb_FindModifyTime(OH_Rdb_Store * store,const char * tableName,const char * columnName,OH_VObject * values)419 OH_Cursor *OH_Rdb_FindModifyTime(OH_Rdb_Store *store, const char *tableName, const char *columnName, OH_VObject *values)
420 {
421     auto rdbStore = GetRelationalStore(store);
422     auto selfObjects = RelationalPredicatesObjects::GetSelf(values);
423     if (rdbStore == nullptr || selfObjects == nullptr || tableName == nullptr) {
424         return nullptr;
425     }
426     std::vector<ValueObject> objects = selfObjects->Get();
427     std::vector<OHOS::NativeRdb::RdbStore::PRIKey> keys;
428     keys.reserve(objects.size());
429     for (auto &object : objects) {
430         OHOS::NativeRdb::RdbStore::PRIKey priKey;
431         OHOS::NativeRdb::RawDataParser::Convert(std::move(object.value), priKey);
432         keys.push_back(std::move(priKey));
433     }
434     auto results = rdbStore->GetStore()->GetModifyTime(tableName, columnName, keys);
435     return new (std::nothrow) ModifyTimeCursor(std::move(results));
436 }
437 
OH_Rdb_Subscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)438 int OH_Rdb_Subscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
439 {
440     auto rdbStore = GetRelationalStore(store);
441     if (rdbStore == nullptr || observer == nullptr) {
442         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
443     }
444     return rdbStore->DoSubScribe(type, observer);
445 }
446 
OH_Rdb_Unsubscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)447 int OH_Rdb_Unsubscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
448 {
449     auto rdbStore = GetRelationalStore(store);
450     if (rdbStore == nullptr) {
451         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
452     }
453     return rdbStore->DoUnsubScribe(type, observer);
454 }
455 
DoSubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)456 int RelationalStore::DoSubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
457 {
458     if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS ||
459         observer == nullptr || observer->callback.briefObserver == nullptr ||
460         observer->callback.detailsObserver == nullptr) {
461         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
462     }
463 
464     std::lock_guard<decltype(mutex_)> lock(mutex_);
465     auto result = std::any_of(dataObservers_[type].begin(), dataObservers_[type].end(),
466                               [observer](const std::shared_ptr<NDKStoreObserver> &item) {
467                                   return *item.get() == observer;
468                               });
469     if (result) {
470         LOG_INFO("duplicate subscribe.");
471         return OH_Rdb_ErrCode::RDB_OK;
472     }
473     auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
474     auto ndkObserver = std::make_shared<NDKStoreObserver>(observer, type);
475     int subscribeResult = (type == RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) ?
476         store_->SubscribeObserver(subscribeOption, ndkObserver) : store_->Subscribe(subscribeOption, ndkObserver.get());
477     if (subscribeResult != OHOS::NativeRdb::E_OK) {
478         LOG_ERROR("subscribe failed.");
479     } else {
480         dataObservers_[type].emplace_back(std::move(ndkObserver));
481     }
482     return ConvertorErrorCode::NativeToNdk(subscribeResult);
483 }
484 
DoUnsubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)485 int RelationalStore::DoUnsubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
486 {
487     if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) {
488         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
489     }
490     std::lock_guard<decltype(mutex_)> lock(mutex_);
491     for (auto it = dataObservers_[type].begin(); it != dataObservers_[type].end();) {
492         if (observer != nullptr && !(**it == observer)) {
493             ++it;
494             continue;
495         }
496         auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
497         int errCode = (type == RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) ?
498             store_->UnsubscribeObserver(subscribeOption, *it) : store_->UnSubscribe(subscribeOption, it->get());
499         if (errCode != NativeRdb::E_OK) {
500             LOG_ERROR("unsubscribe failed.");
501             return ConvertorErrorCode::NativeToNdk(errCode);
502         }
503         it = dataObservers_[type].erase(it);
504         LOG_DEBUG("data observer unsubscribe success.");
505     }
506     return OH_Rdb_ErrCode::RDB_OK;
507 }
508 
509 namespace {
510 struct RelationalProgressDetails : public Rdb_ProgressDetails {
511     Rdb_TableDetails *details_ = nullptr;
512     explicit RelationalProgressDetails(const ProgressDetail &detail);
513     ~RelationalProgressDetails();
514 
515     Rdb_TableDetails *GetTableDetails(int paraVersion);
516     void DestroyTableDetails();
517 
518 private:
519     uint8_t *ResizeBuff(size_t size);
520 
521     TableDetails tableDetails_;
522     size_t size_ = 0;
523     uint8_t *buffer_ = nullptr;
524 };
525 
DestroyTableDetails()526 void RelationalProgressDetails::DestroyTableDetails()
527 {
528     delete[] details_;
529     details_ = nullptr;
530 }
531 
RelationalProgressDetails(const ProgressDetail & detail)532 RelationalProgressDetails::RelationalProgressDetails(const ProgressDetail &detail)
533 {
534     version = DISTRIBUTED_PROGRESS_DETAIL_VERSION;
535     schedule = detail.progress;
536     code = detail.code;
537     tableLength = (int32_t)detail.details.size();
538     tableDetails_ = detail.details;
539 }
540 
~RelationalProgressDetails()541 RelationalProgressDetails::~RelationalProgressDetails()
542 {
543     if (buffer_ != nullptr) {
544         free(buffer_);
545     }
546     buffer_ = nullptr;
547 }
548 
GetTableDetails(int paraVersion)549 Rdb_TableDetails *RelationalProgressDetails::GetTableDetails(int paraVersion)
550 {
551     switch (paraVersion) {
552         case TABLE_DETAIL_V0: {
553             auto length = sizeof(TableDetailsV0) * (tableLength + 1);
554             auto *detailsV0 = (TableDetailsV0 *)ResizeBuff(length);
555             if (detailsV0 == nullptr) {
556                 return nullptr;
557             }
558             (void)memset_s(detailsV0, length, 0, length);
559             int index = 0;
560             for (const auto &pair : tableDetails_) {
561                 detailsV0[index].table = pair.first.c_str();
562                 detailsV0[index].upload = StatisticV0{
563                     .total = (int)pair.second.upload.total,
564                     .successful = (int)pair.second.upload.success,
565                     .failed = (int)pair.second.upload.failed,
566                     .remained = (int)pair.second.upload.untreated,
567                 };
568                 detailsV0[index].download = StatisticV0{
569                     .total = (int)pair.second.download.total,
570                     .successful = (int)pair.second.download.success,
571                     .failed = (int)pair.second.download.failed,
572                     .remained = (int)pair.second.download.untreated,
573                 };
574                 index++;
575             }
576             return reinterpret_cast<Rdb_TableDetails *>(reinterpret_cast<uint8_t *>(detailsV0));
577         }
578         default:
579             return nullptr;
580     }
581 }
582 
ResizeBuff(size_t size)583 uint8_t *RelationalProgressDetails::ResizeBuff(size_t size)
584 {
585     if (size_ >= size) {
586         return buffer_;
587     }
588     if (buffer_ != nullptr) {
589         free(buffer_);
590     }
591     buffer_ = (uint8_t *)malloc(size);
592     return buffer_;
593 }
594 } // namespace
595 
GetDetails(Rdb_ProgressDetails * progress)596 static std::pair<int, RelationalProgressDetails *> GetDetails(Rdb_ProgressDetails *progress)
597 {
598     if (progress->version != DISTRIBUTED_PROGRESS_DETAIL_VERSION) {
599         return { -1, nullptr };
600     }
601     return { 0, (RelationalProgressDetails *)progress };
602 }
603 
OH_Rdb_GetTableDetails(Rdb_ProgressDetails * progress,int32_t version)604 Rdb_TableDetails *OH_Rdb_GetTableDetails(Rdb_ProgressDetails *progress, int32_t version)
605 {
606     auto [errCode, details] = GetDetails(progress);
607     if (errCode == -1 || details == nullptr) {
608         return nullptr;
609     }
610     return details->GetTableDetails(version);
611 }
612 
OH_Rdb_CloudSync(OH_Rdb_Store * store,Rdb_SyncMode mode,const char * tables[],uint32_t count,const Rdb_ProgressObserver * observer)613 int OH_Rdb_CloudSync(OH_Rdb_Store *store, Rdb_SyncMode mode, const char *tables[], uint32_t count,
614                      const Rdb_ProgressObserver *observer)
615 {
616     auto rdbStore = GetRelationalStore(store);
617     if (rdbStore == nullptr || mode < RDB_SYNC_MODE_TIME_FIRST || mode > RDB_SYNC_MODE_CLOUD_FIRST ||
618         observer == nullptr || observer->callback == nullptr || (count > 0 && tables == nullptr)) {
619         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
620     }
621     SyncOption syncOption{ .mode = NDKUtils::TransformMode(mode), .isBlock = false };
622     std::vector<std::string> tableNames;
623     for (uint32_t i = 0; i < count; ++i) {
624         if (tables[i] == nullptr) {
625             return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
626         }
627         tableNames.emplace_back(tables[i]);
628     }
629 
630     auto progressCallback = [cxt = (*observer).context, cb = (*observer).callback](Details &&details) {
631         if (details.size() > 1) {
632             LOG_ERROR("Not support edge to edge detail notify.");
633             return;
634         }
635         if (details.empty()) {
636             LOG_ERROR("No device or cloud synced.");
637             return;
638         }
639         for (auto &[device, detail] : details) {
640             RelationalProgressDetails cloudDetail(detail);
641             cb(cxt, &cloudDetail);
642             break;
643         }
644     };
645     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Sync(syncOption, tableNames, progressCallback));
646 }
647 
OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)648 int OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
649 {
650     auto rdbStore = GetRelationalStore(store);
651     if (rdbStore == nullptr || callback == nullptr) {
652         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
653     }
654     return ConvertorErrorCode::NativeToNdk(rdbStore->SubscribeAutoSyncProgress(callback));
655 }
656 
OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)657 int OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
658 {
659     auto rdbStore = GetRelationalStore(store);
660     if (rdbStore == nullptr) {
661         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
662     }
663     return ConvertorErrorCode::NativeToNdk(rdbStore->UnsubscribeAutoSyncProgress(callback));
664 }
665 
OH_Rdb_LockRow(OH_Rdb_Store * store,OH_Predicates * predicates)666 int OH_Rdb_LockRow(OH_Rdb_Store *store, OH_Predicates *predicates)
667 {
668     auto rdbStore = GetRelationalStore(store);
669     auto predicate = RelationalPredicate::GetSelf(predicates);
670     if (rdbStore == nullptr || predicate == nullptr) {
671         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
672     }
673     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->ModifyLockStatus(predicate->Get(), true));
674 }
675 
OH_Rdb_UnlockRow(OH_Rdb_Store * store,OH_Predicates * predicates)676 int OH_Rdb_UnlockRow(OH_Rdb_Store *store, OH_Predicates *predicates)
677 {
678     auto rdbStore = GetRelationalStore(store);
679     auto predicate = RelationalPredicate::GetSelf(predicates);
680     if (rdbStore == nullptr || predicate == nullptr) {
681         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
682     }
683     return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->ModifyLockStatus(predicate->Get(), false));
684 }
685 
OH_Rdb_QueryLockedRow(OH_Rdb_Store * store,OH_Predicates * predicates,const char * const * columnNames,int length)686 OH_Cursor *OH_Rdb_QueryLockedRow(
687     OH_Rdb_Store *store, OH_Predicates *predicates, const char *const *columnNames, int length)
688 {
689     auto rdbStore = GetRelationalStore(store);
690     auto predicate = RelationalPredicate::GetSelf(predicates);
691     if (rdbStore == nullptr || predicate == nullptr) {
692         LOG_ERROR("rdbStore or predicate is nullptr.");
693         return nullptr;
694     }
695     std::vector<std::string> columns;
696     if (columnNames != nullptr && length > 0) {
697         columns.reserve(length);
698         for (int i = 0; i < length; i++) {
699             columns.push_back(columnNames[i]);
700         }
701     }
702     predicate->Get().BeginWrap();
703     predicate->Get().EqualTo(OHOS::NativeRdb::AbsRdbPredicates::LOCK_STATUS, OHOS::NativeRdb::AbsRdbPredicates::LOCKED);
704     predicate->Get().Or();
705     predicate->Get().EqualTo(
706         OHOS::NativeRdb::AbsRdbPredicates::LOCK_STATUS, OHOS::NativeRdb::AbsRdbPredicates::LOCK_CHANGED);
707     predicate->Get().EndWrap();
708     std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
709         rdbStore->GetStore()->QueryByStep(predicate->Get(), columns);
710     if (resultSet == nullptr) {
711         return nullptr;
712     }
713     return new OHOS::RdbNdk::RelationalCursor(std::move(resultSet));
714 }
715 
NDKDetailProgressObserver(const Rdb_ProgressObserver * callback)716 NDKDetailProgressObserver::NDKDetailProgressObserver(const Rdb_ProgressObserver *callback):callback_(callback)
717 {
718 }
719 
ProgressNotification(const Details & details)720 void NDKDetailProgressObserver::ProgressNotification(const Details &details)
721 {
722     if (callback_ == nullptr || details.empty()) {
723         return;
724     }
725     RelationalProgressDetails progressDetails = RelationalProgressDetails(details.begin()->second);
726     (*(callback_->callback))(callback_->context, &progressDetails);
727     progressDetails.DestroyTableDetails();
728 }
729 
operator ==(const Rdb_ProgressObserver * callback)730 bool NDKDetailProgressObserver::operator==(const Rdb_ProgressObserver *callback)
731 {
732     return callback == callback_;
733 }
734 
NDKStoreObserver(const Rdb_DataObserver * observer,int mode)735 NDKStoreObserver::NDKStoreObserver(const Rdb_DataObserver *observer, int mode) : mode_(mode), observer_(observer) {}
736 
OnChange(const std::vector<std::string> & devices)737 void NDKStoreObserver::OnChange(const std::vector<std::string> &devices)
738 {
739     if (mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD) {
740         auto count = devices.size();
741         std::unique_ptr<const char *[]> deviceIds = std::make_unique<const char *[]>(count);
742         for (uint32_t i = 0; i < count; ++i) {
743             deviceIds[i] = devices[i].c_str();
744         }
745         (*observer_->callback.briefObserver)(observer_->context, deviceIds.get(), count);
746     }
747 }
748 
GetKeyInfoSize(RdbStoreObserver::ChangeInfo && changeInfo)749 size_t NDKStoreObserver::GetKeyInfoSize(RdbStoreObserver::ChangeInfo &&changeInfo)
750 {
751     size_t size = 0;
752     for (auto it = changeInfo.begin(); it != changeInfo.end(); ++it) {
753         size += it->second[RdbStoreObserver::CHG_TYPE_INSERT].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
754         size += it->second[RdbStoreObserver::CHG_TYPE_UPDATE].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
755         size += it->second[RdbStoreObserver::CHG_TYPE_DELETE].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
756     }
757     return size;
758 }
759 
GetKeyDataType(std::vector<RdbStoreObserver::PrimaryKey> & primaryKey)760 int32_t NDKStoreObserver::GetKeyDataType(std::vector<RdbStoreObserver::PrimaryKey> &primaryKey)
761 {
762     if (primaryKey.size() == 0) {
763         return OH_ColumnType::TYPE_NULL;
764     }
765     if (std::holds_alternative<int64_t>(primaryKey[0]) || std::holds_alternative<double>(primaryKey[0])) {
766         return OH_ColumnType::TYPE_INT64;
767     }
768     if (std::holds_alternative<std::string>(primaryKey[0])) {
769         return OH_ColumnType::TYPE_TEXT;
770     }
771     return OH_ColumnType::TYPE_NULL;
772 }
773 
OnChange(const Origin & origin,const RdbStoreObserver::PrimaryFields & fields,RdbStoreObserver::ChangeInfo && changeInfo)774 void NDKStoreObserver::OnChange(const Origin &origin, const RdbStoreObserver::PrimaryFields &fields,
775                                 RdbStoreObserver::ChangeInfo &&changeInfo)
776 {
777     uint32_t count = changeInfo.size();
778     if (count == 0) {
779         LOG_ERROR("No any infos.");
780         return;
781     }
782 
783     if (mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS ||
784         mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) {
785         size_t size = count * (sizeof(Rdb_ChangeInfo *) + sizeof(Rdb_ChangeInfo)) +
786             GetKeyInfoSize(std::forward<RdbStoreObserver::ChangeInfo &&>(changeInfo));
787         std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(size);
788         Rdb_ChangeInfo **infos = (Rdb_ChangeInfo **)(buffer.get());
789         if (infos == nullptr) {
790             LOG_ERROR("Failed to allocate memory for Rdb_ChangeInfo.");
791             return;
792         }
793 
794         Rdb_ChangeInfo *details = (Rdb_ChangeInfo *)(infos + count);
795         Rdb_KeyInfo::Rdb_KeyData *data = (Rdb_KeyInfo::Rdb_KeyData *)(details + count);
796 
797         int index = 0;
798         for (auto it = changeInfo.begin(); it != changeInfo.end(); ++it) {
799             infos[index] = &details[index];
800             infos[index]->version = DISTRIBUTED_CHANGE_INFO_VERSION;
801             infos[index]->tableName = it->first.c_str();
802             infos[index]->ChangeType = origin.dataType;
803             infos[index]->inserted.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_INSERT].size());
804             infos[index]->inserted.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_INSERT]);
805             infos[index]->updated.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_UPDATE].size());
806             infos[index]->updated.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_UPDATE]);
807             infos[index]->deleted.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_DELETE].size());
808             infos[index]->deleted.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_DELETE]);
809             ConvertKeyInfoData(data, it->second[RdbStoreObserver::CHG_TYPE_INSERT]);
810             infos[index]->inserted.data = data;
811             ConvertKeyInfoData(data+infos[index]->inserted.count, it->second[RdbStoreObserver::CHG_TYPE_UPDATE]);
812             infos[index]->updated.data = data+infos[index]->inserted.count;
813             ConvertKeyInfoData(data+infos[index]->inserted.count+infos[index]->updated.count,
814                 it->second[RdbStoreObserver::CHG_TYPE_DELETE]);
815             infos[index]->deleted.data = data+infos[index]->inserted.count+infos[index]->updated.count;
816             index++;
817         }
818 
819         (*observer_->callback.detailsObserver)(observer_->context, const_cast<const Rdb_ChangeInfo**>(infos), count);
820     }
821 }
822 
OnChange()823 void NDKStoreObserver::OnChange()
824 {
825     RdbStoreObserver::OnChange();
826 }
827 
ConvertKeyInfoData(Rdb_KeyInfo::Rdb_KeyData * keyInfoData,std::vector<RdbStoreObserver::PrimaryKey> & primaryKey)828 void NDKStoreObserver::ConvertKeyInfoData(Rdb_KeyInfo::Rdb_KeyData *keyInfoData,
829     std::vector<RdbStoreObserver::PrimaryKey> &primaryKey)
830 {
831     if (keyInfoData == nullptr || primaryKey.empty()) {
832         LOG_WARN("no data, keyInfoData is nullptr:%{public}d", keyInfoData == nullptr);
833         return;
834     }
835 
836     for (size_t i = 0; i < primaryKey.size(); ++i) {
837         const auto &key = primaryKey[i];
838         if (auto val = std::get_if<double>(&key)) {
839             keyInfoData[i].real = *val;
840         } else if (auto val = std::get_if<int64_t>(&key)) {
841             keyInfoData[i].integer = *val;
842         } else if (auto val = std::get_if<std::string>(&key)) {
843             keyInfoData[i].text = val->c_str();
844         } else {
845             LOG_ERROR("Not support the data type.");
846             return;
847         }
848     }
849 }
850 
operator ==(const Rdb_DataObserver * other)851 bool NDKStoreObserver::operator==(const Rdb_DataObserver *other)
852 {
853     if (other == nullptr) {
854         return false;
855     }
856     return other->context == observer_->context && &(other->callback) == &(observer_->callback);
857 }