• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "relational_store.h"
17 
18 #include "logger.h"
19 #include "modify_time_cursor.h"
20 #include "raw_data_parser.h"
21 #include "rdb_errno.h"
22 #include "rdb_helper.h"
23 #include "rdb_predicates.h"
24 #include "rdb_sql_utils.h"
25 #include "relational_cursor.h"
26 #include "relational_predicates.h"
27 #include "relational_predicates_objects.h"
28 #include "relational_store_error_code.h"
29 #include "relational_store_impl.h"
30 #include "relational_types_v0.h"
31 #include "relational_values_bucket.h"
32 #include "securec.h"
33 #include "sqlite_global_config.h"
34 
35 using namespace OHOS::RdbNdk;
36 using namespace OHOS::DistributedRdb;
37 constexpr int RDB_STORE_CID = 1234560; // The class id used to uniquely identify the OH_Rdb_Store class.
38 constexpr int RDB_CONFIG_SIZE_V0 = 41;
39 constexpr int RDB_CONFIG_SIZE_V1 = 45;
OH_Rdb_CreateValueObject()40 OH_VObject *OH_Rdb_CreateValueObject()
41 {
42     return new (std::nothrow) RelationalPredicatesObjects();
43 }
44 
OH_Rdb_CreateValuesBucket()45 OH_VBucket *OH_Rdb_CreateValuesBucket()
46 {
47     return new (std::nothrow) RelationalValuesBucket();
48 }
49 
OH_Rdb_CreatePredicates(const char * table)50 OH_Predicates *OH_Rdb_CreatePredicates(const char *table)
51 {
52     if (table == nullptr) {
53         return nullptr;
54     }
55     return new (std::nothrow) RelationalPredicate(table);
56 }
57 
RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store)58 OHOS::RdbNdk::RelationalStore::RelationalStore(std::shared_ptr<OHOS::NativeRdb::RdbStore> store) : store_(store)
59 {
60     id = RDB_STORE_CID;
61 }
62 
SubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)63 int RelationalStore::SubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
64 {
65     std::lock_guard<decltype(mutex_)> lock(mutex_) ;
66     bool result = std::any_of(callbacks_.begin(), callbacks_.end(), [callback](const auto &observer) {
67         return *observer == callback;
68     });
69     if (result) {
70         LOG_INFO("duplicate subscribe");
71         return OH_Rdb_ErrCode::RDB_OK;
72     }
73     auto obs = std::make_shared<NDKDetailProgressObserver>(callback);
74     int errCode = store_->RegisterAutoSyncCallback(obs);
75     if (errCode == NativeRdb::E_OK) {
76         LOG_ERROR("subscribe failed");
77         return errCode;
78     }
79     callbacks_.push_back(std::move(obs));
80     return NativeRdb::E_OK;
81 }
82 
UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver * callback)83 int RelationalStore::UnsubscribeAutoSyncProgress(const Rdb_ProgressObserver *callback)
84 {
85     std::lock_guard<decltype(mutex_)> lock(mutex_) ;
86     for (auto it = callbacks_.begin(); it != callbacks_.end();) {
87         if (callback != nullptr && !(**it == callback)) {
88             ++it;
89             continue;
90         }
91 
92         int errCode = store_->UnregisterAutoSyncCallback(*it);
93         if (errCode != NativeRdb::E_OK) {
94             LOG_ERROR("unsubscribe failed");
95             return errCode;
96         }
97         it = callbacks_.erase(it);
98         LOG_DEBUG("progress unsubscribe success");
99     }
100     return NativeRdb::E_OK;
101 }
102 
~RelationalStore()103 RelationalStore::~RelationalStore()
104 {
105     if (store_ == nullptr || callbacks_.empty()) {
106         return;
107     }
108     for (auto &callback : callbacks_) {
109         store_->UnregisterAutoSyncCallback(callback);
110     }
111 }
112 
TransformMode(Rdb_SyncMode & mode)113 SyncMode NDKUtils::TransformMode(Rdb_SyncMode &mode)
114 {
115     switch (mode) {
116         case RDB_SYNC_MODE_TIME_FIRST:
117             return TIME_FIRST;
118         case RDB_SYNC_MODE_NATIVE_FIRST:
119             return NATIVE_FIRST;
120         case RDB_SYNC_MODE_CLOUD_FIRST:
121             return CLOUD_FIRST;
122         default:
123             return static_cast<SyncMode>(-1);
124     }
125 }
126 
GetSubscribeType(Rdb_SubscribeType & type)127 OHOS::DistributedRdb::SubscribeMode NDKUtils::GetSubscribeType(Rdb_SubscribeType &type)
128 {
129     switch (type) {
130         case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD:
131             return SubscribeMode::CLOUD;
132         case Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS:
133             return SubscribeMode::CLOUD_DETAIL;
134         default:
135             return SubscribeMode::SUBSCRIBE_MODE_MAX;
136     }
137 }
138 
139 class MainOpenCallback : public OHOS::NativeRdb::RdbOpenCallback {
140 public:
141     int OnCreate(OHOS::NativeRdb::RdbStore &rdbStore) override;
142     int OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion) override;
143 };
144 
OnCreate(OHOS::NativeRdb::RdbStore & rdbStore)145 int MainOpenCallback::OnCreate(OHOS::NativeRdb::RdbStore &rdbStore)
146 {
147     return OHOS::NativeRdb::E_OK;
148 }
149 
OnUpgrade(OHOS::NativeRdb::RdbStore & rdbStore,int oldVersion,int newVersion)150 int MainOpenCallback::OnUpgrade(OHOS::NativeRdb::RdbStore &rdbStore, int oldVersion, int newVersion)
151 {
152     return OHOS::NativeRdb::E_OK;
153 }
154 
GetRelationalStore(OH_Rdb_Store * store)155 RelationalStore *GetRelationalStore(OH_Rdb_Store *store)
156 {
157     if (store == nullptr || store->id != RDB_STORE_CID) {
158         LOG_ERROR("store is invalid. is null %{public}d", (store == nullptr));
159         return nullptr;
160     }
161     return static_cast<RelationalStore *>(store);
162 }
163 
OH_Rdb_GetOrOpen(const OH_Rdb_Config * config,int * errCode)164 OH_Rdb_Store *OH_Rdb_GetOrOpen(const OH_Rdb_Config *config, int *errCode)
165 {
166     if (config == nullptr || config->selfSize > RDB_CONFIG_SIZE_V1 || errCode == nullptr) {
167         LOG_ERROR("Parameters set error:config is NULL ? %{public}d and config size is %{public}zu or "
168                   "errCode is NULL ? %{public}d ",
169                   (config == nullptr), sizeof(OH_Rdb_Config), (errCode == nullptr));
170         return nullptr;
171     }
172 
173     std::string realPath =
174         OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, *errCode);
175     if (*errCode != 0) {
176         LOG_ERROR("Get database path failed, ret %{public}d ", *errCode);
177         return nullptr;
178     }
179     OHOS::NativeRdb::RdbStoreConfig rdbStoreConfig(realPath);
180     rdbStoreConfig.SetSecurityLevel(OHOS::NativeRdb::SecurityLevel(config->securityLevel));
181     rdbStoreConfig.SetEncryptStatus(config->isEncrypt);
182     if (config->selfSize > RDB_CONFIG_SIZE_V0) {
183         rdbStoreConfig.SetArea(config->area);
184     }
185     if (config->bundleName != nullptr) {
186         rdbStoreConfig.SetBundleName(config->bundleName);
187     }
188     rdbStoreConfig.SetName(config->storeName);
189 
190     MainOpenCallback callback;
191     std::shared_ptr<OHOS::NativeRdb::RdbStore> store =
192         OHOS::NativeRdb::RdbHelper::GetRdbStore(rdbStoreConfig, -1, callback, *errCode);
193     if (store == nullptr) {
194         LOG_ERROR("Get RDB Store fail %{public}s", realPath.c_str());
195         return nullptr;
196     }
197     return new (std::nothrow) RelationalStore(store);
198 }
199 
OH_Rdb_CloseStore(OH_Rdb_Store * store)200 int OH_Rdb_CloseStore(OH_Rdb_Store *store)
201 {
202     auto rdbStore = GetRelationalStore(store);
203     if (rdbStore == nullptr) {
204         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
205     }
206     delete rdbStore;
207     return OH_Rdb_ErrCode::RDB_OK;
208 }
209 
OH_Rdb_DeleteStore(const OH_Rdb_Config * config)210 int OH_Rdb_DeleteStore(const OH_Rdb_Config *config)
211 {
212     if (config == nullptr || config->dataBaseDir == nullptr || config->storeName == nullptr) {
213         LOG_ERROR("Parameters set error:path is NULL ? %{public}d", (config == nullptr));
214         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
215     }
216     int errCode = OHOS::NativeRdb::E_OK;
217     std::string realPath =
218         OHOS::NativeRdb::RdbSqlUtils::GetDefaultDatabasePath(config->dataBaseDir, config->storeName, errCode);
219     if (errCode != OHOS::NativeRdb::E_OK) {
220         return errCode;
221     }
222     return OHOS::NativeRdb::RdbHelper::DeleteRdbStore(realPath);
223 }
224 
OH_Rdb_Insert(OH_Rdb_Store * store,const char * table,OH_VBucket * valuesBucket)225 int OH_Rdb_Insert(OH_Rdb_Store *store, const char *table, OH_VBucket *valuesBucket)
226 {
227     auto rdbStore = GetRelationalStore(store);
228     auto bucket = RelationalValuesBucket::GetSelf(valuesBucket);
229     if (rdbStore == nullptr || table == nullptr || bucket == nullptr) {
230         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
231     }
232     int64_t rowId = -1;
233     rdbStore->GetStore()->Insert(rowId, table, bucket->Get());
234     return rowId >= 0 ? rowId : OH_Rdb_ErrCode::RDB_ERR;
235 }
236 
OH_Rdb_Update(OH_Rdb_Store * store,OH_VBucket * valueBucket,OH_Predicates * predicates)237 int OH_Rdb_Update(OH_Rdb_Store *store, OH_VBucket *valueBucket, OH_Predicates *predicates)
238 {
239     auto rdbStore = GetRelationalStore(store);
240     auto predicate = RelationalPredicate::GetSelf(predicates);
241     auto bucket = RelationalValuesBucket::GetSelf(valueBucket);
242     if (rdbStore == nullptr || predicate == nullptr || bucket == nullptr) {
243         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
244     }
245     int updatedRows = -1;
246     rdbStore->GetStore()->Update(updatedRows, bucket->Get(), predicate->Get());
247     return updatedRows >= 0 ? updatedRows : OH_Rdb_ErrCode::RDB_ERR;
248 }
249 
OH_Rdb_Delete(OH_Rdb_Store * store,OH_Predicates * predicates)250 int OH_Rdb_Delete(OH_Rdb_Store *store, OH_Predicates *predicates)
251 {
252     auto rdbStore = GetRelationalStore(store);
253     auto predicate = RelationalPredicate::GetSelf(predicates);
254     if (rdbStore == nullptr || predicate == nullptr) {
255         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
256     }
257     int deletedRows = -1;
258     rdbStore->GetStore()->Delete(deletedRows, predicate->Get());
259     return deletedRows >= 0 ? deletedRows : OH_Rdb_ErrCode::RDB_ERR;
260 }
261 
OH_Rdb_Query(OH_Rdb_Store * store,OH_Predicates * predicates,const char * const * columnNames,int length)262 OH_Cursor *OH_Rdb_Query(OH_Rdb_Store *store, OH_Predicates *predicates, const char *const *columnNames, int length)
263 {
264     auto rdbStore = GetRelationalStore(store);
265     auto predicate = RelationalPredicate::GetSelf(predicates);
266     if (rdbStore == nullptr || predicate == nullptr) {
267         return nullptr;
268     }
269     std::vector<std::string> columns;
270     if (columnNames != nullptr && length > 0) {
271         columns.reserve(length);
272         for (int i = 0; i < length; i++) {
273             columns.push_back(columnNames[i]);
274         }
275     }
276 
277     std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
278         rdbStore->GetStore()->QueryByStep(predicate->Get(), columns);
279     if (resultSet == nullptr) {
280         return nullptr;
281     }
282     return new (std::nothrow) RelationalCursor(std::move(resultSet));
283 }
284 
OH_Rdb_ExecuteQuery(OH_Rdb_Store * store,const char * sql)285 OH_Cursor *OH_Rdb_ExecuteQuery(OH_Rdb_Store *store, const char *sql)
286 {
287     auto rdbStore = GetRelationalStore(store);
288     if (rdbStore == nullptr || sql == nullptr) {
289         return nullptr;
290     }
291     std::shared_ptr<OHOS::NativeRdb::ResultSet> resultSet =
292         rdbStore->GetStore()->QuerySql(sql, std::vector<std::string>{});
293     if (resultSet == nullptr) {
294         return nullptr;
295     }
296     return new OHOS::RdbNdk::RelationalCursor(std::move(resultSet));
297 }
298 
OH_Rdb_Execute(OH_Rdb_Store * store,const char * sql)299 int OH_Rdb_Execute(OH_Rdb_Store *store, const char *sql)
300 {
301     auto rdbStore = GetRelationalStore(store);
302     if (rdbStore == nullptr || sql == nullptr) {
303         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
304     }
305     return rdbStore->GetStore()->ExecuteSql(sql, std::vector<OHOS::NativeRdb::ValueObject>{});
306 }
307 
OH_Rdb_BeginTransaction(OH_Rdb_Store * store)308 int OH_Rdb_BeginTransaction(OH_Rdb_Store *store)
309 {
310     auto rdbStore = GetRelationalStore(store);
311     if (rdbStore == nullptr) {
312         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
313     }
314     return rdbStore->GetStore()->BeginTransaction();
315 }
316 
OH_Rdb_RollBack(OH_Rdb_Store * store)317 int OH_Rdb_RollBack(OH_Rdb_Store *store)
318 {
319     auto rdbStore = GetRelationalStore(store);
320     if (rdbStore == nullptr) {
321         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
322     }
323     return rdbStore->GetStore()->RollBack();
324 }
325 
OH_Rdb_Commit(OH_Rdb_Store * store)326 int OH_Rdb_Commit(OH_Rdb_Store *store)
327 {
328     auto rdbStore = GetRelationalStore(store);
329     if (rdbStore == nullptr) {
330         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
331     }
332     return rdbStore->GetStore()->Commit();
333 }
334 
OH_Rdb_Backup(OH_Rdb_Store * store,const char * databasePath)335 int OH_Rdb_Backup(OH_Rdb_Store *store, const char *databasePath)
336 {
337     auto rdbStore = GetRelationalStore(store);
338     if (rdbStore == nullptr || databasePath == nullptr) {
339         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
340     }
341     return rdbStore->GetStore()->Backup(databasePath);
342 }
343 
OH_Rdb_Restore(OH_Rdb_Store * store,const char * databasePath)344 int OH_Rdb_Restore(OH_Rdb_Store *store, const char *databasePath)
345 {
346     auto rdbStore = GetRelationalStore(store);
347     if (rdbStore == nullptr || databasePath == nullptr) {
348         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
349     }
350     return rdbStore->GetStore()->Restore(databasePath);
351 }
352 
OH_Rdb_GetVersion(OH_Rdb_Store * store,int * version)353 int OH_Rdb_GetVersion(OH_Rdb_Store *store, int *version)
354 {
355     auto rdbStore = GetRelationalStore(store);
356     if (rdbStore == nullptr || version == nullptr) {
357         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
358     }
359     return rdbStore->GetStore()->GetVersion(*version);
360 }
361 
OH_Rdb_SetVersion(OH_Rdb_Store * store,int version)362 int OH_Rdb_SetVersion(OH_Rdb_Store *store, int version)
363 {
364     auto rdbStore = GetRelationalStore(store);
365     if (rdbStore == nullptr) {
366         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
367     }
368     return rdbStore->GetStore()->SetVersion(version);
369 }
370 
Convert(const Rdb_DistributedConfig * config)371 static std::pair<int32_t, Rdb_DistributedConfig> Convert(const Rdb_DistributedConfig *config)
372 {
373     std::pair<int32_t, Rdb_DistributedConfig> result = { OH_Rdb_ErrCode::RDB_E_INVALID_ARGS, {} };
374     auto &[errCode, cfg] = result;
375     switch (config->version) {
376         case DISTRIBUTED_CONFIG_V0: {
377             const auto *realCfg = reinterpret_cast<const DistributedConfig_V0 *>(config);
378             cfg.version = realCfg->version;
379             cfg.isAutoSync = realCfg->isAutoSync;
380             errCode = OH_Rdb_ErrCode::RDB_OK;
381             break;
382         }
383         default:
384             break;
385     }
386     return result;
387 }
388 
OH_Rdb_SetDistributedTables(OH_Rdb_Store * store,const char * tables[],uint32_t count,Rdb_DistributedType type,const Rdb_DistributedConfig * config)389 int OH_Rdb_SetDistributedTables(OH_Rdb_Store *store, const char *tables[], uint32_t count, Rdb_DistributedType type,
390     const Rdb_DistributedConfig *config)
391 {
392     auto rdbStore = GetRelationalStore(store);
393     if (rdbStore == nullptr || type != Rdb_DistributedType::RDB_DISTRIBUTED_CLOUD || (count > 0 && tables == nullptr)) {
394         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
395     }
396 
397     auto [errCode, cfg] = Convert(config);
398     if (errCode != OH_Rdb_ErrCode::RDB_OK) {
399         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
400     }
401     std::vector<std::string> tableNames;
402     tableNames.reserve(count);
403     for (uint32_t i = 0; i < count; i++) {
404         if (tables[i] == nullptr) {
405             return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
406         }
407         tableNames.emplace_back(tables[i]);
408     }
409     return rdbStore->GetStore()->SetDistributedTables(tableNames, DistributedTableType::DISTRIBUTED_CLOUD,
410                                                       { cfg.isAutoSync });
411 }
412 
OH_Rdb_FindModifyTime(OH_Rdb_Store * store,const char * tableName,const char * columnName,OH_VObject * values)413 OH_Cursor *OH_Rdb_FindModifyTime(OH_Rdb_Store *store, const char *tableName, const char *columnName, OH_VObject *values)
414 {
415     auto rdbStore = GetRelationalStore(store);
416     auto selfObjects = RelationalPredicatesObjects::GetSelf(values);
417     if (rdbStore == nullptr || selfObjects == nullptr || tableName == nullptr) {
418         return nullptr;
419     }
420     std::vector<ValueObject> objects = selfObjects->Get();
421     std::vector<OHOS::NativeRdb::RdbStore::PRIKey> keys;
422     keys.reserve(objects.size());
423     for (auto &object : objects) {
424         OHOS::NativeRdb::RdbStore::PRIKey priKey;
425         OHOS::NativeRdb::RawDataParser::Convert(std::move(object.value), priKey);
426         keys.push_back(std::move(priKey));
427     }
428     auto results = rdbStore->GetStore()->GetModifyTime(tableName, columnName, keys);
429     return new (std::nothrow) ModifyTimeCursor(std::move(results));
430 }
431 
OH_Rdb_Subscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)432 int OH_Rdb_Subscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
433 {
434     auto rdbStore = GetRelationalStore(store);
435     if (rdbStore == nullptr || observer == nullptr) {
436         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
437     }
438     return (OH_Rdb_ErrCode)rdbStore->DoSubScribe(type, observer);
439 }
440 
OH_Rdb_Unsubscribe(OH_Rdb_Store * store,Rdb_SubscribeType type,const Rdb_DataObserver * observer)441 int OH_Rdb_Unsubscribe(OH_Rdb_Store *store, Rdb_SubscribeType type, const Rdb_DataObserver *observer)
442 {
443     auto rdbStore = GetRelationalStore(store);
444     if (rdbStore == nullptr) {
445         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
446     }
447     return (OH_Rdb_ErrCode)rdbStore->DoUnsubScribe(type, observer);
448 }
449 
DoSubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)450 int RelationalStore::DoSubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
451 {
452     if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS ||
453         observer->callback.briefObserver == nullptr || observer->callback.detailsObserver == nullptr) {
454         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
455     }
456     std::lock_guard<decltype(mutex_)> lock(mutex_);
457     auto result = std::any_of(dataObservers_[type].begin(), dataObservers_[type].end(),
458                               [observer](const std::shared_ptr<NDKStoreObserver> &item) {
459                                   return *item.get() == observer;
460                               });
461     if (result) {
462         LOG_INFO("duplicate subscribe");
463         return OH_Rdb_ErrCode::RDB_OK;
464     }
465     auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
466     auto ndkObserver = std::make_shared<NDKStoreObserver>(observer, type);
467     auto subscribeResult = store_->Subscribe(subscribeOption, ndkObserver.get());
468     if (subscribeResult != OHOS::NativeRdb::E_OK) {
469         LOG_ERROR("subscribe failed");
470     } else {
471         dataObservers_[type].emplace_back(std::move(ndkObserver));
472     }
473     return (OH_Rdb_ErrCode)subscribeResult;
474 }
475 
DoUnsubScribe(Rdb_SubscribeType type,const Rdb_DataObserver * observer)476 int RelationalStore::DoUnsubScribe(Rdb_SubscribeType type, const Rdb_DataObserver *observer)
477 {
478     if (store_ == nullptr || type < RDB_SUBSCRIBE_TYPE_CLOUD || type > RDB_SUBSCRIBE_TYPE_CLOUD_DETAILS) {
479         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
480     }
481     std::lock_guard<decltype(mutex_)> lock(mutex_);
482     for (auto it = dataObservers_[type].begin(); it != dataObservers_[type].end();) {
483         if (observer != nullptr && !(**it == observer)) {
484             ++it;
485             continue;
486         }
487         auto subscribeOption = SubscribeOption{ .mode = NDKUtils::GetSubscribeType(type), .event = "data_change" };
488         int errCode = store_->UnSubscribe(subscribeOption, it->get());
489         if (errCode != NativeRdb::E_OK) {
490             LOG_ERROR("unsubscribe failed");
491             return errCode;
492         }
493         it = dataObservers_[type].erase(it);
494         LOG_DEBUG("data observer unsubscribe success");
495     }
496     return NativeRdb::E_OK;
497 }
498 
499 namespace {
500     struct RelationalProgressDetails : public Rdb_ProgressDetails {
501         Rdb_TableDetails *details_ = nullptr;
502         explicit RelationalProgressDetails(const ProgressDetail &detail);
503         ~RelationalProgressDetails();
504 
505         Rdb_TableDetails *GetTableDetails(int paraVersion);
506         void DestroyTableDetails();
507 
508     private:
509         uint8_t *ResizeBuff(size_t size);
510 
511         TableDetails tableDetails_;
512         size_t size_ = 0;
513         uint8_t *buffer_ = nullptr;
514     };
515 
DestroyTableDetails()516     void RelationalProgressDetails::DestroyTableDetails()
517     {
518         delete[] details_;
519         details_ = nullptr;
520     }
521 
RelationalProgressDetails(const ProgressDetail & detail)522     RelationalProgressDetails::RelationalProgressDetails(const ProgressDetail &detail)
523     {
524         version = DISTRIBUTED_PROGRESS_DETAIL_VERSION;
525         schedule = detail.progress;
526         code = detail.code;
527         tableLength = (int32_t)detail.details.size();
528         tableDetails_ = detail.details;
529     }
530 
~RelationalProgressDetails()531     RelationalProgressDetails::~RelationalProgressDetails()
532     {
533         if (buffer_ != nullptr) {
534             free(buffer_);
535         }
536         buffer_ = nullptr;
537     }
538 
GetTableDetails(int paraVersion)539     Rdb_TableDetails *RelationalProgressDetails::GetTableDetails(int paraVersion)
540     {
541         switch (paraVersion) {
542             case TABLE_DETAIL_V0: {
543                 auto length = sizeof(TableDetails_V0) * (tableLength + 1);
544                 auto *detailsV0 = (TableDetails_V0 *)ResizeBuff(length);
545                 (void)memset_s(detailsV0, length, 0, length);
546                 int index = 0;
547                 for (const auto &pair : tableDetails_) {
548                     detailsV0[index].table = pair.first.c_str();
549                     detailsV0[index].upload = Statistic_V0{
550                             .total = (int)pair.second.upload.total,
551                             .successful = (int)pair.second.upload.success,
552                             .failed = (int)pair.second.upload.failed,
553                             .remained = (int)pair.second.upload.untreated,
554                     };
555                     detailsV0[index].download = Statistic_V0{
556                             .total = (int)pair.second.download.total,
557                             .successful = (int)pair.second.download.success,
558                             .failed = (int)pair.second.download.failed,
559                             .remained = (int)pair.second.download.untreated,
560                     };
561                     index++;
562                 }
563                 return reinterpret_cast<Rdb_TableDetails *>(reinterpret_cast<uint8_t *>(detailsV0));
564             }
565             default:
566                 return nullptr;
567         }
568     }
569 
ResizeBuff(size_t size)570     uint8_t *RelationalProgressDetails::ResizeBuff(size_t size)
571     {
572         if (size_ >= size) {
573             return buffer_;
574         }
575         if (buffer_ != nullptr) {
576             free(buffer_);
577         }
578         buffer_ = (uint8_t *)malloc(size);
579         return buffer_;
580     }
581 } // namespace
582 
GetDetails(Rdb_ProgressDetails * progress)583 static std::pair<int, RelationalProgressDetails *> GetDetails(Rdb_ProgressDetails *progress)
584 {
585     if (progress->version != DISTRIBUTED_PROGRESS_DETAIL_VERSION) {
586         return { -1, nullptr };
587     }
588     return { 0, (RelationalProgressDetails *)progress };
589 }
590 
OH_Rdb_GetTableDetails(Rdb_ProgressDetails * progress,int32_t version)591 Rdb_TableDetails *OH_Rdb_GetTableDetails(Rdb_ProgressDetails *progress, int32_t version)
592 {
593     auto [errCode, details] = GetDetails(progress);
594     if (errCode == -1 || details == nullptr) {
595         return nullptr;
596     }
597     return details->GetTableDetails(version);
598 }
599 
OH_Rdb_CloudSync(OH_Rdb_Store * store,Rdb_SyncMode mode,const char * tables[],uint32_t count,const Rdb_ProgressObserver * observer)600 int OH_Rdb_CloudSync(OH_Rdb_Store *store, Rdb_SyncMode mode, const char *tables[], uint32_t count,
601                      const Rdb_ProgressObserver *observer)
602 {
603     auto rdbStore = GetRelationalStore(store);
604     if (rdbStore == nullptr || mode < RDB_SYNC_MODE_TIME_FIRST || mode > RDB_SYNC_MODE_CLOUD_FIRST ||
605         observer == nullptr || observer->callback == nullptr || (count > 0 && tables == nullptr)) {
606         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
607     }
608     SyncOption syncOption{ .mode = NDKUtils::TransformMode(mode), .isBlock = false };
609     std::vector<std::string> tableNames;
610     for (uint32_t i = 0; i < count; ++i) {
611         if (tables[i] == nullptr) {
612             return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
613         }
614         tableNames.emplace_back(tables[i]);
615     }
616 
617     auto progressCallback = [cxt = (*observer).context, cb = (*observer).callback](Details &&details) {
618         if (details.size() > 1) {
619             LOG_ERROR("Not support edge to edge detail notify");
620             return;
621         }
622         if (details.empty()) {
623             LOG_ERROR("No device or cloud synced");
624             return;
625         }
626         for (auto &[device, detail] : details) {
627             RelationalProgressDetails cloudDetail(detail);
628             cb(cxt, &cloudDetail);
629             break;
630         }
631     };
632     return rdbStore->GetStore()->Sync(syncOption, tableNames, progressCallback);
633 }
634 
OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)635 int OH_Rdb_SubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
636 {
637     auto rdbStore = GetRelationalStore(store);
638     if (rdbStore == nullptr || callback == nullptr) {
639         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
640     }
641     return rdbStore->SubscribeAutoSyncProgress(callback);
642 }
643 
OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store * store,const Rdb_ProgressObserver * callback)644 int OH_Rdb_UnsubscribeAutoSyncProgress(OH_Rdb_Store *store, const Rdb_ProgressObserver *callback)
645 {
646     auto rdbStore = GetRelationalStore(store);
647     if (rdbStore == nullptr) {
648         return OH_Rdb_ErrCode::RDB_E_INVALID_ARGS;
649     }
650     return rdbStore->UnsubscribeAutoSyncProgress(callback);
651 }
652 
NDKDetailProgressObserver(const Rdb_ProgressObserver * callback)653 NDKDetailProgressObserver::NDKDetailProgressObserver(const Rdb_ProgressObserver *callback):callback_(callback)
654 {
655 }
656 
ProgressNotification(const Details & details)657 void NDKDetailProgressObserver::ProgressNotification(const Details &details)
658 {
659     if (callback_ == nullptr || details.empty()) {
660         return;
661     }
662     RelationalProgressDetails progressDetails = RelationalProgressDetails(details.begin()->second);
663     (*(callback_->callback))(callback_->context, &progressDetails);
664     progressDetails.DestroyTableDetails();
665 }
666 
operator ==(const Rdb_ProgressObserver * callback)667 bool NDKDetailProgressObserver::operator==(const Rdb_ProgressObserver *callback)
668 {
669     return callback == callback_;
670 }
671 
NDKStoreObserver(const Rdb_DataObserver * observer,int mode)672 NDKStoreObserver::NDKStoreObserver(const Rdb_DataObserver *observer, int mode) : mode_(mode), observer_(observer) {}
673 
OnChange(const std::vector<std::string> & devices)674 void NDKStoreObserver::OnChange(const std::vector<std::string> &devices)
675 {
676     if (mode_ == Rdb_SubscribeType::RDB_SUBSCRIBE_TYPE_CLOUD) {
677         auto count = devices.size();
678         std::unique_ptr<const char *[]> deviceIds = std::make_unique<const char *[]>(count);
679         for (auto i = 0; i < count; ++i) {
680             deviceIds[i] = devices[i].c_str();
681         }
682         (*observer_->callback.briefObserver)(observer_->context, deviceIds.get(), count);
683     }
684 }
685 
OnChange(const Origin & origin,const RdbStoreObserver::PrimaryFields & fields,RdbStoreObserver::ChangeInfo && changeInfo)686 void NDKStoreObserver::OnChange(const Origin &origin, const RdbStoreObserver::PrimaryFields &fields,
687                                 RdbStoreObserver::ChangeInfo &&changeInfo)
688 {
689 }
690 
OnChange()691 void NDKStoreObserver::OnChange()
692 {
693     RdbStoreObserver::OnChange();
694 }
695 
ConvertKeyInfo(Rdb_KeyInfo & keyInfo,std::vector<RdbStoreObserver::PrimaryKey> & primaryKey)696 void NDKStoreObserver::ConvertKeyInfo(Rdb_KeyInfo &keyInfo, std::vector<RdbStoreObserver::PrimaryKey> &primaryKey)
697 {
698 }
699 
operator ==(const Rdb_DataObserver * other)700 bool NDKStoreObserver::operator==(const Rdb_DataObserver *other)
701 {
702     if (other == nullptr) {
703         return false;
704     }
705     return other->context == observer_->context && &(other->callback) == &(observer_->callback);
706 }