1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "distributeddb_communicator_common.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "endian_convert.h"
24 #include "log_print.h"
25 #include "runtime_config.h"
26 #include "thread_pool_test_stub.h"
27 #include "virtual_communicator_aggregator.h"
28
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 using namespace DistributedDBUnitTest;
33
34 namespace {
35 EnvHandle g_envDeviceA;
36 EnvHandle g_envDeviceB;
37 EnvHandle g_envDeviceC;
38
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)39 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
40 {
41 if (isConnect) {
42 onlines.onlineDevices.insert(target);
43 onlines.latestOnlineDevice = target;
44 onlines.latestOfflineDevice.clear();
45 } else {
46 onlines.onlineDevices.erase(target);
47 onlines.latestOnlineDevice.clear();
48 onlines.latestOfflineDevice = target;
49 }
50 }
51
52 class DistributedDBCommunicatorTest : public testing::Test {
53 public:
54 static void SetUpTestCase(void);
55 static void TearDownTestCase(void);
56 void SetUp();
57 void TearDown();
58 };
59
SetUpTestCase(void)60 void DistributedDBCommunicatorTest::SetUpTestCase(void)
61 {
62 /**
63 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
64 */
65 LOGI("[UT][Test][SetUpTestCase] Enter.");
66 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
67 ASSERT_EQ(errCode, true);
68 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
69 ASSERT_EQ(errCode, true);
70 DoRegTransformFunction();
71 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
72 }
73
TearDownTestCase(void)74 void DistributedDBCommunicatorTest::TearDownTestCase(void)
75 {
76 /**
77 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
78 */
79 LOGI("[UT][Test][TearDownTestCase] Enter.");
80 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
81 TearDownEnv(g_envDeviceA);
82 TearDownEnv(g_envDeviceB);
83 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
84 }
85
SetUp()86 void DistributedDBCommunicatorTest::SetUp()
87 {
88 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
89 }
90
TearDown()91 void DistributedDBCommunicatorTest::TearDown()
92 {
93 /**
94 * @tc.teardown: Wait 100 ms to make sure all thread quiet
95 */
96 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
97 }
98
99 /**
100 * @tc.name: Communicator Management 001
101 * @tc.desc: Test alloc and release communicator
102 * @tc.type: FUNC
103 * @tc.require: AR000BVDGG AR000CQE0L
104 * @tc.author: xiaozhenjian
105 */
106 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
107 {
108 /**
109 * @tc.steps: step1. alloc communicator A using label A
110 * @tc.expected: step1. alloc return OK.
111 */
112 int errorNo = E_OK;
113 ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
114 EXPECT_EQ(errorNo, E_OK);
115 EXPECT_NE(commA, nullptr);
116
117 /**
118 * @tc.steps: step2. alloc communicator B using label B
119 * @tc.expected: step2. alloc return OK.
120 */
121 errorNo = E_OK;
122 ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
123 EXPECT_EQ(errorNo, E_OK);
124 EXPECT_NE(commA, nullptr);
125
126 /**
127 * @tc.steps: step3. alloc communicator C using label A
128 * @tc.expected: step3. alloc return not OK.
129 */
130 errorNo = E_OK;
131 ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
132 EXPECT_NE(errorNo, E_OK);
133 EXPECT_EQ(commC, nullptr);
134
135 /**
136 * @tc.steps: step4. release communicator A and communicator B
137 */
138 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
139 commA = nullptr;
140 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
141 commB = nullptr;
142
143 /**
144 * @tc.steps: step5. alloc communicator D using label A
145 * @tc.expected: step5. alloc return OK.
146 */
147 errorNo = E_OK;
148 ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
149 EXPECT_EQ(errorNo, E_OK);
150 EXPECT_NE(commD, nullptr);
151
152 /**
153 * @tc.steps: step6. release communicator D
154 */
155 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
156 commD = nullptr;
157 }
158
ConnectWaitDisconnect()159 static void ConnectWaitDisconnect()
160 {
161 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
162 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
163 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
164 }
165
166 /**
167 * @tc.name: Online And Offline 001
168 * @tc.desc: Test functionality triggered by physical devices online and offline
169 * @tc.type: FUNC
170 * @tc.require: AR000BVRNS AR000CQE0H
171 * @tc.author: wudongxing
172 */
173 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
174 {
175 /**
176 * @tc.steps: step1. device A alloc communicator AA using label A and register callback
177 * @tc.expected: step1. no callback.
178 */
179 int errorNo = E_OK;
180 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
181 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
182 OnOfflineDevice onlineForAA;
__anon8a05f61d0202(const std::string &target, bool isConnect) 183 commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
184 HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
185 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
186
187 /**
188 * @tc.steps: step2. connect device A with device B and then disconnect
189 * @tc.expected: step2. no callback.
190 */
191 ConnectWaitDisconnect();
192 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
193
194 /**
195 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
196 * @tc.expected: step3. no callback.
197 */
198 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
199 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
200 OnOfflineDevice onlineForBB;
__anon8a05f61d0302(const std::string &target, bool isConnect) 201 commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
202 HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
203 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
204 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
205
206 /**
207 * @tc.steps: step4. connect device A with device B and then disconnect
208 * @tc.expected: step4. no callback.
209 */
210 ConnectWaitDisconnect();
211 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
212 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
213
214 /**
215 * @tc.steps: step5. device B alloc communicator BA using label A and register callback
216 * @tc.expected: step5. no callback.
217 */
218 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
219 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
220 OnOfflineDevice onlineForBA;
__anon8a05f61d0402(const std::string &target, bool isConnect) 221 commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
222 HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
223 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
224 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
225 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
226
227 /**
228 * @tc.steps: step6. connect device A with device B
229 * @tc.expected: step6. communicator AA has callback of device B online;
230 * communicator BA has callback of device A online;
231 * communicator BB no callback
232 */
233 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
234 std::this_thread::sleep_for(std::chrono::milliseconds(100));
235 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
236 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
237 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
238 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
239 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
240
241 /**
242 * @tc.steps: step7. disconnect device A with device B
243 * @tc.expected: step7. communicator AA has callback of device B offline;
244 * communicator BA has callback of device A offline;
245 * communicator BB no callback
246 */
247 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
248 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
249 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
250 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
251 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
252 EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
253 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
254
255 // Clean up
256 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
257 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
258 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
259 }
260
// Register a connect-change callback on `communicator` that forwards each notification to
// HandleConnectChange with `online` as the record. `online` is captured by reference and must be
// an lvalue that stays alive for as long as the callback can be invoked.
#define REG_CONNECT_CALLBACK(communicator, online) \
{ \
    (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
        HandleConnectChange(online, target, isConnect); \
    }, nullptr); \
}
267
// Connect the adapters of device A and device B, then sleep for `waitTime` milliseconds.
#define CONNECT_AND_WAIT(waitTime) \
{ \
    AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
}
273
274 /**
275 * @tc.name: Online And Offline 002
276 * @tc.desc: Test functionality triggered by alloc and release communicator
277 * @tc.type: FUNC
278 * @tc.require: AR000BVRNT AR000CQE0I
279 * @tc.author: wudongxing
280 */
281 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
282 {
283 /**
284 * @tc.steps: step1. connect device A with device B
285 */
286 CONNECT_AND_WAIT(200); // Sleep 200 ms
287
288 /**
289 * @tc.steps: step2. device A alloc communicator AA using label A and register callback
290 * @tc.expected: step2. no callback.
291 */
292 int errorNo = E_OK;
293 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
294 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
295 OnOfflineDevice onlineForAA;
296 REG_CONNECT_CALLBACK(commAA, onlineForAA);
297 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
298
299 /**
300 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
301 * @tc.expected: step3. no callback.
302 */
303 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
304 ASSERT_NOT_NULL_AND_ACTIVATE(commBB);
305 OnOfflineDevice onlineForBB;
306 REG_CONNECT_CALLBACK(commBB, onlineForBB);
307 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
308 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
309
310 /**
311 * @tc.steps: step4. device B alloc communicator BA using label A and register callback
312 * @tc.expected: step4. communicator AA has callback of device B online;
313 * communicator BA has callback of device A online;
314 * communicator BB no callback.
315 */
316 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
317 ASSERT_NOT_NULL_AND_ACTIVATE(commBA);
318 OnOfflineDevice onlineForBA;
319 REG_CONNECT_CALLBACK(commBA, onlineForBA);
320 std::this_thread::sleep_for(std::chrono::milliseconds(100));
321 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
322 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
323 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
324 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
325 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
326
327 /**
328 * @tc.steps: step5. device A alloc communicator AB using label B and register callback
329 * @tc.expected: step5. communicator AB has callback of device B online;
330 * communicator BB has callback of device A online;
331 */
332 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
333 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
334 OnOfflineDevice onlineForAB;
335 REG_CONNECT_CALLBACK(commAB, onlineForAB);
336 std::this_thread::sleep_for(std::chrono::milliseconds(100));
337 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
338 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
339 EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
340 EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
341
342 /**
343 * @tc.steps: step6. device A release communicator AA
344 * @tc.expected: step6. communicator BA has callback of device A offline;
345 * communicator AB and BB no callback;
346 */
347 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
348 std::this_thread::sleep_for(std::chrono::milliseconds(100));
349 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
350 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
351 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
352 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
353
354 /**
355 * @tc.steps: step7. device B release communicator BA
356 * @tc.expected: step7. communicator AB and BB no callback;
357 */
358 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
359 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
360 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
361
362 /**
363 * @tc.steps: step8. device B release communicator BB
364 * @tc.expected: step8. communicator AB has callback of device B offline;
365 */
366 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
367 std::this_thread::sleep_for(std::chrono::milliseconds(100));
368 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
369 EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
370
371 // Clean up
372 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
373 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
374 }
375
TestRemoteRestart()376 void TestRemoteRestart()
377 {
378 /**
379 * @tc.steps: step1. connect device D with device E
380 */
381 EnvHandle envDeviceD;
382 EnvHandle envDeviceE;
383 SetUpEnv(envDeviceD, "DEVICE_D");
384 SetUpEnv(envDeviceE, "DEVICE_E");
385
386 /**
387 * @tc.steps: step2. device D alloc communicator DD using label A and register callback
388 */
389 int errorNo = E_OK;
390 ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
391 ASSERT_NOT_NULL_AND_ACTIVATE(commDD);
392 OnOfflineDevice onlineForDD;
393 REG_CONNECT_CALLBACK(commDD, onlineForDD);
394
395 /**
396 * @tc.steps: step3. device E alloc communicator EE using label A and register callback
397 */
398 ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
399 ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
400 OnOfflineDevice onlineForEE;
401 REG_CONNECT_CALLBACK(commEE, onlineForEE);
402 /**
403 * @tc.steps: step4. deviceD connect to deviceE
404 * @tc.expected: step4. both communicator has callback;
405 */
406 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
407 std::this_thread::sleep_for(std::chrono::seconds(1));
408 EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
409 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
410
411 /**
412 * @tc.steps: step5. device E restart
413 */
414 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
415 std::this_thread::sleep_for(std::chrono::seconds(1));
416 TearDownEnv(envDeviceE);
417 SetUpEnv(envDeviceE, "DEVICE_E");
418
419 commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
420 ASSERT_NOT_NULL_AND_ACTIVATE(commEE);
421 REG_CONNECT_CALLBACK(commEE, onlineForEE);
422 onlineForEE.onlineDevices.clear();
423 /**
424 * @tc.steps: step6. deviceD connect to deviceE again
425 * @tc.expected: step6. communicatorE has callback;
426 */
427 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
428 int reTryTimes = 5;
429 while (onlineForEE.onlineDevices.size() != 1 && reTryTimes > 0) {
430 std::this_thread::sleep_for(std::chrono::seconds(1));
431 reTryTimes--;
432 }
433 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
434 // Clean up and disconnect
435 envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
436 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
437 std::this_thread::sleep_for(std::chrono::seconds(1));
438
439 AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
440 TearDownEnv(envDeviceD);
441 TearDownEnv(envDeviceE);
442 }
443 /**
444 * @tc.name: Online And Offline 003
445 * @tc.desc: Test functionality triggered by remote restart
446 * @tc.type: FUNC
447 * @tc.require: AR000CQE0I
448 * @tc.author: zhangqiquan
449 */
450 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
451 {
452 TestRemoteRestart();
453 }
454
455 /**
456 * @tc.name: Online And Offline 004
457 * @tc.desc: Test functionality triggered by remote restart with thread pool
458 * @tc.type: FUNC
459 * @tc.require: AR000CQE0I
460 * @tc.author: zhangqiquan
461 */
462 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
463 {
464 auto threadPool = std::make_shared<ThreadPoolTestStub>();
465 RuntimeContext::GetInstance()->SetThreadPool(threadPool);
466 TestRemoteRestart();
467 RuntimeContext::GetInstance()->SetThreadPool(nullptr);
468 }
469
470 /**
471 * @tc.name: Report Device Connect Change 001
472 * @tc.desc: Test CommunicatorAggregator support report device connect change event
473 * @tc.type: FUNC
474 * @tc.require: AR000DR9KV
475 * @tc.author: xiaozhenjian
476 */
477 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
478 {
479 /**
480 * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
481 */
482 OnOfflineDevice onlineForA;
483 int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anon8a05f61d0502(const std::string &target, bool isConnect) 484 [&onlineForA](const std::string &target, bool isConnect) {
485 HandleConnectChange(onlineForA, target, isConnect);
486 }, nullptr);
487 EXPECT_EQ(errCode, E_OK);
488 OnOfflineDevice onlineForB;
489 errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anon8a05f61d0602(const std::string &target, bool isConnect) 490 [&onlineForB](const std::string &target, bool isConnect) {
491 HandleConnectChange(onlineForB, target, isConnect);
492 }, nullptr);
493 EXPECT_EQ(errCode, E_OK);
494
495 /**
496 * @tc.steps: step2. connect device A with device B
497 * @tc.expected: step2. device A callback B online; device B callback A online;
498 */
499 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
500 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
501 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
502 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
503 EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
504 EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
505
506 /**
507 * @tc.steps: step3. connect device A with device B
508 * @tc.expected: step3. device A callback B offline; device B callback A offline;
509 */
510 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
511 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
512 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
513 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
514 EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
515 EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
516
517 // Clean up
518 g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
519 g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
520 }
521
522 namespace {
ToLabelType(uint64_t commLabel)523 LabelType ToLabelType(uint64_t commLabel)
524 {
525 uint64_t netOrderLabel = HostToNet(commLabel);
526 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
527 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
528 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
529 realLabel[i] = eachByte[i];
530 }
531 return realLabel;
532 }
533 }
534
535 /**
536 * @tc.name: Report Communicator Not Found 001
537 * @tc.desc: Test CommunicatorAggregator support report communicator not found event
538 * @tc.type: FUNC
539 * @tc.require: AR000DR9KV
540 * @tc.author: xiaozhenjian
541 */
542 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
543 {
544 /**
545 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
546 */
547 std::vector<LabelType> lackLabels;
548 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon8a05f61d0802(const LabelType &commLabel, const std::string &userId)549 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
550 lackLabels.push_back(commLabel);
551 return -E_NOT_FOUND;
552 }, nullptr);
553 EXPECT_EQ(errCode, E_OK);
554
555 /**
556 * @tc.steps: step2. connect device A with device B
557 */
558 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
559 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
560
561 /**
562 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
563 * @tc.expected: step3. device B callback that label A not found.
564 */
565 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
566 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
567 Message *msgForAA = BuildRegedTinyMessage();
568 ASSERT_NE(msgForAA, nullptr);
569 SendConfig conf = {true, false, 0};
570 errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
571 EXPECT_EQ(errCode, E_OK);
572 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
573 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
574 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
575
576 /**
577 * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
578 * @tc.expected: step4. communicator BA will not receive message.
579 */
580 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
581 ASSERT_NE(commBA, nullptr);
582 Message *recvMsgForBA = nullptr;
__anon8a05f61d0902(const std::string &srcTarget, Message *inMsg) 583 commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
584 recvMsgForBA = inMsg;
585 }, nullptr);
586 commBA->Activate();
587 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
588 EXPECT_EQ(recvMsgForBA, nullptr);
589
590 // Clean up
591 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
592 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
593 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
594 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
595 }
596
// Obtain a message from BuildRegedTinyMessage(), give it session id `session`, and send it through
// comm<src><label> to DEVICE_NAME_<dst>. Requires an `int errCode` and a communicator variable named
// comm<src><label> in the enclosing scope. The ASSERT inside returns from the enclosing function
// when no message could be built.
#define DO_SEND_MESSAGE(src, dst, label, session) \
{ \
    Message *msgFor##src##label = BuildRegedTinyMessage(); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    msgFor##src##label->SetSessionId(session); \
    SendConfig sendConf = {true, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
    EXPECT_EQ(errCode, E_OK); \
}
606
// Obtain a message from BuildRegedGiantMessage(size) and send it through comm<src><label> to
// DEVICE_NAME_<dst>. Unlike DO_SEND_MESSAGE, the first SendConfig field is false here and no session
// id is set. Requires an `int errCode` and a communicator variable named comm<src><label> in the
// enclosing scope.
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
{ \
    Message *msgFor##src##label = BuildRegedGiantMessage(size); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    SendConfig sendConf = {false, false, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
    EXPECT_EQ(errCode, E_OK); \
}
615
// Declare `ICommunicator *comm<src><label>` in the enclosing scope, allocated on g_envDevice<src>
// with LABEL_<label>, activate it, and then send one message via DO_SEND_MESSAGE.
// Because it declares a variable, it can be used only once per (src, label) pair in a scope.
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label); \
    DO_SEND_MESSAGE(src, dst, label, session)
620
// Declare `srcTargetFor<src><label>` and `recvMsgFor<src><label>` in the enclosing scope and register
// a message callback on comm<src><label> that stores the sender name and the received message pointer
// into them. Both variables are captured by reference and are overwritten by every callback.
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
            srcTargetFor##src##label = srcTarget; \
            recvMsgFor##src##label = inMsg; \
        }, nullptr);
629
630 /**
631 * @tc.name: ReDeliver Message 001
632 * @tc.desc: Test CommunicatorAggregator support redeliver message
633 * @tc.type: FUNC
634 * @tc.require: AR000DR9KV
635 * @tc.author: xiaozhenjian
636 */
637 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
638 {
639 /**
640 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
641 */
642 std::vector<LabelType> lackLabels;
643 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon8a05f61d0a02(const LabelType &commLabel, const std::string &userId)644 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
645 lackLabels.push_back(commLabel);
646 return E_OK;
647 }, nullptr);
648 EXPECT_EQ(errCode, E_OK);
649
650 /**
651 * @tc.steps: step2. connect device A with device B
652 */
653 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
654 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
655
656 /**
657 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
658 * @tc.expected: step3. device B callback that label A not found.
659 */
660 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
661 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
662 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
663 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
664
665 /**
666 * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
667 * @tc.expected: step4. device B callback that label B not found.
668 */
669 ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
670 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
671 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
672 EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
673
674 /**
675 * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
676 * @tc.expected: step5. communicator BA will receive message.
677 */
678 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
679 ASSERT_NE(commBA, nullptr);
680 REG_MESSAGE_CALLBACK(B, A);
681 commBA->Activate();
682 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
683 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
684 ASSERT_NE(recvMsgForBA, nullptr);
685 EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
686 delete recvMsgForBA;
687 recvMsgForBA = nullptr;
688
689 /**
690 * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
691 * @tc.expected: step6. communicator BB will receive message.
692 */
693 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
694 ASSERT_NE(commBB, nullptr);
695 REG_MESSAGE_CALLBACK(B, B);
696 commBB->Activate();
697 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
698 EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
699 ASSERT_NE(recvMsgForBB, nullptr);
700 EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
701 delete recvMsgForBB;
702 recvMsgForBB = nullptr;
703
704 // Clean up
705 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
706 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
707 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
708 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
709 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
710 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
711 }
712
713 /**
714 * @tc.name: ReDeliver Message 002
715 * @tc.desc: Test CommunicatorAggregator support redeliver message by order
716 * @tc.type: FUNC
717 * @tc.require: AR000DR9KV
718 * @tc.author: xiaozhenjian
719 */
720 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
721 {
722 /**
723 * @tc.steps: step1. device C create CommunicatorAggregator and initialize
724 */
725 bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
726 ASSERT_EQ(step1, true);
727
728 /**
729 * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
730 */
731 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon8a05f61d0b02(const LabelType &commLabel, const std::string &userId)732 const std::string &userId)->int {
733 return E_OK;
734 }, nullptr);
735 EXPECT_EQ(errCode, E_OK);
736
737 /**
738 * @tc.steps: step3. connect device A with device B, then device B with device C
739 */
740 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
741 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
742 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
743
744 /**
745 * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
746 */
747 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
748 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
749
750 /**
751 * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
752 */
753 ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
754 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
755 DO_SEND_MESSAGE(A, B, A, 300); // session id 300
756 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
757 DO_SEND_MESSAGE(C, B, A, 400); // session id 400
758 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
759
760 /**
761 * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
762 * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
763 */
764 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
765 ASSERT_NE(commBA, nullptr);
766 std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anon8a05f61d0c02(const std::string &srcTarget, Message *inMsg) 767 commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
768 msgCallbackForBA.push_back({srcTarget, inMsg});
769 }, nullptr);
770 commBA->Activate();
771 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
772 ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
773 EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
774 EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
775 EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
776 EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
777 for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
778 EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
779 delete msgCallbackForBA[i].second;
780 msgCallbackForBA[i].second = nullptr;
781 }
782
783 // Clean up
784 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
785 g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
786 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
787 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
788 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
789 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
790 TearDownEnv(g_envDeviceC);
791 }
792
793 /**
794 * @tc.name: ReDeliver Message 003
795 * @tc.desc: For observe memory in unusual scenario
796 * @tc.type: FUNC
797 * @tc.require: AR000DR9KV
798 * @tc.author: xiaozhenjian
799 */
800 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
801 {
802 /**
803 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
804 */
805 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon8a05f61d0d02(const LabelType &commLabel, const std::string &userId)806 const std::string &userId)->int {
807 return E_OK;
808 }, nullptr);
809 EXPECT_EQ(errCode, E_OK);
810
811 /**
812 * @tc.steps: step2. connect device A with device B
813 */
814 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
815 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
816
817 /**
818 * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
819 */
820 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
821 ASSERT_NOT_NULL_AND_ACTIVATE(commAA);
822 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
823 ASSERT_NOT_NULL_AND_ACTIVATE(commAB);
824 ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
825 ASSERT_NOT_NULL_AND_ACTIVATE(commAC);
826 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
827
828 /**
829 * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
830 */
831 for (int turn = 0; turn < 11; turn++) { // Total 11 turns
832 DO_SEND_MESSAGE(A, B, A, 0);
833 DO_SEND_MESSAGE(A, B, B, 0);
834 DO_SEND_MESSAGE(A, B, C, 0);
835 }
836
837 /**
838 * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
839 */
840 for (int turn = 0; turn < 5; turn++) { // Total 5 turns
841 DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
842 DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
843 DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
844 }
845 DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
846
847 /**
848 * @tc.steps: step6. wait a long time then send last frame
849 */
850 for (int sec = 0; sec < 15; sec++) { // Total 15 s
851 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
852 LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
853 }
854 DO_SEND_MESSAGE(A, B, A, 0);
855 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
856
857 // Clean up
858 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
859 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
860 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
861 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
862 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
863 }
864
865 namespace {
SetUpEnv(const std::string & localDev,const std::string & supportDev,const std::shared_ptr<DBStatusAdapter> & adapter,bool isSupport,EnvHandle & envDevice)866 void SetUpEnv(const std::string &localDev, const std::string &supportDev,
867 const std::shared_ptr<DBStatusAdapter> &adapter, bool isSupport, EnvHandle &envDevice)
868 {
869 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
870 adapter->SetDBInfoHandle(handle);
871 adapter->SetRemoteOptimizeCommunication(supportDev, isSupport);
872 SetUpEnv(envDevice, localDev, adapter);
873 }
874
InitCommunicator(DBInfo & dbInfo,EnvHandle & envDevice,OnOfflineDevice & onlineCallback,ICommunicator * & comm)875 void InitCommunicator(DBInfo &dbInfo, EnvHandle &envDevice, OnOfflineDevice &onlineCallback, ICommunicator *&comm)
876 {
877 dbInfo.userId = USER_ID;
878 dbInfo.appId = APP_ID;
879 dbInfo.storeId = STORE_ID_1;
880 dbInfo.isNeedSync = true;
881 dbInfo.syncDualTupleMode = false;
882 std::string label = DBCommon::GenerateHashLabel(dbInfo);
883 std::vector<uint8_t> commLabel(label.begin(), label.end());
884 int errorNo = E_OK;
885 comm = envDevice.commAggrHandle->AllocCommunicator(commLabel, errorNo);
886 ASSERT_NOT_NULL_AND_ACTIVATE(comm);
887 REG_CONNECT_CALLBACK(comm, onlineCallback);
888 }
889 }
890
891 /**
892 * @tc.name: CommunicationOptimization001
893 * @tc.desc: Test notify with isSupport true.
894 * @tc.type: FUNC
895 * @tc.require: AR000HGD0B
896 * @tc.author: zhangqiquan
897 */
898 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization001, TestSize.Level3)
899 {
900 auto *pAggregator = new VirtualCommunicatorAggregator();
901 ASSERT_NE(pAggregator, nullptr);
902 const std::string deviceA = "DEVICES_A";
903 const std::string deviceB = "DEVICES_B";
904 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
905 /**
906 * @tc.steps: step1. set up env
907 */
908 EnvHandle envDeviceA;
909 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
910 SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
911
912 EnvHandle envDeviceB;
913 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
914 SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
915
916 /**
917 * @tc.steps: step2. device alloc communicator using label and register callback
918 */
919 DBInfo dbInfo;
920 ICommunicator *commAA = nullptr;
921 OnOfflineDevice onlineForAA;
922 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
923
924 ICommunicator *commBB = nullptr;
925 OnOfflineDevice onlineForBB;
926 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
927
928 /**
929 * @tc.steps: step3. connect device A with device B
930 */
931 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
932
933 /**
934 * @tc.steps: step4. wait for label exchange
935 * @tc.expected: step4. both communicator has no callback;
936 */
937 std::this_thread::sleep_for(std::chrono::seconds(1));
938 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
939 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
940
941 /**
942 * @tc.steps: step5. both notify
943 * @tc.expected: step5. both has callback;
944 */
945 RuntimeConfig::NotifyDBInfos({ deviceB }, { dbInfo });
946 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
947 adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
948 std::this_thread::sleep_for(std::chrono::seconds(1));
949
950 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
951
952 dbInfo.isNeedSync = false;
953 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
954 adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
955 std::this_thread::sleep_for(std::chrono::seconds(1));
956 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
957
958 // Clean up and disconnect
959 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
960 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
961 std::this_thread::sleep_for(std::chrono::seconds(1));
962
963 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
964
965 TearDownEnv(envDeviceA);
966 TearDownEnv(envDeviceB);
967 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
968 }
969
970 /**
971 * @tc.name: CommunicationOptimization002
972 * @tc.desc: Test notify with isSupport true and can offline by device change.
973 * @tc.type: FUNC
974 * @tc.require: AR000HGD0B
975 * @tc.author: zhangqiquan
976 */
977 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization002, TestSize.Level3)
978 {
979 auto *pAggregator = new VirtualCommunicatorAggregator();
980 ASSERT_NE(pAggregator, nullptr);
981 const std::string deviceA = "DEVICES_A";
982 const std::string deviceB = "DEVICES_B";
983 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
984 /**
985 * @tc.steps: step1. set up env
986 */
987 EnvHandle envDeviceA;
988 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
989 SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
990
991 EnvHandle envDeviceB;
992 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
993 SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
994
995 /**
996 * @tc.steps: step2. device alloc communicator using label and register callback
997 */
998 DBInfo dbInfo;
999 ICommunicator *commAA = nullptr;
1000 OnOfflineDevice onlineForAA;
1001 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1002 /**
1003 * @tc.steps: step3. connect device A with device B
1004 */
1005 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1006 std::this_thread::sleep_for(std::chrono::seconds(1));
1007
1008 /**
1009 * @tc.steps: step5. A notify remote
1010 * @tc.expected: step5. A has callback;
1011 */
1012 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1013 std::this_thread::sleep_for(std::chrono::seconds(1));
1014 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1015
1016 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1017 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1018
1019 // Clean up and disconnect
1020 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1021 std::this_thread::sleep_for(std::chrono::seconds(1));
1022
1023 TearDownEnv(envDeviceA);
1024 TearDownEnv(envDeviceB);
1025 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1026 }
1027
1028 /**
1029 * @tc.name: CommunicationOptimization003
1030 * @tc.desc: Test notify with isSupport false.
1031 * @tc.type: FUNC
1032 * @tc.require: AR000HGD0B
1033 * @tc.author: zhangqiquan
1034 */
1035 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization003, TestSize.Level3)
1036 {
1037 auto *pAggregator = new VirtualCommunicatorAggregator();
1038 ASSERT_NE(pAggregator, nullptr);
1039 const std::string deviceA = "DEVICES_A";
1040 const std::string deviceB = "DEVICES_B";
1041 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1042 /**
1043 * @tc.steps: step1. set up env
1044 */
1045 EnvHandle envDeviceA;
1046 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1047 SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1048 EnvHandle envDeviceB;
1049 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1050 SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1051 /**
1052 * @tc.steps: step2. device alloc communicator using label and register callback
1053 */
1054 DBInfo dbInfo;
1055 ICommunicator *commAA = nullptr;
1056 OnOfflineDevice onlineForAA;
1057 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1058 ICommunicator *commBB = nullptr;
1059 OnOfflineDevice onlineForBB;
1060 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1061 /**
1062 * @tc.steps: step3. connect device A with device B
1063 */
1064 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1065 /**
1066 * @tc.steps: step4. wait for label exchange
1067 * @tc.expected: step4. both communicator has no callback;
1068 */
1069 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1070 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1071 /**
1072 * @tc.steps: step5. A notify remote
1073 * @tc.expected: step5. B has no callback;
1074 */
1075 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1076 std::this_thread::sleep_for(std::chrono::seconds(1));
1077 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1078 /**
1079 * @tc.steps: step6. A notify local
1080 * @tc.expected: step6. B has no callback;
1081 */
1082 dbInfo.isNeedSync = false;
1083 onlineForAA.onlineDevices.clear();
1084 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1085 std::this_thread::sleep_for(std::chrono::seconds(1));
1086 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1087 // Clean up and disconnect
1088 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1089 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1090 std::this_thread::sleep_for(std::chrono::seconds(1));
1091 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1092 TearDownEnv(envDeviceA);
1093 TearDownEnv(envDeviceB);
1094 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1095 }
1096
1097 /**
1098 * @tc.name: CommunicationOptimization004
1099 * @tc.desc: Test notify with isSupport false and it be will changed by communication.
1100 * @tc.type: FUNC
1101 * @tc.require: AR000HGD0B
1102 * @tc.author: zhangqiquan
1103 */
1104 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization004, TestSize.Level3)
1105 {
1106 const std::string deviceA = "DEVICES_A";
1107 const std::string deviceB = "DEVICES_B";
1108 /**
1109 * @tc.steps: step1. set up env
1110 */
1111 EnvHandle envDeviceA;
1112 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1113 SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1114 RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1115 EnvHandle envDeviceB;
1116 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1117 SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1118 /**
1119 * @tc.steps: step2. device alloc communicator using label and register callback
1120 */
1121 DBInfo dbInfo;
1122 ICommunicator *commAA = nullptr;
1123 OnOfflineDevice onlineForAA;
1124 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1125 ICommunicator *commBB = nullptr;
1126 OnOfflineDevice onlineForBB;
1127 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1128 /**
1129 * @tc.steps: step3. connect device A with device B
1130 */
1131 EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1132 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1133 std::this_thread::sleep_for(std::chrono::seconds(1));
1134 EXPECT_EQ(adapterA->IsSupport(deviceB), true);
1135 // Clean up and disconnect
1136 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1137 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1138 std::this_thread::sleep_for(std::chrono::seconds(1));
1139 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1140 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1141 envDeviceA.commAggrHandle = nullptr;
1142 TearDownEnv(envDeviceA);
1143 TearDownEnv(envDeviceB);
1144 }
1145
1146 /**
1147 * @tc.name: CommunicationOptimization005
1148 * @tc.desc: Test notify with isSupport false and send label exchange.
1149 * @tc.type: FUNC
1150 * @tc.require: AR000HGD0B
1151 * @tc.author: zhangqiquan
1152 */
1153 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization005, TestSize.Level3)
1154 {
1155 const std::string deviceA = "DEVICES_A";
1156 const std::string deviceB = "DEVICES_B";
1157 /**
1158 * @tc.steps: step1. set up env
1159 */
1160 EnvHandle envDeviceA;
1161 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1162 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1163 handle->SetLocalIsSupport(false);
1164 adapterA->SetDBInfoHandle(handle);
1165 SetUpEnv(envDeviceA, deviceA, adapterA);
1166 RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1167 EnvHandle envDeviceB;
1168 SetUpEnv(envDeviceB, deviceB, nullptr);
1169 /**
1170 * @tc.steps: step2. connect device A with device B
1171 */
1172 EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1173 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1174 std::this_thread::sleep_for(std::chrono::seconds(1));
1175 /**
1176 * @tc.steps: step3. device alloc communicator using label and register callback
1177 */
1178 DBInfo dbInfo;
1179 ICommunicator *commAA = nullptr;
1180 OnOfflineDevice onlineForAA;
1181 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1182 ICommunicator *commBB = nullptr;
1183 OnOfflineDevice onlineForBB;
1184 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1185 std::this_thread::sleep_for(std::chrono::seconds(1));
1186 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
1187
1188 // Clean up and disconnect
1189 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1190 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1191 std::this_thread::sleep_for(std::chrono::seconds(1));
1192 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1193 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1194 envDeviceA.commAggrHandle = nullptr;
1195 TearDownEnv(envDeviceA);
1196 TearDownEnv(envDeviceB);
1197 }
1198
1199 /**
1200 * @tc.name: DbStatusAdapter001
1201 * @tc.desc: Test notify with isSupport false.
1202 * @tc.type: FUNC
1203 * @tc.require: AR000HGD0B
1204 * @tc.author: zhangqiquan
1205 */
1206 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter001, TestSize.Level1)
1207 {
1208 auto *pAggregator = new VirtualCommunicatorAggregator();
1209 ASSERT_NE(pAggregator, nullptr);
1210 const std::string deviceA = "DEVICES_A";
1211 const std::string deviceB = "DEVICES_B";
1212 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1213
1214 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1215 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1216 adapterA->SetDBInfoHandle(handle);
1217 adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1218 std::string actualRemoteDevInfo;
1219 size_t remoteInfoCount = 0u;
1220 size_t localCount = 0u;
1221 DBInfo dbInfo;
1222 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1223
1224 dbInfo = {
1225 USER_ID,
1226 APP_ID,
1227 STORE_ID_1,
1228 true,
1229 false
1230 };
1231 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1232 dbInfo.isNeedSync = false;
1233 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1234 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1235
1236 size_t remoteChangeCount = 0u;
1237 adapterA->SetDBStatusChangeCallback(
__anon8a05f61d0f02(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1238 [&actualRemoteDevInfo, &remoteInfoCount](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
1239 actualRemoteDevInfo = devInfo;
1240 remoteInfoCount = dbInfos.size();
1241 },
__anon8a05f61d1002() 1242 [&localCount]() {
1243 localCount++;
1244 },
__anon8a05f61d1102(const std::string &dev) 1245 [&remoteChangeCount, deviceB](const std::string &dev) {
1246 remoteChangeCount++;
1247 EXPECT_EQ(dev, deviceB);
1248 });
1249 std::this_thread::sleep_for(std::chrono::seconds(1));
1250 EXPECT_EQ(actualRemoteDevInfo, deviceB);
1251 EXPECT_EQ(remoteInfoCount, 1u);
1252 adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1253 std::this_thread::sleep_for(std::chrono::seconds(1));
1254 EXPECT_EQ(remoteChangeCount, 1u);
1255 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1256 }
1257
1258 /**
1259 * @tc.name: DbStatusAdapter002
1260 * @tc.desc: Test adapter clear cache.
1261 * @tc.type: FUNC
1262 * @tc.require: AR000HGD0B
1263 * @tc.author: zhangqiquan
1264 */
1265 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter002, TestSize.Level1) {
1266 const std::string deviceB = "DEVICES_B";
1267 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1268 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1269 adapterA->SetDBInfoHandle(handle);
1270 adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1271 EXPECT_TRUE(adapterA->IsSupport(deviceB));
1272 adapterA->SetDBInfoHandle(handle);
1273 adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1274 EXPECT_FALSE(adapterA->IsSupport(deviceB));
1275 }
1276
1277 /**
1278 * @tc.name: DbStatusAdapter003
1279 * @tc.desc: Test adapter get local dbInfo.
1280 * @tc.type: FUNC
1281 * @tc.require: AR000HGD0B
1282 * @tc.author: zhangqiquan
1283 */
1284 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter003, TestSize.Level1) {
1285 const std::string deviceB = "DEVICES_B";
1286 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1287 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1288 adapterA->SetDBInfoHandle(handle);
1289 handle->SetLocalIsSupport(true);
1290 std::vector<DBInfo> dbInfos;
1291 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1292 handle->SetLocalIsSupport(false);
1293 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1294 adapterA->SetDBInfoHandle(handle);
1295 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), -E_NOT_SUPPORT);
1296 }
1297
1298 /**
1299 * @tc.name: DbStatusAdapter004
1300 * @tc.desc: Test adapter clear cache will get callback.
1301 * @tc.type: FUNC
1302 * @tc.require: AR000HGD0B
1303 * @tc.author: zhangqiquan
1304 */
1305 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter004, TestSize.Level1)
1306 {
1307 const std::string deviceB = "DEVICES_B";
1308 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1309 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1310 adapterA->SetDBInfoHandle(handle);
1311 handle->SetLocalIsSupport(true);
1312 std::vector<DBInfo> dbInfos;
1313 DBInfo dbInfo = {
1314 USER_ID,
1315 APP_ID,
1316 STORE_ID_1,
1317 true,
1318 true
1319 };
1320 dbInfos.push_back(dbInfo);
1321 dbInfo.storeId = STORE_ID_2;
1322 dbInfos.push_back(dbInfo);
1323 dbInfo.storeId = STORE_ID_3;
1324 dbInfo.isNeedSync = false;
1325 dbInfos.push_back(dbInfo);
1326 adapterA->NotifyDBInfos({"dev"}, dbInfos);
1327 std::this_thread::sleep_for(std::chrono::seconds(1));
1328 size_t notifyCount = 0u;
1329 adapterA->SetDBStatusChangeCallback([¬ifyCount](const std::string &devInfo,
__anon8a05f61d1202(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1330 const std::vector<DBInfo> &dbInfos) {
1331 LOGD("on callback");
1332 for (const auto &dbInfo: dbInfos) {
1333 EXPECT_FALSE(dbInfo.isNeedSync);
1334 }
1335 notifyCount = dbInfos.size();
1336 }, nullptr, nullptr);
1337 adapterA->SetDBInfoHandle(nullptr);
1338 EXPECT_EQ(notifyCount, 2u); // 2 dbInfo is need sync now it should be offline
1339 }
1340
1341 /**
1342 * @tc.name: DbStatusAdapter005
1343 * @tc.desc: Test adapter is need auto sync.
1344 * @tc.type: FUNC
1345 * @tc.require: AR000HGD0B
1346 * @tc.author: zhangqiquan
1347 */
1348 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter005, TestSize.Level1)
1349 {
1350 const std::string deviceB = "DEVICES_B";
1351 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1352 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1353 adapterA->SetDBInfoHandle(handle);
1354 handle->SetLocalIsSupport(true);
1355 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1356 handle->SetNeedAutoSync(false);
1357 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1358 handle->SetLocalIsSupport(false);
1359 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1360 adapterA->SetDBInfoHandle(handle);
1361 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1362 adapterA->SetDBInfoHandle(nullptr);
1363 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1364 }
1365 }