1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RelationalStore"
16 #include "relational_store.h"
17
18 #include "logger.h"
19 #include "modify_time_cursor.h"
20 #include "raw_data_parser.h"
21 #include "rdb_errno.h"
22 #include "rdb_helper.h"
23 #include "rdb_predicates.h"
24 #include "rdb_sql_utils.h"
25 #include "relational_cursor.h"
26 #include "relational_predicates.h"
27 #include "relational_predicates_objects.h"
28 #include "relational_store_error_code.h"
29 #include "relational_store_impl.h"
30 #include "relational_types_v0.h"
31 #include "relational_values_bucket.h"
32 #include "securec.h"
33 #include "sqlite_global_config.h"
34 #include "convertor_error_code.h"
35
36 using namespace OHOS::RdbNdk;
37 using namespace OHOS::DistributedRdb;
// Class id written into every RelationalStore; GetRelationalStore() checks it before downcasting a handle.
constexpr int RDB_STORE_CID{ 1234560 };
// Thresholds compared against OH_Rdb_Config::selfSize to tell configuration layouts apart
// (presumably the byte sizes of the V0 and V1 structs — confirm against the public header).
constexpr int RDB_CONFIG_SIZE_V0{ 41 };
constexpr int RDB_CONFIG_SIZE_V1{ 45 };
OH_Rdb_CreateValueObject()41 OH_VObject *OH_Rdb_CreateValueObject()
42 {
43 return new (std::nothrow) RelationalPredicatesObjects();
44 }
45
OH_Rdb_CreateValuesBucket()46 OH_VBucket *OH_Rdb_CreateValuesBucket()
47 {
48 return new (std::nothrow) RelationalValuesBucket();
49 }
50
OH_Rdb_CreatePredicates(const char * table)51 OH_Predicates *OH_Rdb_CreatePredicates(const char *table)
52 {
53 if (table == nullptr) {
54 return nullptr;
55 }
56 return new (std::nothrow) RelationalPredicate(table);
57 }
58
RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store)59 OHOS::RdbNdk::RelationalStore::RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store) : store_(store)
60 {
61 id = RDB_STORE_CID;
62 }
63
SubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)64 int RelationalStore::SubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
65 {
66 std::lock_guard<decltype(mutex_)> lock(mutex_) ;
67 bool result = std::any_of(callbacks_.begin(), callbacks_.end(), [callback](const auto &observer) {
68 return *observer == callback;
69 });
70 if (result) {
71 LOG_INFO("duplicate subscribe.");
72 return OH_Rdb_ErrCode::RDB_OK;
73 }
74 auto obs = std::make_shared<NDKDetailProgressObserver>(callback);
75 int errCode = store_->RegisterAutoSyncCallback(obs);
76 if (errCode == NativeRdb::E_OK) {
77 LOG_ERROR("subscribe failed.");
78 return ConvertorErrorCode::NativeToNdk(errCode);
79 }
80 callbacks_.push_back(std::move(obs));
81 return OH_Rdb_ErrCode::RDB_OK;
82 }
83
UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)84 int RelationalStore::UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
85 {
86 std::lock_guard<decltype(mutex_)> lock(mutex_) ;
87 for (auto it = callbacks_.begin(); it != callbacks_.end();) {
88 if (callback != nullptr && !(**it == callback)) {
89 ++it;
90 continue;
91 }
92
93 int errCode = store_->UnregisterAutoSyncCallback(*it);
94 if (errCode != NativeRdb::E_OK) {
95 LOG_ERROR("unsubscribe failed.");
96 return ConvertorErrorCode::NativeToNdk(errCode);
97 }
98 it = callbacks_.erase(it);
99 LOG_DEBUG("progress unsubscribe success.");
100 }
101 return OH_Rdb_ErrCode::RDB_OK;
102 }
103
~RelationalStore()104 RelationalStore::~RelationalStore()
105 {
106 if (store_ == nullptr || callbacks_.empty()) {
107 return;
108 }
109 for (auto &callback : callbacks_) {
110 store_->UnregisterAutoSyncCallback(callback);
111 }
112 }
113
TransformMode(Rdb_SyncMode & mode)114 SyncMode NDKUtils::TransformMode(Rdb_SyncMode &mode)
115 {
116 switch (mode) {
117 case RDB_SYNC_MODE_TIME_FIRST:
118 return TIME_FIRST;
119 case RDB_SYNC_MODE_NATIVE_FIRST:
120 return NATIVE_FIRST;
121 case RDB_SYNC_MODE_CLOUD_FIRST:
122 return CLOUD_FIRST;
123 default:
124 return static_cast<SyncMode>(-1);
125 }
126 }
127
GetSubscribeType(Rdb_SubscribeType & type)128 OHOS::DistributedRdb::SubscribeMode NDKUtils::GetSubscribeType(Rdb_SubscribeType &type)
129 {
130 switch (type) {
131 case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD:
132 return SubscribeMode::CLOUD;
133 case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS:
134 return SubscribeMode::CLOUD_DETAIL;
135 case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS:
136 return SubscribeMode::LOCAL_DETAIL;
137 default:
138 return SubscribeMode::SUBSCRIBE_MODE_MAX;
139 }
140 }
141
142 class MainOpenCallback : public OHOS::NativeRdb::RdbOpenCallback {
143 public:
144 int OnCreate(OHOS::NativeRdb::RdbStore &rdbStore) override;
145 int OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion) override;
146 };
147
OnCreate(OHOS::NativeRdb::RdbStore & rdbStore)148 int MainOpenCallback::OnCreate(OHOS::NativeRdb::RdbStore &rdbStore)
149 {
150 return OH_Rdb_ErrCode::RDB_OK;
151 }
152
OnUpgrade(OHOS::NativeRdb::RdbStore & rdbStore,int oldVersion,int newVersion)153 int MainOpenCallback::OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion)
154 {
155 return OH_Rdb_ErrCode::RDB_OK;
156 }
157
GetRelationalStore(OH_Rdb_Store * store)158 RelationalStore *GetRelationalStore(OH_Rdb_Store *store)
159 {
160 if (store == nullptr || store->id != RDB_STORE_CID) {
161 LOG_ERROR("store is invalid. is null %{public}d", (store == nullptr));
162 return nullptr;
163 }
164 return static_cast<RelationalStore *>(store);
165 }
166
OH_Rdb_GetOrOpen(const OH_Rdb_Config * config,int * errCode)167 OH_Rdb_Store *OH_Rdb_GetOrOpen(const OH_Rdb_Config *config, int *errCode)
168 {
169 if (config == nullptr || config->selfSize > RDB_CONFIG_SIZE_V1 || errCode == nullptr) {
170 LOG_ERROR("Parameters set error:config is NULL ? %{public}d and config size is %{public}zu or "
171 "errCode is NULL ? %{public}d ",
172 (config == nullptr), sizeof(OH_Rdb_Config), (errCode == nullptr));
173 return nullptr;
174 }
175
176 std::string realPath =
177 OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, *errCode);
178 if (*errCode != 0) {
179 *errCode = ConvertorErrorCode::NativeToNdk(*errCode);
180 LOG_ERROR("Get database path failed, ret %{public}d ", *errCode);
181 return nullptr;
182 }
183 OHOS::NativeRdb::RdbStoreConfig rdbStoreConfig(realPath);
184 rdbStoreConfig.SetSecurityLevel(OHOS::NativeRdb::SecurityLevel(config->securityLevel));
185 rdbStoreConfig.SetEncryptStatus(config->isEncrypt);
186 if (config->selfSize > RDB_CONFIG_SIZE_V0) {
187 rdbStoreConfig.SetArea(config->area);
188 }
189 if (config->bundleName != nullptr) {
190 rdbStoreConfig.SetBundleName(config->bundleName);
191 }
192 rdbStoreConfig.SetName(config->storeName);
193
194 MainOpenCallback callback;
195 std::shared_ptr<OHOS::NativeRdb::RdbStore> store =
196 OHOS::NativeRdb::RdbHelper::GetRdbStore(rdbStoreConfig, -1, callback, *errCode);
197 *errCode = ConvertorErrorCode::NativeToNdk(*errCode);
198 if (store == nullptr) {
199 LOG_ERROR("Get RDB Store fail %{public}s", realPath.c_str());
200 return nullptr;
201 }
202 return new (std::nothrow) RelationalStore(store);
203 }
204
OH_Rdb_CloseStore(OH_Rdb_Store * store)205 int OH_Rdb_CloseStore(OH_Rdb_Store *store)
206 {
207 auto rdbStore = GetRelationalStore(store);
208 if (rdbStore == nullptr) {
209 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
210 }
211 delete rdbStore;
212 return OH_Rdb_ErrCode::RDB_OK;
213 }
214
OH_Rdb_DeleteStore(const OH_Rdb_Config * config)215 int OH_Rdb_DeleteStore(const OH_Rdb_Config *config)
216 {
217 if (config == nullptr || config->dataBaseDir == nullptr || config->storeName == nullptr) {
218 LOG_ERROR("Parameters set error:path is NULL ? %{public}d", (config == nullptr));
219 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
220 }
221 int errCode = OHOS::NativeRdb::E_OK;
222 std::string realPath =
223 OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, errCode);
224 if (errCode != OHOS::NativeRdb::E_OK) {
225 return ConvertorErrorCode::NativeToNdk(errCode);
226 }
227 return ConvertorErrorCode::NativeToNdk(OHOS::NativeRdb::RdbHelper::DeleteRdbStore(realPath));
228 }
229
OH_Rdb_Insert(OH_Rdb_Store * store,const char * table,OH_VBucket * valuesBucket)230 int OH_Rdb_Insert(OH_Rdb_Store *store, const char *table, OH_VBucket *valuesBucket)
231 {
232 auto rdbStore = GetRelationalStore(store);
233 auto bucket = RelationalValuesBucket::GetSelf(valuesBucket);
234 if (rdbStore == nullptr || table == nullptr || bucket == nullptr) {
235 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
236 }
237 int64_t rowId = -1;
238 rdbStore->GetStore()->Insert(rowId, table, bucket->Get());
239 return rowId >= 0 ? rowId : OH_Rdb_ErrCode::RDB_ERR;
240 }
241
OH_Rdb_Update(OH_Rdb_Store * store,OH_VBucket * valueBucket,OH_Predicates * predicates)242 int OH_Rdb_Update(OH_Rdb_Store *store, OH_VBucket *valueBucket, OH_Predicates *predicates)
243 {
244 auto rdbStore = GetRelationalStore(store);
245 auto predicate = RelationalPredicate::GetSelf(predicates);
246 auto bucket = RelationalValuesBucket::GetSelf(valueBucket);
247 if (rdbStore == nullptr || predicate == nullptr || bucket == nullptr) {
248 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
249 }
250 int updatedRows = -1;
251 rdbStore->GetStore()->Update(updatedRows, bucket->Get(), predicate->Get());
252 return updatedRows >= 0 ? updatedRows : OH_Rdb_ErrCode::RDB_ERR;
253 }
254
OH_Rdb_Delete(OH_Rdb_Store * store,OH_Predicates * predicates)255 int OH_Rdb_Delete(OH_Rdb_Store *store, OH_Predicates *predicates)
256 {
257 auto rdbStore = GetRelationalStore(store);
258 auto predicate = RelationalPredicate::GetSelf(predicates);
259 if (rdbStore == nullptr || predicate == nullptr) {
260 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
261 }
262 int deletedRows = -1;
263 rdbStore->GetStore()->Delete(deletedRows, predicate->Get());
264 return deletedRows >= 0 ? deletedRows : OH_Rdb_ErrCode::RDB_ERR;
265 }
266
OH_Rdb_Query(OH_Rdb_Store * store,OH_Predicates * predicates,const char * const * columnNames,int length)267 OH_Cursor *OH_Rdb_Query(OH_Rdb_Store *store, OH_Predicates *predicates, const char *const *columnNames, int length)
268 {
269 auto rdbStore = GetRelationalStore(store);
270 auto predicate = RelationalPredicate::GetSelf(predicates);
271 if (rdbStore == nullptr || predicate == nullptr) {
272 return nullptr;
273 }
274 std::vector<std::string> columns;
275 if (columnNames != nullptr && length > 0) {
276 columns.reserve(length);
277 for (int i = 0; i < length; i++) {
278 columns.push_back(columnNames[i]);
279 }
280 }
281
282 std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
283 rdbStore->GetStore()->QueryByStep(predicate->Get(), columns);
284 if (resultSet == nullptr) {
285 return nullptr;
286 }
287 return new (std::nothrow) RelationalCursor(std::move(resultSet));
288 }
289
OH_Rdb_ExecuteQuery(OH_Rdb_Store * store,const char * sql)290 OH_Cursor *OH_Rdb_ExecuteQuery(OH_Rdb_Store *store, const char *sql)
291 {
292 auto rdbStore = GetRelationalStore(store);
293 if (rdbStore == nullptr || sql == nullptr) {
294 return nullptr;
295 }
296 std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
297 rdbStore->GetStore()->QuerySql(sql, std::vector<std::string>{});
298 if (resultSet == nullptr) {
299 return nullptr;
300 }
301 return new OHOS::RdbNdk::RelationalCursor(std::move(resultSet));
302 }
303
OH_Rdb_Execute(OH_Rdb_Store * store,const char * sql)304 int OH_Rdb_Execute(OH_Rdb_Store *store, const char *sql)
305 {
306 auto rdbStore = GetRelationalStore(store);
307 if (rdbStore == nullptr || sql == nullptr) {
308 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
309 }
310 return ConvertorErrorCode::NativeToNdk(
311 rdbStore->GetStore()->ExecuteSql(sql, std::vector<OHOS::NativeRdb::ValueObject>{}));
312 }
313
OH_Rdb_BeginTransaction(OH_Rdb_Store * store)314 int OH_Rdb_BeginTransaction(OH_Rdb_Store *store)
315 {
316 auto rdbStore = GetRelationalStore(store);
317 if (rdbStore == nullptr) {
318 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
319 }
320 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->BeginTransaction());
321 }
322
OH_Rdb_RollBack(OH_Rdb_Store * store)323 int OH_Rdb_RollBack(OH_Rdb_Store *store)
324 {
325 auto rdbStore = GetRelationalStore(store);
326 if (rdbStore == nullptr) {
327 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
328 }
329 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->RollBack());
330 }
331
OH_Rdb_Commit(OH_Rdb_Store * store)332 int OH_Rdb_Commit(OH_Rdb_Store *store)
333 {
334 auto rdbStore = GetRelationalStore(store);
335 if (rdbStore == nullptr) {
336 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
337 }
338 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Commit());
339 }
340
OH_Rdb_Backup(OH_Rdb_Store * store,const char * databasePath)341 int OH_Rdb_Backup(OH_Rdb_Store *store, const char *databasePath)
342 {
343 auto rdbStore = GetRelationalStore(store);
344 if (rdbStore == nullptr || databasePath == nullptr) {
345 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
346 }
347 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Backup(databasePath));
348 }
349
OH_Rdb_Restore(OH_Rdb_Store * store,const char * databasePath)350 int OH_Rdb_Restore(OH_Rdb_Store *store, const char *databasePath)
351 {
352 auto rdbStore = GetRelationalStore(store);
353 if (rdbStore == nullptr || databasePath == nullptr) {
354 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
355 }
356 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Restore(databasePath));
357 }
358
OH_Rdb_GetVersion(OH_Rdb_Store * store,int * version)359 int OH_Rdb_GetVersion(OH_Rdb_Store *store, int *version)
360 {
361 auto rdbStore = GetRelationalStore(store);
362 if (rdbStore == nullptr || version == nullptr) {
363 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
364 }
365 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->GetVersion(*version));
366 }
367
OH_Rdb_SetVersion(OH_Rdb_Store * store,int version)368 int OH_Rdb_SetVersion(OH_Rdb_Store *store, int version)
369 {
370 auto rdbStore = GetRelationalStore(store);
371 if (rdbStore == nullptr) {
372 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
373 }
374 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->SetVersion(version));
375 }
376
Convert(const Rdb_DistributedConfig * config)377 static std::pair<int32_t, Rdb_DistributedConfig> Convert(const Rdb_DistributedConfig *config)
378 {
379 std::pair<int32_t, Rdb_DistributedConfig> result = { OH_Rdb_ErrCode::RDB_E_INVALID_ARGS, {} };
380 auto &[errCode, cfg] = result;
381 switch (config->version) {
382 case DISTRIBUTED_CONFIG_V0: {
383 const auto *realCfg = reinterpret_cast<const DistributedConfigV0 *>(config);
384 cfg.version = realCfg->version;
385 cfg.isAutoSync = realCfg->isAutoSync;
386 errCode = OH_Rdb_ErrCode::RDB_OK;
387 break;
388 }
389 default:
390 break;
391 }
392 return result;
393 }
394
OH_Rdb_SetDistributedTables(OH_Rdb_Store * store,const char * tables[],uint32_t count,Rdb_DistributedType type,const Rdb_DistributedConfig * config)395 int OH_Rdb_SetDistributedTables(OH_Rdb_Store *store, const char *tables[], uint32_t count, Rdb_DistributedType type,
396 const Rdb_DistributedConfig *config)
397 {
398 auto rdbStore = GetRelationalStore(store);
399 if (rdbStore == nullptr || type != Rdb_DistributedType::RDB_DISTRIBUTED_CLOUD || (count > 0 && tables == nullptr)) {
400 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
401 }
402
403 auto [errCode, cfg] = Convert(config);
404 if (errCode != OH_Rdb_ErrCode::RDB_OK) {
405 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
406 }
407 std::vector<std::string> tableNames;
408 tableNames.reserve(count);
409 for (uint32_t i = 0; i < count; i++) {
410 if (tables[i] == nullptr) {
411 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
412 }
413 tableNames.emplace_back(tables[i]);
414 }
415 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->SetDistributedTables(tableNames,
416 DistributedTableType::DISTRIBUTED_CLOUD, { cfg.isAutoSync }));
417 }
418
OH_Rdb_FindModifyTime(OH_Rdb_Store * store,const char * tableName,const char * columnName,OH_VObject * values)419 OH_Cursor *OH_Rdb_FindModifyTime(OH_Rdb_Store *store, const char *tableName, const char *columnName, OH_VObject *values)
420 {
421 auto rdbStore = GetRelationalStore(store);
422 auto selfObjects = RelationalPredicatesObjects::GetSelf(values);
423 if (rdbStore == nullptr || selfObjects == nullptr || tableName == nullptr) {
424 return nullptr;
425 }
426 std::vector<ValueObject> objects = selfObjects->Get();
427 std::vector<OHOS::NativeRdb::RdbStore::PRIKey> keys;
428 keys.reserve(objects.size());
429 for (auto &object : objects) {
430 OHOS::NativeRdb::RdbStore::PRIKey priKey;
431 OHOS::NativeRdb::RawDataParser::Convert(std::move(object.value), priKey);
432 keys.push_back(std::move(priKey));
433 }
434 auto results = rdbStore->GetStore()->GetModifyTime(tableName, columnName, keys);
435 return new (std::nothrow) ModifyTimeCursor(std::move(results));
436 }
437
OH_Rdb_Subscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)438 int OH_Rdb_Subscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
439 {
440 auto rdbStore = GetRelationalStore(store);
441 if (rdbStore == nullptr || observer == nullptr) {
442 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
443 }
444 return rdbStore->DoSubScribe(type, observer);
445 }
446
OH_Rdb_Unsubscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)447 int OH_Rdb_Unsubscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
448 {
449 auto rdbStore = GetRelationalStore(store);
450 if (rdbStore == nullptr) {
451 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
452 }
453 return rdbStore->DoUnsubScribe(type, observer);
454 }
455
DoSubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)456 int RelationalStore::DoSubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
457 {
458 if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS ||
459 observer == nullptr || observer->callback.briefObserver == nullptr ||
460 observer->callback.detailsObserver == nullptr) {
461 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
462 }
463
464 std::lock_guard<decltype(mutex_)> lock(mutex_);
465 auto result = std::any_of(dataObservers_[type].begin(), dataObservers_[type].end(),
466 [observer](const std::shared_ptr<NDKStoreObserver> &item) {
467 return *item.get() == observer;
468 });
469 if (result) {
470 LOG_INFO("duplicate subscribe.");
471 return OH_Rdb_ErrCode::RDB_OK;
472 }
473 auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
474 auto ndkObserver = std::make_shared<NDKStoreObserver>(observer, type);
475 int subscribeResult = (type == RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) ?
476 store_->SubscribeObserver(subscribeOption, ndkObserver) : store_->Subscribe(subscribeOption, ndkObserver.get());
477 if (subscribeResult != OHOS::NativeRdb::E_OK) {
478 LOG_ERROR("subscribe failed.");
479 } else {
480 dataObservers_[type].emplace_back(std::move(ndkObserver));
481 }
482 return ConvertorErrorCode::NativeToNdk(subscribeResult);
483 }
484
DoUnsubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)485 int RelationalStore::DoUnsubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
486 {
487 if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) {
488 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
489 }
490 std::lock_guard<decltype(mutex_)> lock(mutex_);
491 for (auto it = dataObservers_[type].begin(); it != dataObservers_[type].end();) {
492 if (observer != nullptr && !(**it == observer)) {
493 ++it;
494 continue;
495 }
496 auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
497 int errCode = (type == RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) ?
498 store_->UnsubscribeObserver(subscribeOption, *it) : store_->UnSubscribe(subscribeOption, it->get());
499 if (errCode != NativeRdb::E_OK) {
500 LOG_ERROR("unsubscribe failed.");
501 return ConvertorErrorCode::NativeToNdk(errCode);
502 }
503 it = dataObservers_[type].erase(it);
504 LOG_DEBUG("data observer unsubscribe success.");
505 }
506 return OH_Rdb_ErrCode::RDB_OK;
507 }
508
509 namespace {
510 struct RelationalProgressDetails : public Rdb_ProgressDetails {
511 Rdb_TableDetails *details_ = nullptr;
512 explicit RelationalProgressDetails(const ProgressDetail &detail);
513 ~RelationalProgressDetails();
514
515 Rdb_TableDetails *GetTableDetails(int paraVersion);
516 void DestroyTableDetails();
517
518 private:
519 uint8_t *ResizeBuff(size_t size);
520
521 TableDetails tableDetails_;
522 size_t size_ = 0;
523 uint8_t *buffer_ = nullptr;
524 };
525
DestroyTableDetails()526 void RelationalProgressDetails::DestroyTableDetails()
527 {
528 delete[] details_;
529 details_ = nullptr;
530 }
531
RelationalProgressDetails(const ProgressDetail & detail)532 RelationalProgressDetails::RelationalProgressDetails(const ProgressDetail &detail)
533 {
534 version = DISTRIBUTED_PROGRESS_DETAIL_VERSION;
535 schedule = detail.progress;
536 code = detail.code;
537 tableLength = (int32_t)detail.details.size();
538 tableDetails_ = detail.details;
539 }
540
~RelationalProgressDetails()541 RelationalProgressDetails::~RelationalProgressDetails()
542 {
543 if (buffer_ != nullptr) {
544 free(buffer_);
545 }
546 buffer_ = nullptr;
547 }
548
GetTableDetails(int paraVersion)549 Rdb_TableDetails *RelationalProgressDetails::GetTableDetails(int paraVersion)
550 {
551 switch (paraVersion) {
552 case TABLE_DETAIL_V0: {
553 auto length = sizeof(TableDetailsV0) * (tableLength + 1);
554 auto *detailsV0 = (TableDetailsV0 *)ResizeBuff(length);
555 if (detailsV0 == nullptr) {
556 return nullptr;
557 }
558 (void)memset_s(detailsV0, length, 0, length);
559 int index = 0;
560 for (const auto &pair : tableDetails_) {
561 detailsV0[index].table = pair.first.c_str();
562 detailsV0[index].upload = StatisticV0{
563 .total = (int)pair.second.upload.total,
564 .successful = (int)pair.second.upload.success,
565 .failed = (int)pair.second.upload.failed,
566 .remained = (int)pair.second.upload.untreated,
567 };
568 detailsV0[index].download = StatisticV0{
569 .total = (int)pair.second.download.total,
570 .successful = (int)pair.second.download.success,
571 .failed = (int)pair.second.download.failed,
572 .remained = (int)pair.second.download.untreated,
573 };
574 index++;
575 }
576 return reinterpret_cast<Rdb_TableDetails *>(reinterpret_cast<uint8_t *>(detailsV0));
577 }
578 default:
579 return nullptr;
580 }
581 }
582
ResizeBuff(size_t size)583 uint8_t *RelationalProgressDetails::ResizeBuff(size_t size)
584 {
585 if (size_ >= size) {
586 return buffer_;
587 }
588 if (buffer_ != nullptr) {
589 free(buffer_);
590 }
591 buffer_ = (uint8_t *)malloc(size);
592 return buffer_;
593 }
594 } // namespace
595
GetDetails(Rdb_ProgressDetails * progress)596 static std::pair<int, RelationalProgressDetails *> GetDetails(Rdb_ProgressDetails *progress)
597 {
598 if (progress->version != DISTRIBUTED_PROGRESS_DETAIL_VERSION) {
599 return { -1, nullptr };
600 }
601 return { 0, (RelationalProgressDetails *)progress };
602 }
603
OH_Rdb_GetTableDetails(Rdb_ProgressDetails * progress,int32_t version)604 Rdb_TableDetails *OH_Rdb_GetTableDetails(Rdb_ProgressDetails *progress, int32_t version)
605 {
606 auto [errCode, details] = GetDetails(progress);
607 if (errCode == -1 || details == nullptr) {
608 return nullptr;
609 }
610 return details->GetTableDetails(version);
611 }
612
OH_Rdb_CloudSync(OH_Rdb_Store * store,Rdb_SyncMode mode,const char * tables[],uint32_t count,const Rdb_ProgressObserver * observer)613 int OH_Rdb_CloudSync(OH_Rdb_Store *store, Rdb_SyncMode mode, const char *tables[], uint32_t count,
614 const Rdb_ProgressObserver *observer)
615 {
616 auto rdbStore = GetRelationalStore(store);
617 if (rdbStore == nullptr || mode < RDB_SYNC_MODE_TIME_FIRST || mode > RDB_SYNC_MODE_CLOUD_FIRST ||
618 observer == nullptr || observer->callback == nullptr || (count > 0 && tables == nullptr)) {
619 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
620 }
621 SyncOption syncOption{ .mode = NDKUtils::TransformMode(mode), .isBlock = false };
622 std::vector<std::string> tableNames;
623 for (uint32_t i = 0; i < count; ++i) {
624 if (tables[i] == nullptr) {
625 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
626 }
627 tableNames.emplace_back(tables[i]);
628 }
629
630 auto progressCallback = [cxt = (*observer).context, cb = (*observer).callback](Details &&details) {
631 if (details.size() > 1) {
632 LOG_ERROR("Not support edge to edge detail notify.");
633 return;
634 }
635 if (details.empty()) {
636 LOG_ERROR("No device or cloud synced.");
637 return;
638 }
639 for (auto &[device, detail] : details) {
640 RelationalProgressDetails cloudDetail(detail);
641 cb(cxt, &cloudDetail);
642 break;
643 }
644 };
645 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->Sync(syncOption, tableNames, progressCallback));
646 }
647
OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)648 int OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
649 {
650 auto rdbStore = GetRelationalStore(store);
651 if (rdbStore == nullptr || callback == nullptr) {
652 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
653 }
654 return ConvertorErrorCode::NativeToNdk(rdbStore->SubscribeAutoSyncProgress(callback));
655 }
656
OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)657 int OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
658 {
659 auto rdbStore = GetRelationalStore(store);
660 if (rdbStore == nullptr) {
661 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
662 }
663 return ConvertorErrorCode::NativeToNdk(rdbStore->UnsubscribeAutoSyncProgress(callback));
664 }
665
OH_Rdb_LockRow(OH_Rdb_Store * store,OH_Predicates * predicates)666 int OH_Rdb_LockRow(OH_Rdb_Store *store, OH_Predicates *predicates)
667 {
668 auto rdbStore = GetRelationalStore(store);
669 auto predicate = RelationalPredicate::GetSelf(predicates);
670 if (rdbStore == nullptr || predicate == nullptr) {
671 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
672 }
673 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->ModifyLockStatus(predicate->Get(), true));
674 }
675
OH_Rdb_UnlockRow(OH_Rdb_Store * store,OH_Predicates * predicates)676 int OH_Rdb_UnlockRow(OH_Rdb_Store *store, OH_Predicates *predicates)
677 {
678 auto rdbStore = GetRelationalStore(store);
679 auto predicate = RelationalPredicate::GetSelf(predicates);
680 if (rdbStore == nullptr || predicate == nullptr) {
681 return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
682 }
683 return ConvertorErrorCode::NativeToNdk(rdbStore->GetStore()->ModifyLockStatus(predicate->Get(), false));
684 }
685
OH_Rdb_QueryLockedRow(OH_Rdb_Store * store,OH_Predicates * predicates,const char * const * columnNames,int length)686 OH_Cursor *OH_Rdb_QueryLockedRow(
687 OH_Rdb_Store *store, OH_Predicates *predicates, const char *const *columnNames, int length)
688 {
689 auto rdbStore = GetRelationalStore(store);
690 auto predicate = RelationalPredicate::GetSelf(predicates);
691 if (rdbStore == nullptr || predicate == nullptr) {
692 LOG_ERROR("rdbStore or predicate is nullptr.");
693 return nullptr;
694 }
695 std::vector<std::string> columns;
696 if (columnNames != nullptr && length > 0) {
697 columns.reserve(length);
698 for (int i = 0; i < length; i++) {
699 columns.push_back(columnNames[i]);
700 }
701 }
702 predicate->Get().BeginWrap();
703 predicate->Get().EqualTo(OHOS::NativeRdb::AbsRdbPredicates::LOCK_STATUS, OHOS::NativeRdb::AbsRdbPredicates::LOCKED);
704 predicate->Get().Or();
705 predicate->Get().EqualTo(
706 OHOS::NativeRdb::AbsRdbPredicates::LOCK_STATUS, OHOS::NativeRdb::AbsRdbPredicates::LOCK_CHANGED);
707 predicate->Get().EndWrap();
708 std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
709 rdbStore->GetStore()->QueryByStep(predicate->Get(), columns);
710 if (resultSet == nullptr) {
711 return nullptr;
712 }
713 return new OHOS::RdbNdk::RelationalCursor(std::move(resultSet));
714 }
715
NDKDetailProgressObserver(const Rdb_ProgressObserver * callback)716 NDKDetailProgressObserver::NDKDetailProgressObserver(const Rdb_ProgressObserver *callback):callback_(callback)
717 {
718 }
719
ProgressNotification(const Details & details)720 void NDKDetailProgressObserver::ProgressNotification(const Details &details)
721 {
722 if (callback_ == nullptr || details.empty()) {
723 return;
724 }
725 RelationalProgressDetails progressDetails = RelationalProgressDetails(details.begin()->second);
726 (*(callback_->callback))(callback_->context, &progressDetails);
727 progressDetails.DestroyTableDetails();
728 }
729
operator ==(const Rdb_ProgressObserver * callback)730 bool NDKDetailProgressObserver::operator==(const Rdb_ProgressObserver *callback)
731 {
732 return callback == callback_;
733 }
734
NDKStoreObserver(const Rdb_DataObserver * observer,int mode)735 NDKStoreObserver::NDKStoreObserver(const Rdb_DataObserver *observer, int mode) : mode_(mode), observer_(observer) {}
736
OnChange(const std::vector<std::string> & devices)737 void NDKStoreObserver::OnChange(const std::vector<std::string> &devices)
738 {
739 if (mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD) {
740 auto count = devices.size();
741 std::unique_ptr<const char *[]> deviceIds = std::make_unique<const char *[]>(count);
742 for (uint32_t i = 0; i < count; ++i) {
743 deviceIds[i] = devices[i].c_str();
744 }
745 (*observer_->callback.briefObserver)(observer_->context, deviceIds.get(), count);
746 }
747 }
748
GetKeyInfoSize(RdbStoreObserver::ChangeInfo && changeInfo)749 size_t NDKStoreObserver::GetKeyInfoSize(RdbStoreObserver::ChangeInfo &&changeInfo)
750 {
751 size_t size = 0;
752 for (auto it = changeInfo.begin(); it != changeInfo.end(); ++it) {
753 size += it->second[RdbStoreObserver::CHG_TYPE_INSERT].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
754 size += it->second[RdbStoreObserver::CHG_TYPE_UPDATE].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
755 size += it->second[RdbStoreObserver::CHG_TYPE_DELETE].size() * sizeof(Rdb_KeyInfo::Rdb_KeyData);
756 }
757 return size;
758 }
759
GetKeyDataType(std::vector<RdbStoreObserver::PrimaryKey> & primaryKey)760 int32_t NDKStoreObserver::GetKeyDataType(std::vector<RdbStoreObserver::PrimaryKey> &primaryKey)
761 {
762 if (primaryKey.size() == 0) {
763 return OH_ColumnType::TYPE_NULL;
764 }
765 if (std::holds_alternative<int64_t>(primaryKey[0]) || std::holds_alternative<double>(primaryKey[0])) {
766 return OH_ColumnType::TYPE_INT64;
767 }
768 if (std::holds_alternative<std::string>(primaryKey[0])) {
769 return OH_ColumnType::TYPE_TEXT;
770 }
771 return OH_ColumnType::TYPE_NULL;
772 }
773
OnChange(const Origin & origin,const RdbStoreObserver::PrimaryFields & fields,RdbStoreObserver::ChangeInfo && changeInfo)774 void NDKStoreObserver::OnChange(const Origin &origin, const RdbStoreObserver::PrimaryFields &fields,
775 RdbStoreObserver::ChangeInfo &&changeInfo)
776 {
777 uint32_t count = changeInfo.size();
778 if (count == 0) {
779 LOG_ERROR("No any infos.");
780 return;
781 }
782
783 if (mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS ||
784 mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_LOCAL_DETAILS) {
785 size_t size = count * (sizeof(Rdb_ChangeInfo *) + sizeof(Rdb_ChangeInfo)) +
786 GetKeyInfoSize(std::forward<RdbStoreObserver::ChangeInfo &&>(changeInfo));
787 std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(size);
788 Rdb_ChangeInfo **infos = (Rdb_ChangeInfo **)(buffer.get());
789 if (infos == nullptr) {
790 LOG_ERROR("Failed to allocate memory for Rdb_ChangeInfo.");
791 return;
792 }
793
794 Rdb_ChangeInfo *details = (Rdb_ChangeInfo *)(infos + count);
795 Rdb_KeyInfo::Rdb_KeyData *data = (Rdb_KeyInfo::Rdb_KeyData *)(details + count);
796
797 int index = 0;
798 for (auto it = changeInfo.begin(); it != changeInfo.end(); ++it) {
799 infos[index] = &details[index];
800 infos[index]->version = DISTRIBUTED_CHANGE_INFO_VERSION;
801 infos[index]->tableName = it->first.c_str();
802 infos[index]->ChangeType = origin.dataType;
803 infos[index]->inserted.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_INSERT].size());
804 infos[index]->inserted.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_INSERT]);
805 infos[index]->updated.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_UPDATE].size());
806 infos[index]->updated.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_UPDATE]);
807 infos[index]->deleted.count = static_cast<int>(it->second[RdbStoreObserver::CHG_TYPE_DELETE].size());
808 infos[index]->deleted.type = GetKeyDataType(it->second[RdbStoreObserver::CHG_TYPE_DELETE]);
809 ConvertKeyInfoData(data, it->second[RdbStoreObserver::CHG_TYPE_INSERT]);
810 infos[index]->inserted.data = data;
811 ConvertKeyInfoData(data+infos[index]->inserted.count, it->second[RdbStoreObserver::CHG_TYPE_UPDATE]);
812 infos[index]->updated.data = data+infos[index]->inserted.count;
813 ConvertKeyInfoData(data+infos[index]->inserted.count+infos[index]->updated.count,
814 it->second[RdbStoreObserver::CHG_TYPE_DELETE]);
815 infos[index]->deleted.data = data+infos[index]->inserted.count+infos[index]->updated.count;
816 index++;
817 }
818
819 (*observer_->callback.detailsObserver)(observer_->context, const_cast<const Rdb_ChangeInfo**>(infos), count);
820 }
821 }
822
OnChange()823 void NDKStoreObserver::OnChange()
824 {
825 RdbStoreObserver::OnChange();
826 }
827
ConvertKeyInfoData(Rdb_KeyInfo::Rdb_KeyData * keyInfoData,std::vector<RdbStoreObserver::PrimaryKey> & primaryKey)828 void NDKStoreObserver::ConvertKeyInfoData(Rdb_KeyInfo::Rdb_KeyData *keyInfoData,
829 std::vector<RdbStoreObserver::PrimaryKey> &primaryKey)
830 {
831 if (keyInfoData == nullptr || primaryKey.empty()) {
832 LOG_WARN("no data, keyInfoData is nullptr:%{public}d", keyInfoData == nullptr);
833 return;
834 }
835
836 for (size_t i = 0; i < primaryKey.size(); ++i) {
837 const auto &key = primaryKey[i];
838 if (auto val = std::get_if<double>(&key)) {
839 keyInfoData[i].real = *val;
840 } else if (auto val = std::get_if<int64_t>(&key)) {
841 keyInfoData[i].integer = *val;
842 } else if (auto val = std::get_if<std::string>(&key)) {
843 keyInfoData[i].text = val->c_str();
844 } else {
845 LOG_ERROR("Not support the data type.");
846 return;
847 }
848 }
849 }
850
operator ==(const Rdb_DataObserver * other)851 bool NDKStoreObserver::operator==(const Rdb_DataObserver *other)
852 {
853 if (other == nullptr) {
854 return false;
855 }
856 return other->context == observer_->context && &(other->callback) == &(observer_->callback);
857 }