1 /*
2 * Copyright (C) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16 #include <unistd.h>
17
18 #include "callbacks_manager.h"
19 #include "data_ability_observer_stub.h"
20 #include "datashare_errno.h"
21 #include "datashare_helper.h"
22 #include "datashare_log.h"
23 #include "datashare_template.h"
24 #include "datashare_string_utils.h"
25 #include "iservice_registry.h"
26 #include "published_data_subscriber_manager.h"
27 #include "rdb_subscriber_manager.h"
28
29 namespace OHOS {
30 namespace DataShare {
31 using namespace testing::ext;
32 using RdbBaseCallbacks = CallbacksManager<RdbObserverMapKey, RdbObserver>;
33 using RdbCallback = std::function<void(const RdbChangeNode &changeNode)>;
__anon2fa986560102(const RdbChangeNode &changeNode) 34 RdbCallback g_rbdCallback = [](const RdbChangeNode &changeNode) {};
35 RdbChangeNode g_rdbChangeNode;
36 using PublishedBaseCallbacks = CallbacksManager<PublishedObserverMapKey, PublishedDataObserver>;
37 using PublishedDataCallback = std::function<void(PublishedDataChangeNode &changeNode)>;
__anon2fa986560202(const PublishedDataChangeNode &changeNode) 38 PublishedDataCallback g_publishedCallback = [](const PublishedDataChangeNode &changeNode) {};
39 PublishedDataChangeNode g_publishedChangeNode;
40 constexpr int TEST_TIME = 20;
41 void *g_subscriber;
42
43 /**
44 * @Usage: add long_time/concurrent_test/ConcurrentTest to unittest.deps of file
45 * foundation/distributeddatamgr/data_share/test/native/BUILD.gn
46 */
47 class ConcurrentSubscriberTest : public testing::Test {
48 public:
49 static void SetUpTestCase(void);
50 static void TearDownTestCase(void);
51 void SetUp();
52 void TearDown();
53 };
54
SetUpTestCase(void)55 void ConcurrentSubscriberTest::SetUpTestCase(void)
56 {
57 }
58
TearDownTestCase(void)59 void ConcurrentSubscriberTest::TearDownTestCase(void)
60 {
61 }
62
SetUp(void)63 void ConcurrentSubscriberTest::SetUp(void)
64 {
65 // input testCase setup step,setup invoked before each testCase
66 testing::UnitTest *test = testing::UnitTest::GetInstance();
67 ASSERT_NE(test, nullptr);
68 const testing::TestInfo *testInfo = test->current_test_info();
69 ASSERT_NE(testInfo, nullptr);
70 string testCaseName = string(testInfo->name());
71 LOG_INFO("[SetUp] %{public}s start", testCaseName.c_str());
72 GTEST_LOG_(INFO) << testCaseName.append(" start");
73 }
TearDown(void)74 void ConcurrentSubscriberTest::TearDown(void)
75 {
76 // input testCase teardown step,teardown invoked after each testCase
77 testing::UnitTest *test = testing::UnitTest::GetInstance();
78 ASSERT_NE(test, nullptr);
79 const testing::TestInfo *testInfo = test->current_test_info();
80 ASSERT_NE(testInfo, nullptr);
81 string testCaseName = string(testInfo->name());
82 LOG_INFO("[SetUp] %{public}s end", testCaseName.c_str());
83 GTEST_LOG_(INFO) << testCaseName.append(" end");
84 }
85
86 class RdbSubscriberManagerTest : public CallbacksManager<RdbObserverMapKey, RdbObserver> {
87 public:
88 using Key = RdbObserverMapKey;
89 using Observer = RdbObserver;
90 void AddObservers(int64_t subscriberId, std::string &bundleName, std::string &uri, std::atomic<bool> &stop);
91 void DelObservers(int64_t subscriberId, std::string &bundleName, std::string &uri, std::atomic<bool> &stop);
92 };
93
AddObservers(int64_t subscriberId,std::string & bundleName,std::string & uri,std::atomic<bool> & stop)94 void RdbSubscriberManagerTest::AddObservers(
95 int64_t subscriberId, std::string &bundleName, std::string &uri, std::atomic<bool> &stop)
96 {
97 void *subscriber = g_subscriber;
98 RdbChangeNode *changeNode = &g_rdbChangeNode;
99 DataShare::TemplateId templateId;
100 templateId.subscriberId_ = subscriberId, templateId.bundleName_ = bundleName;
101 std::vector<std::string> uris;
102 uris.emplace_back(uri);
103 Key rdbKey(uri, templateId);
104 std::vector<Key> keys;
105 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) { keys.emplace_back(uri, templateId); });
106 while (!stop.load()) {
107 LOG_INFO("Rdb AddObservers start, subscriberId: %{public}d", static_cast<int>(subscriberId));
108 RdbBaseCallbacks::AddObservers(
109 keys, subscriber, std::make_shared<Observer>(g_rbdCallback),
110 [](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {},
111 [&subscriber, &templateId, &rdbKey, &changeNode](const std::vector<Key> &firstAddKeys,
112 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
113 std::vector<std::string> firstAddUris;
114 std::for_each(firstAddKeys.begin(), firstAddKeys.end(),
115 [&firstAddUris](auto &result) { firstAddUris.emplace_back(result); });
116 if (firstAddUris.empty()) {
117 return;
118 }
119 RdbSubscriberManager::GetInstance().lastChangeNodeMap_.InsertOrAssign(rdbKey, *changeNode);
120 });
121 LOG_INFO("Rdb AddObservers end, subscriberId: %{public}d", static_cast<int>(subscriberId));
122 }
123 }
124
DelObservers(int64_t subscriberId,std::string & bundleName,std::string & uri,std::atomic<bool> & stop)125 void RdbSubscriberManagerTest::DelObservers(
126 int64_t subscriberId, std::string &bundleName, std::string &uri, std::atomic<bool> &stop)
127 {
128 void *subscriber = g_subscriber;
129 DataShare::TemplateId templateId;
130 templateId.subscriberId_ = subscriberId, templateId.bundleName_ = bundleName;
131 std::vector<std::string> uris;
132 uris.emplace_back(uri);
133 std::vector<Key> keys;
134 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) { keys.emplace_back(uri, templateId); });
135 while (!stop.load()) {
136 LOG_INFO("Rdb DelObservers start, subscriberId: %{public}d", static_cast<int>(subscriberId));
137 RdbBaseCallbacks::DelObservers(
138 keys, subscriber, [](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
139 std::for_each(lastDelKeys.begin(), lastDelKeys.end(),
140 [](auto &result) { RdbSubscriberManager::GetInstance().lastChangeNodeMap_.Erase(result); });
141 });
142 LOG_INFO("Rdb DelObservers end, subscriberId: %{public}d", static_cast<int>(subscriberId));
143 }
144 }
145
146 /**
147 * @tc.name: ConcurrentRdbObserverTest
148 * @tc.desc: Verify concurrent SubscribeRdbData and UnsubscribeRdbData operations
149 * @tc.type: concurrent
150 * @tc.require: None
151 * @tc.precon: RdbSubscriberManager is properly initialized
152 * @tc.step:
153 1. Create an instance of RdbSubscriberManagerTest
154 2. Define two URIs and two bundle names for testing
155 3. Create four threads to concurrently perform:
156 - Add observers for URI0 with bundleName0
157 - Delete observers for URI0 with bundleName0
158 - Add observers for URI1 with bundleName1
159 - Delete observers for URI1 with bundleName1
160 4. Run the concurrent operations for a specified test duration
161 5. Stop all threads and wait for their completion
162 * @tc.expect:
163 1. All concurrent operations complete without crashes
164 2. No deadlocks occur during concurrent subscription management
165 3. Observer management maintains internal consistency
166 */
167 HWTEST_F(ConcurrentSubscriberTest, ConcurrentRdbObserverTest, TestSize.Level0)
168 {
169 std::atomic<bool> stop = false;
170 int testTime = TEST_TIME;
171 RdbSubscriberManagerTest instance;
172 std::string uri0 = "uri0";
173 std::string uri1 = "uri1";
174 std::string bundleName0 = "bundleName0";
175 std::string bundleName1 = "bundleName1";
__anon2fa986560a02() 176 std::function<void()> func1 = [&instance, &bundleName0, &uri0, &stop]() {
177 instance.AddObservers(0, bundleName0, uri0, stop);
178 };
__anon2fa986560b02() 179 std::function<void()> func2 = [&instance, &bundleName0, &uri0, &stop]() {
180 instance.DelObservers(0, bundleName0, uri0, stop);
181 };
__anon2fa986560c02() 182 std::function<void()> func3 = [&instance, &bundleName1, &uri1, &stop]() {
183 instance.AddObservers(1, bundleName1, uri1, stop);
184 };
__anon2fa986560d02() 185 std::function<void()> func4 = [&instance, &bundleName1, &uri1, &stop]() {
186 instance.DelObservers(1, bundleName1, uri1, stop);
187 };
188 std::thread t1(func1);
189 std::thread t2(func2);
190 std::thread t3(func3);
191 std::thread t4(func4);
192 while (testTime > 0) {
193 sleep(1);
194 testTime--;
195 }
196 stop = true;
197 t1.join();
198 t2.join();
199 t3.join();
200 t4.join();
201 }
202
203 class PublishedDataSubscriberManagerTest : public CallbacksManager<PublishedObserverMapKey, PublishedDataObserver> {
204 public:
205 using Callback = std::function<void(const PublishedDataChangeNode &changeNode)>;
206 using Key = PublishedObserverMapKey;
207 using Observer = PublishedDataObserver;
208 void AddObservers(int64_t subscriberId, std::string &uri, std::atomic<bool> &stop);
209 void DelObservers(int64_t subscriberId, std::string &uri, std::atomic<bool> &stop);
210 };
211
AddObservers(int64_t subscriberId,std::string & uri,std::atomic<bool> & stop)212 void PublishedDataSubscriberManagerTest::AddObservers(int64_t subscriberId, std::string &uri, std::atomic<bool> &stop)
213 {
214 void *subscriber = g_subscriber;
215 PublishedDataChangeNode *changeNode = &g_publishedChangeNode;
216 std::vector<std::string> uris;
217 uris.emplace_back(uri);
218 Key publishedKey(uri, subscriberId);
219 std::vector<Key> keys;
220 std::for_each(
221 uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) { keys.emplace_back(uri, subscriberId); });
222 while (!stop.load()) {
223 LOG_INFO("Published AddObservers start, subscriberId: %{public}d", static_cast<int>(subscriberId));
224 PublishedBaseCallbacks::AddObservers(
225 keys, subscriber, std::make_shared<Observer>(g_publishedCallback),
226 [](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {},
227 [&subscriber, &publishedKey, &changeNode](const std::vector<Key> &firstAddKeys,
228 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
229 std::vector<std::string> firstAddUris;
230 std::for_each(firstAddKeys.begin(), firstAddKeys.end(),
231 [&firstAddUris](auto &result) { firstAddUris.emplace_back(result); });
232 if (firstAddUris.empty()) {
233 return;
234 }
235 PublishedDataSubscriberManager::GetInstance().lastChangeNodeMap_.Compute(
236 publishedKey, [](const Key &, PublishedDataChangeNode &value) {
237 value.datas_.clear();
238 return true;
239 });
240 PublishedDataSubscriberManager::GetInstance().lastChangeNodeMap_.Compute(
241 publishedKey, [&publishedKey](const Key &, PublishedDataChangeNode &value) {
242 value.datas_.emplace_back(publishedKey.uri_, publishedKey.subscriberId_, "data");
243 value.ownerBundleName_ = "";
244 return true;
245 });
246 });
247 LOG_INFO("Published AddObservers end, subscriberId: %{public}d", static_cast<int>(subscriberId));
248 }
249 }
250
DelObservers(int64_t subscriberId,std::string & uri,std::atomic<bool> & stop)251 void PublishedDataSubscriberManagerTest::DelObservers(int64_t subscriberId, std::string &uri, std::atomic<bool> &stop)
252 {
253 void *subscriber = g_subscriber;
254 std::vector<std::string> uris;
255 uris.emplace_back(uri);
256 std::vector<Key> keys;
257 std::for_each(
258 uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) { keys.emplace_back(uri, subscriberId); });
259 while (!stop.load()) {
260 LOG_INFO("Published DelObservers start, subscriberId: %{public}d", static_cast<int>(subscriberId));
261 PublishedBaseCallbacks::DelObservers(
262 keys, subscriber, [](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
263 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [](auto &result) {
264 PublishedDataSubscriberManager::GetInstance().lastChangeNodeMap_.Erase(result);
265 });
266 });
267 LOG_INFO("Published DelObservers end, subscriberId: %{public}d", static_cast<int>(subscriberId));
268 }
269 }
270
271 /**
272 * @tc.name: ConcurrentPublishObserverTest
273 * @tc.desc: Verify concurrent SubscribePublishedData and UnsubscribePublishedData operations
274 * @tc.type: concurrent
275 * @tc.require: None
276 * @tc.precon: PublishedDataSubscriberManager is properly initialized
277 * @tc.step:
278 1. Create an instance of PublishedDataSubscriberManagerTest
279 2. Define two URIs for testing
280 3. Create four threads to concurrently perform:
281 - Add observers for URI0 with subscriber ID 0
282 - Delete observers for URI0 with subscriber ID 0
283 - Add observers for URI1 with subscriber ID 1
284 - Delete observers for URI1 with subscriber ID 1
285 4. Run the concurrent operations for a specified test duration
286 5. Stop all threads and wait for their completion
287 * @tc.expect:
288 1. All concurrent operations complete without crashes
289 2. No deadlocks occur during concurrent subscription management
290 3. Published data observer map maintains internal consistency
291 4. Change node data is properly managed during concurrent access
292 */
293 HWTEST_F(ConcurrentSubscriberTest, ConcurrentPublishObserverTest, TestSize.Level0)
294 {
295 std::atomic<bool> stop = false;
296 int testTime = TEST_TIME;
297 PublishedDataSubscriberManagerTest instance;
298 std::string uri0 = "uri0";
299 std::string uri1 = "uri1";
__anon2fa986561702() 300 std::function<void()> func1 = [&instance, &uri0, &stop]() { instance.AddObservers(0, uri0, stop); };
__anon2fa986561802() 301 std::function<void()> func2 = [&instance, &uri0, &stop]() { instance.DelObservers(0, uri0, stop); };
__anon2fa986561902() 302 std::function<void()> func3 = [&instance, &uri1, &stop]() { instance.AddObservers(1, uri1, stop); };
__anon2fa986561a02() 303 std::function<void()> func4 = [&instance, &uri1, &stop]() { instance.DelObservers(1, uri1, stop); };
304 std::thread t1(func1);
305 std::thread t2(func2);
306 std::thread t3(func3);
307 std::thread t4(func4);
308 while (testTime > 0) {
309 sleep(1);
310 testTime--;
311 }
312 stop = true;
313 t1.join();
314 t2.join();
315 t3.join();
316 t4.join();
317 }
318
/**
 * Single-slot hand-off of a value of type T between threads.
 *
 * Notify() stores a value and raises the "set" flag. Wait() blocks until the
 * flag is raised or INTERVAL seconds have elapsed and then returns a copy of
 * the stored value. Clear() lowers the flag again but keeps the stored value.
 *
 * The slot is value-initialized, so a Wait() that times out before any
 * Notify() returns T() instead of an indeterminate value.
 */
template <typename T>
class ConditionLock {
public:
    ConditionLock() = default;
    ~ConditionLock() = default;

    // Stores `data`, marks the slot as set and wakes one waiter.
    void Notify(const T &data)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        data_ = data;
        isSet_ = true;
        cv_.notify_one();
    }

    // Waits up to INTERVAL seconds for the slot to be set and returns the stored
    // value. On timeout the currently stored value is returned anyway.
    T Wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The timeout result is deliberately ignored: callers get a value either way.
        cv_.wait_for(lock, std::chrono::seconds(INTERVAL), [this]() { return isSet_; });
        T data = data_;
        // Presumably passes the wake-up on to another thread blocked in Wait() — confirm.
        cv_.notify_one();
        return data;
    }

    // Marks the slot as not set; the stored value itself is left untouched.
    void Clear()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        isSet_ = false;
        cv_.notify_one();
    }

private:
    bool isSet_ = false;
    T data_ = T();
    std::mutex mutex_;
    std::condition_variable cv_;
    // Maximum time, in seconds, that Wait() blocks.
    static constexpr int64_t INTERVAL = 2;
};
356
357 class DataShareObserverTest : public DataShare::DataShareObserver {
358 public:
DataShareObserverTest(std::string uri)359 explicit DataShareObserverTest(std::string uri)
360 {
361 uri_ = uri;
362 }
~DataShareObserverTest()363 ~DataShareObserverTest() {}
364
OnChange(const ChangeInfo & changeInfo)365 void OnChange(const ChangeInfo &changeInfo) override
366 {
367 changeInfo_ = changeInfo;
368 data.Notify(changeInfo);
369 }
370
Clear()371 void Clear()
372 {
373 changeInfo_.changeType_ = INVAILD;
374 changeInfo_.uris_.clear();
375 changeInfo_.data_ = nullptr;
376 changeInfo_.size_ = 0;
377 changeInfo_.valueBuckets_ = {};
378 data.Clear();
379 }
380
381 ChangeInfo changeInfo_;
382 ConditionLock<ChangeInfo> data;
383 std::string uri_;
384 };
385
386 class ConcurrentRegisterObserverExtProvider {
387 public:
RegisterObserverExtProvider(std::shared_ptr<DataShare::DataShareHelper> helper,const Uri & uri,std::shared_ptr<DataShareObserver> dataObserver,std::atomic<bool> & stop)388 void RegisterObserverExtProvider(std::shared_ptr<DataShare::DataShareHelper> helper, const Uri &uri,
389 std::shared_ptr<DataShareObserver> dataObserver, std::atomic<bool> &stop)
390 {
391 while (!stop.load()) {
392 LOG_INFO("RegisterObserverExtProvider start, uri: %{public}s",
393 DataShareStringUtils::Anonymous(uri.ToString()).c_str());
394
395 helper->RegisterObserverExtProvider(uri, dataObserver, true);
396
397 LOG_INFO("RegisterObserverExtProvider end, uri: %{public}s",
398 DataShareStringUtils::Anonymous(uri.ToString()).c_str());
399 }
400 }
401
UnregisterObserverExtProvider(std::shared_ptr<DataShare::DataShareHelper> helper,const Uri & uri,std::shared_ptr<DataShareObserver> dataObserver,std::atomic<bool> & stop)402 void UnregisterObserverExtProvider(std::shared_ptr<DataShare::DataShareHelper> helper, const Uri &uri,
403 std::shared_ptr<DataShareObserver> dataObserver, std::atomic<bool> &stop)
404 {
405 while (!stop.load()) {
406 LOG_INFO("UnregisterObserverExtProvider start, uri: %{public}s",
407 DataShareStringUtils::Anonymous(uri.ToString()).c_str());
408
409 helper->UnregisterObserverExtProvider(uri, dataObserver);
410
411 LOG_INFO("UnregisterObserverExtProvider end, uri: %{public}s",
412 DataShareStringUtils::Anonymous(uri.ToString()).c_str());
413 }
414 }
415 };
416
// URI used to create the DataShareHelper. Const because nothing in this file modifies it.
const std::string DATA_SHARE_URI = "datashare:///com.acts.datasharetest";
// URIs the two observers of the register/unregister stress test are bound to.
const std::string DATA_SHARE_URI1 = "datashare:///com.acts.datasharetest1";
const std::string DATA_SHARE_URI2 = "datashare:///com.acts.datasharetest2";
// System ability id used to obtain the remote object for the helper.
constexpr int STORAGE_MANAGER_MANAGER_ID = 5003;
421
CreateDataShareHelper(int32_t systemAbilityId)422 std::shared_ptr<DataShare::DataShareHelper> CreateDataShareHelper(int32_t systemAbilityId)
423 {
424 LOG_INFO("CreateDataShareHelper start");
425 auto saManager = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
426 if (saManager == nullptr) {
427 LOG_ERROR("GetSystemAbilityManager get samgr failed.");
428 return nullptr;
429 }
430 auto remoteObj = saManager->GetSystemAbility(systemAbilityId);
431 if (remoteObj == nullptr) {
432 LOG_ERROR("GetSystemAbility service failed.");
433 return nullptr;
434 }
435 return DataShare::DataShareHelper::Creator(remoteObj, DATA_SHARE_URI);
436 }
437
438 /**
439 * @tc.name: ConcurrentRegisterObserverExtProviderTest
440 * @tc.desc: Verify concurrent RegisterObserverExtProvider and UnregisterObserverExtProvider operations
441 * @tc.type: concurrent
442 * @tc.require: None
443 * @tc.precon:
444 1. DataShare service is properly initialized
445 2. STORAGE_MANAGER_MANAGER_ID system ability is available
446 * @tc.step:
447 1. Create a DataShareHelper instance using STORAGE_MANAGER_MANAGER_ID
448 2. Define two URIs for testing (DATA_SHARE_URI1 and DATA_SHARE_URI2)
449 3. Create two DataShareObserverTest instances for the URIs
450 4. Create four threads to concurrently perform:
451 - Register observer for URI1
452 - Unregister observer for URI1
453 - Register observer for URI2
454 - Unregister observer for URI2
455 5. Run the concurrent operations for a specified test duration
456 6. Stop all threads and wait for their completion
457 * @tc.expect:
458 1. DataShareHelper is created successfully (not nullptr)
459 2. All concurrent registration and unregistration operations complete without crashes
460 3. No deadlocks occur during concurrent observer management
461 4. Observer registration state remains consistent during concurrent access
462 */
463 HWTEST_F(ConcurrentSubscriberTest, ConcurrentRegisterObserverExtProviderTest, TestSize.Level0)
464 {
465 LOG_INFO("ConcurrentRegisterObserverExtProviderTest::Start");
466 std::atomic<bool> stop = false;
467 int testTime = TEST_TIME;
468 ConcurrentRegisterObserverExtProvider instance;
469 Uri uri1(DATA_SHARE_URI1);
470 Uri uri2(DATA_SHARE_URI2);
471 std::shared_ptr<DataShareObserver> dataObserver1 = std::make_shared<DataShareObserverTest>(DATA_SHARE_URI1);
472 std::shared_ptr<DataShareObserver> dataObserver2 = std::make_shared<DataShareObserverTest>(DATA_SHARE_URI2);
473 std::shared_ptr<DataShare::DataShareHelper> helper = CreateDataShareHelper(STORAGE_MANAGER_MANAGER_ID);
474 ASSERT_NE(helper, nullptr);
475
__anon2fa986561c02() 476 std::function<void()> func1 = [&instance, &helper, &uri1, &dataObserver1, &stop]() {
477 instance.RegisterObserverExtProvider(helper, uri1, dataObserver1, stop);
478 };
__anon2fa986561d02() 479 std::function<void()> func2 = [&instance, &helper, &uri1, &dataObserver1, &stop]() {
480 instance.UnregisterObserverExtProvider(helper, uri1, dataObserver1, stop);
481 };
__anon2fa986561e02() 482 std::function<void()> func3 = [&instance, &helper, &uri2, &dataObserver2, &stop]() {
483 instance.RegisterObserverExtProvider(helper, uri2, dataObserver2, stop);
484 };
__anon2fa986561f02() 485 std::function<void()> func4 = [&instance, &helper, &uri2, &dataObserver2, &stop]() {
486 instance.UnregisterObserverExtProvider(helper, uri2, dataObserver2, stop);
487 };
488 std::thread t1(func1);
489 std::thread t2(func2);
490 std::thread t3(func3);
491 std::thread t4(func4);
492 sleep(testTime);
493 stop = true;
494 t1.join();
495 t2.join();
496 t3.join();
497 t4.join();
498 LOG_INFO("ConcurrentRegisterObserverExtProviderTest::end");
499 }
500 } // namespace DataShare
501 } // namespace OHOS