1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "combine_status.h"
20 #include "communicator.h"
21 #include "communicator_aggregator.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "distributeddb_communicator_common.h"
25 #include "distributeddb_data_generate_unit_test.h"
26 #include "distributeddb_tools_unit_test.h"
27 #include "endian_convert.h"
28 #include "log_print.h"
29 #include "runtime_config.h"
30 #include "runtime_context_impl.h"
31 #include "thread_pool_test_stub.h"
32 #include "virtual_communicator_aggregator.h"
33
34 using namespace std;
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38
39 namespace {
// Environment handles of the simulated devices. A and B are set up in SetUpTestCase and
// torn down in TearDownTestCase; C is set up on demand by the tests that use it.
EnvHandle g_envDeviceA;
EnvHandle g_envDeviceB;
EnvHandle g_envDeviceC;
43
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)44 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
45 {
46 if (isConnect) {
47 onlines.onlineDevices.insert(target);
48 onlines.latestOnlineDevice = target;
49 onlines.latestOfflineDevice.clear();
50 } else {
51 onlines.onlineDevices.erase(target);
52 onlines.latestOnlineDevice.clear();
53 onlines.latestOfflineDevice = target;
54 }
55 }
56
57 class DistributedDBCommunicatorTest : public testing::Test {
58 public:
59 static void SetUpTestCase(void);
60 static void TearDownTestCase(void);
61 void SetUp();
62 void TearDown();
63 };
64
SetUpTestCase(void)65 void DistributedDBCommunicatorTest::SetUpTestCase(void)
66 {
67 /**
68 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
69 */
70 LOGI("[UT][Test][SetUpTestCase] Enter.");
71 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
72 ASSERT_EQ(errCode, true);
73 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
74 ASSERT_EQ(errCode, true);
75 DoRegTransformFunction();
76 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
77 }
78
TearDownTestCase(void)79 void DistributedDBCommunicatorTest::TearDownTestCase(void)
80 {
81 /**
82 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
83 */
84 LOGI("[UT][Test][TearDownTestCase] Enter.");
85 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
86 TearDownEnv(g_envDeviceA);
87 TearDownEnv(g_envDeviceB);
88 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
89 }
90
SetUp()91 void DistributedDBCommunicatorTest::SetUp()
92 {
93 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
94 }
95
TearDown()96 void DistributedDBCommunicatorTest::TearDown()
97 {
98 /**
99 * @tc.teardown: Wait 100 ms to make sure all thread quiet
100 */
101 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
102 }
103
104 /**
105 * @tc.name: Communicator Management 001
106 * @tc.desc: Test alloc and release communicator
107 * @tc.type: FUNC
108 * @tc.require:
109 * @tc.author: xiaozhenjian
110 */
111 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
112 {
113 /**
114 * @tc.steps: step1. alloc communicator A using label A
115 * @tc.expected: step1. alloc return OK.
116 */
117 int errorNo = E_OK;
118 ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
119 EXPECT_EQ(errorNo, E_OK);
120 EXPECT_NE(commA, nullptr);
121
122 /**
123 * @tc.steps: step2. alloc communicator B using label B
124 * @tc.expected: step2. alloc return OK.
125 */
126 errorNo = E_OK;
127 ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
128 EXPECT_EQ(errorNo, E_OK);
129 EXPECT_NE(commA, nullptr);
130
131 /**
132 * @tc.steps: step3. alloc communicator C using label A
133 * @tc.expected: step3. alloc return not OK.
134 */
135 errorNo = E_OK;
136 ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
137 EXPECT_NE(errorNo, E_OK);
138 EXPECT_EQ(commC, nullptr);
139
140 /**
141 * @tc.steps: step4. release communicator A and communicator B
142 */
143 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
144 commA = nullptr;
145 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
146 commB = nullptr;
147
148 /**
149 * @tc.steps: step5. alloc communicator D using label A
150 * @tc.expected: step5. alloc return OK.
151 */
152 errorNo = E_OK;
153 ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
154 EXPECT_EQ(errorNo, E_OK);
155 EXPECT_NE(commD, nullptr);
156
157 /**
158 * @tc.steps: step6. release communicator D
159 */
160 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
161 commD = nullptr;
162 }
163
164 /**
165 * @tc.name: Communicator Management 002
166 * @tc.desc: Test alloc and release communicator with different userId
167 * @tc.type: FUNC
168 * @tc.require:
169 * @tc.author: liaoyonghuang
170 */
171 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement002, TestSize.Level1)
172 {
173 /**
174 * @tc.steps: step1. alloc communicator A using label A with USER_ID_1
175 * @tc.expected: step1. alloc return OK.
176 */
177 int errorNo = E_OK;
178 ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_1);
179 commA->Activate(USER_ID_1);
180 EXPECT_EQ(errorNo, E_OK);
181 EXPECT_NE(commA, nullptr);
182
183 /**
184 * @tc.steps: step2. alloc communicator B using label A with USER_ID_2
185 * @tc.expected: step2. alloc return OK, and commA == commB is TRUE
186 */
187 errorNo = E_OK;
188 ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_2);
189 commB->Activate(USER_ID_2);
190 EXPECT_EQ(errorNo, E_OK);
191 EXPECT_NE(commA, nullptr);
192
193 /**
194 * @tc.steps: step3. alloc communicator C using label A with USER_ID_1
195 * @tc.expected: step3. alloc return not OK.
196 */
197 errorNo = E_OK;
198 ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_1);
199 EXPECT_NE(errorNo, E_OK);
200 EXPECT_EQ(commC, nullptr);
201
202 /**
203 * @tc.steps: step4. release communicator A and communicator B
204 * @tc.expected: step4. OK.
205 */
206 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA, USER_ID_1);
207 commA = nullptr;
208 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB, USER_ID_2);
209 commB = nullptr;
210 }
211
ConnectWaitDisconnect()212 static void ConnectWaitDisconnect()
213 {
214 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
215 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
216 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
217 }
218
219 /**
220 * @tc.name: Online And Offline 001
221 * @tc.desc: Test functionality triggered by physical devices online and offline
222 * @tc.type: FUNC
223 * @tc.require:
224 * @tc.author: wudongxing
225 */
226 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
227 {
228 /**
229 * @tc.steps: step1. device A alloc communicator AA using label A and register callback
230 * @tc.expected: step1. no callback.
231 */
232 int errorNo = E_OK;
233 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
234 ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
235 OnOfflineDevice onlineForAA;
__anon9f0e15bf0202(const std::string &target, bool isConnect) 236 commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
237 HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
238 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
239
240 /**
241 * @tc.steps: step2. connect device A with device B and then disconnect
242 * @tc.expected: step2. no callback.
243 */
244 ConnectWaitDisconnect();
245 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
246
247 /**
248 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
249 * @tc.expected: step3. no callback.
250 */
251 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
252 ASSERT_NOT_NULL_AND_ACTIVATE(commBB, "");
253 OnOfflineDevice onlineForBB;
__anon9f0e15bf0302(const std::string &target, bool isConnect) 254 commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
255 HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
256 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
257 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
258
259 /**
260 * @tc.steps: step4. connect device A with device B and then disconnect
261 * @tc.expected: step4. no callback.
262 */
263 ConnectWaitDisconnect();
264 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
265 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
266
267 /**
268 * @tc.steps: step5. device B alloc communicator BA using label A and register callback
269 * @tc.expected: step5. no callback.
270 */
271 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
272 ASSERT_NOT_NULL_AND_ACTIVATE(commBA, "");
273 OnOfflineDevice onlineForBA;
__anon9f0e15bf0402(const std::string &target, bool isConnect) 274 commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
275 HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
276 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
277 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
278 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
279
280 /**
281 * @tc.steps: step6. connect device A with device B
282 * @tc.expected: step6. communicator AA has callback of device B online;
283 * communicator BA has callback of device A online;
284 * communicator BB no callback
285 */
286 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
287 std::this_thread::sleep_for(std::chrono::milliseconds(100));
288 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
289 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
290 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
291 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
292 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
293
294 /**
295 * @tc.steps: step7. disconnect device A with device B
296 * @tc.expected: step7. communicator AA has callback of device B offline;
297 * communicator BA has callback of device A offline;
298 * communicator BB no callback
299 */
300 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
301 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
302 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
305 EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
306 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
307
308 // Clean up
309 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
310 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
311 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
312 }
313
// Register a connect-change callback on `communicator` that forwards every notification to
// HandleConnectChange(online, ...). `online` is captured by reference, so it must be a named
// variable that outlives the registration. `communicator` is parenthesized so that any pointer
// expression can be passed safely.
#define REG_CONNECT_CALLBACK(communicator, online) \
{ \
    (communicator)->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
        HandleConnectChange(online, target, isConnect); \
    }, nullptr); \
}

// Connect the adapter stubs of device A and device B, then sleep `waitTime` milliseconds.
#define CONNECT_AND_WAIT(waitTime) \
{ \
    AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
    std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
}
326
327 /**
328 * @tc.name: Online And Offline 002
329 * @tc.desc: Test functionality triggered by alloc and release communicator
330 * @tc.type: FUNC
331 * @tc.require:
332 * @tc.author: wudongxing
333 */
334 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
335 {
336 /**
337 * @tc.steps: step1. connect device A with device B
338 */
339 CONNECT_AND_WAIT(200); // Sleep 200 ms
340
341 /**
342 * @tc.steps: step2. device A alloc communicator AA using label A and register callback
343 * @tc.expected: step2. no callback.
344 */
345 int errorNo = E_OK;
346 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
347 ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
348 OnOfflineDevice onlineForAA;
349 REG_CONNECT_CALLBACK(commAA, onlineForAA);
350 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
351
352 /**
353 * @tc.steps: step3. device B alloc communicator BB using label B and register callback
354 * @tc.expected: step3. no callback.
355 */
356 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
357 ASSERT_NOT_NULL_AND_ACTIVATE(commBB, "");
358 OnOfflineDevice onlineForBB;
359 REG_CONNECT_CALLBACK(commBB, onlineForBB);
360 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
361 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
362
363 /**
364 * @tc.steps: step4. device B alloc communicator BA using label A and register callback
365 * @tc.expected: step4. communicator AA has callback of device B online;
366 * communicator BA has callback of device A online;
367 * communicator BB no callback.
368 */
369 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
370 ASSERT_NOT_NULL_AND_ACTIVATE(commBA, "");
371 OnOfflineDevice onlineForBA;
372 REG_CONNECT_CALLBACK(commBA, onlineForBA);
373 std::this_thread::sleep_for(std::chrono::milliseconds(100));
374 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
375 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
376 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
377 EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
378 EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
379
380 /**
381 * @tc.steps: step5. device A alloc communicator AB using label B and register callback
382 * @tc.expected: step5. communicator AB has callback of device B online;
383 * communicator BB has callback of device A online;
384 */
385 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
386 ASSERT_NOT_NULL_AND_ACTIVATE(commAB, "");
387 OnOfflineDevice onlineForAB;
388 REG_CONNECT_CALLBACK(commAB, onlineForAB);
389 std::this_thread::sleep_for(std::chrono::milliseconds(100));
390 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
391 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
392 EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
393 EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
394
395 /**
396 * @tc.steps: step6. device A release communicator AA
397 * @tc.expected: step6. communicator BA has callback of device A offline;
398 * communicator AB and BB no callback;
399 */
400 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
401 std::this_thread::sleep_for(std::chrono::milliseconds(100));
402 EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
403 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
404 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
405 EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
406
407 /**
408 * @tc.steps: step7. device B release communicator BA
409 * @tc.expected: step7. communicator AB and BB no callback;
410 */
411 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
412 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
413 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
414
415 /**
416 * @tc.steps: step8. device B release communicator BB
417 * @tc.expected: step8. communicator AB has callback of device B offline;
418 */
419 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
420 std::this_thread::sleep_for(std::chrono::milliseconds(100));
421 EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
422 EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
423
424 // Clean up
425 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
426 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
427 }
428
TestRemoteRestart()429 void TestRemoteRestart()
430 {
431 /**
432 * @tc.steps: step1. connect device D with device E
433 */
434 EnvHandle envDeviceD;
435 EnvHandle envDeviceE;
436 SetUpEnv(envDeviceD, "DEVICE_D");
437 SetUpEnv(envDeviceE, "DEVICE_E");
438
439 /**
440 * @tc.steps: step2. device D alloc communicator DD using label A and register callback
441 */
442 int errorNo = E_OK;
443 ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
444 ASSERT_NOT_NULL_AND_ACTIVATE(commDD, "");
445 OnOfflineDevice onlineForDD;
446 REG_CONNECT_CALLBACK(commDD, onlineForDD);
447
448 /**
449 * @tc.steps: step3. device E alloc communicator EE using label A and register callback
450 */
451 ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
452 ASSERT_NOT_NULL_AND_ACTIVATE(commEE, "");
453 OnOfflineDevice onlineForEE;
454 REG_CONNECT_CALLBACK(commEE, onlineForEE);
455 /**
456 * @tc.steps: step4. deviceD connect to deviceE
457 * @tc.expected: step4. both communicator has callback;
458 */
459 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
460 std::this_thread::sleep_for(std::chrono::seconds(1));
461 EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
462 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
463
464 /**
465 * @tc.steps: step5. device E restart
466 */
467 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
468 std::this_thread::sleep_for(std::chrono::seconds(1));
469 TearDownEnv(envDeviceE);
470 SetUpEnv(envDeviceE, "DEVICE_E");
471
472 commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
473 ASSERT_NOT_NULL_AND_ACTIVATE(commEE, "");
474 REG_CONNECT_CALLBACK(commEE, onlineForEE);
475 onlineForEE.onlineDevices.clear();
476 /**
477 * @tc.steps: step6. deviceD connect to deviceE again
478 * @tc.expected: step6. communicatorE has callback;
479 */
480 AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
481 int reTryTimes = 5;
482 while (onlineForEE.onlineDevices.size() != 1 && reTryTimes > 0) {
483 std::this_thread::sleep_for(std::chrono::seconds(1));
484 reTryTimes--;
485 }
486 EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
487 // Clean up and disconnect
488 envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
489 envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
490 std::this_thread::sleep_for(std::chrono::seconds(1));
491
492 AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
493 TearDownEnv(envDeviceD);
494 TearDownEnv(envDeviceE);
495 }
496 /**
497 * @tc.name: Online And Offline 003
498 * @tc.desc: Test functionality triggered by remote restart
499 * @tc.type: FUNC
500 * @tc.require:
501 * @tc.author: zhangqiquan
502 */
503 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
504 {
505 TestRemoteRestart();
506 }
507
508 /**
509 * @tc.name: Online And Offline 004
510 * @tc.desc: Test functionality triggered by remote restart with thread pool
511 * @tc.type: FUNC
512 * @tc.require:
513 * @tc.author: zhangqiquan
514 */
515 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
516 {
517 auto threadPool = std::make_shared<ThreadPoolTestStub>();
518 RuntimeContext::GetInstance()->SetThreadPool(threadPool);
519 TestRemoteRestart();
520 RuntimeContext::GetInstance()->SetThreadPool(nullptr);
521 }
522
523 /**
524 * @tc.name: Report Device Connect Change 001
525 * @tc.desc: Test CommunicatorAggregator support report device connect change event
526 * @tc.type: FUNC
527 * @tc.require:
528 * @tc.author: xiaozhenjian
529 */
530 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
531 {
532 /**
533 * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
534 */
535 OnOfflineDevice onlineForA;
536 int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anon9f0e15bf0502(const std::string &target, bool isConnect) 537 [&onlineForA](const std::string &target, bool isConnect) {
538 HandleConnectChange(onlineForA, target, isConnect);
539 }, nullptr);
540 EXPECT_EQ(errCode, E_OK);
541 OnOfflineDevice onlineForB;
542 errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anon9f0e15bf0602(const std::string &target, bool isConnect) 543 [&onlineForB](const std::string &target, bool isConnect) {
544 HandleConnectChange(onlineForB, target, isConnect);
545 }, nullptr);
546 EXPECT_EQ(errCode, E_OK);
547
548 /**
549 * @tc.steps: step2. connect device A with device B
550 * @tc.expected: step2. device A callback B online; device B callback A online;
551 */
552 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
553 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
554 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
555 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
556 EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
557 EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
558
559 /**
560 * @tc.steps: step3. connect device A with device B
561 * @tc.expected: step3. device A callback B offline; device B callback A offline;
562 */
563 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
564 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
565 EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
566 EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
567 EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
568 EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
569
570 // Clean up
571 g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
572 g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
573 }
574
575 namespace {
ToLabelType(uint64_t commLabel)576 LabelType ToLabelType(uint64_t commLabel)
577 {
578 uint64_t netOrderLabel = HostToNet(commLabel);
579 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
580 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
581 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
582 realLabel[i] = eachByte[i];
583 }
584 return realLabel;
585 }
586 }
587
588 /**
589 * @tc.name: Report Communicator Not Found 001
590 * @tc.desc: Test CommunicatorAggregator support report communicator not found event
591 * @tc.type: FUNC
592 * @tc.require:
593 * @tc.author: xiaozhenjian
594 */
595 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
596 {
597 /**
598 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
599 */
600 std::vector<LabelType> lackLabels;
601 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon9f0e15bf0802(const LabelType &commLabel, const std::string &userId)602 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
603 lackLabels.push_back(commLabel);
604 return -E_NOT_FOUND;
605 }, nullptr);
606 EXPECT_EQ(errCode, E_OK);
607
608 /**
609 * @tc.steps: step2. connect device A with device B
610 */
611 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
612 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
613
614 /**
615 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
616 * @tc.expected: step3. device B callback that label A not found.
617 */
618 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
619 ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
620 Message *msgForAA = BuildRegedTinyMessage();
621 ASSERT_NE(msgForAA, nullptr);
622 SendConfig conf = {true, false, true, 0};
623 errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
624 EXPECT_EQ(errCode, E_OK);
625 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
626 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
627 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
628
629 /**
630 * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
631 * @tc.expected: step4. communicator BA will not receive message.
632 */
633 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
634 ASSERT_NE(commBA, nullptr);
635 Message *recvMsgForBA = nullptr;
__anon9f0e15bf0902(const std::string &srcTarget, Message *inMsg) 636 commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
637 recvMsgForBA = inMsg;
638 return E_OK;
639 }, nullptr);
640 commBA->Activate(USER_ID_1);
641 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
642 EXPECT_EQ(recvMsgForBA, nullptr);
643
644 // Clean up
645 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
646 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
647 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
648 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
649 }
650
// Build a registered tiny message, stamp it with `session` and send it through communicator
// comm<src><label> to DEVICE_NAME_<dst>. Expects an `int errCode` in the calling scope and
// aborts the calling test when the message cannot be built.
#define DO_SEND_MESSAGE(src, dst, label, session) \
{ \
    Message *msgFor##src##label = BuildRegedTinyMessage(); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    msgFor##src##label->SetSessionId(session); \
    SendConfig sendConf = {true, false, true, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
    EXPECT_EQ(errCode, E_OK); \
}

// Same as DO_SEND_MESSAGE, but sends a registered giant message built with `size`; the first
// SendConfig field is false here and no session id is set.
#define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
{ \
    Message *msgFor##src##label = BuildRegedGiantMessage(size); \
    ASSERT_NE(msgFor##src##label, nullptr); \
    SendConfig sendConf = {false, false, true, 0}; \
    errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, sendConf); \
    EXPECT_EQ(errCode, E_OK); \
}

// Declare comm<src><label> in the calling scope (allocated on g_envDevice<src> with
// LABEL_<label>), activate it, then send one tiny message to DEVICE_NAME_<dst>.
#define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
    ICommunicator *comm##src##label = g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
    ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label, ""); \
    DO_SEND_MESSAGE(src, dst, label, session)

// Declare srcTargetFor<src><label> and recvMsgFor<src><label> in the calling scope and register
// a message callback on comm<src><label> that stores the latest sender and message in them.
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
            srcTargetFor##src##label = srcTarget; \
            recvMsgFor##src##label = inMsg; \
            return E_OK; \
        }, nullptr);
684
685 /**
686 * @tc.name: ReDeliver Message 001
687 * @tc.desc: Test CommunicatorAggregator support redeliver message
688 * @tc.type: FUNC
689 * @tc.require:
690 * @tc.author: xiaozhenjian
691 */
692 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
693 {
694 /**
695 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
696 */
697 std::vector<LabelType> lackLabels;
698 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon9f0e15bf0a02(const LabelType &commLabel, const std::string &userId)699 [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
700 lackLabels.push_back(commLabel);
701 return E_OK;
702 }, nullptr);
703 EXPECT_EQ(errCode, E_OK);
704
705 /**
706 * @tc.steps: step2. connect device A with device B
707 */
708 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
709 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
710
711 /**
712 * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
713 * @tc.expected: step3. device B callback that label A not found.
714 */
715 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
716 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
717 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
718 EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
719
720 /**
721 * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
722 * @tc.expected: step4. device B callback that label B not found.
723 */
724 ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
725 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
726 ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
727 EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
728
729 /**
730 * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
731 * @tc.expected: step5. communicator BA will receive message.
732 */
733 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
734 ASSERT_NE(commBA, nullptr);
735 REG_MESSAGE_CALLBACK(B, A);
736 commBA->Activate();
737 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
738 EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
739 ASSERT_NE(recvMsgForBA, nullptr);
740 EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
741 delete recvMsgForBA;
742 recvMsgForBA = nullptr;
743
744 /**
745 * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
746 * @tc.expected: step6. communicator BB will receive message.
747 */
748 ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
749 ASSERT_NE(commBB, nullptr);
750 REG_MESSAGE_CALLBACK(B, B);
751 commBB->Activate();
752 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
753 EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
754 ASSERT_NE(recvMsgForBB, nullptr);
755 EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
756 delete recvMsgForBB;
757 recvMsgForBB = nullptr;
758
759 // Clean up
760 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
761 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
762 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
763 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
764 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
765 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
766 }
767
768 /**
769 * @tc.name: ReDeliver Message 002
770 * @tc.desc: Test CommunicatorAggregator support redeliver message by order
771 * @tc.type: FUNC
772 * @tc.require:
773 * @tc.author: xiaozhenjian
774 */
775 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
776 {
777 /**
778 * @tc.steps: step1. device C create CommunicatorAggregator and initialize
779 */
780 bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
781 ASSERT_EQ(step1, true);
782
783 /**
784 * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
785 */
786 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon9f0e15bf0b02(const LabelType &commLabel, const std::string &userId)787 const std::string &userId)->int {
788 return E_OK;
789 }, nullptr);
790 EXPECT_EQ(errCode, E_OK);
791
792 /**
793 * @tc.steps: step3. connect device A with device B, then device B with device C
794 */
795 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
796 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
797 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
798
799 /**
800 * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
801 */
802 ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
803 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
804
805 /**
806 * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
807 */
808 ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
809 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
810 DO_SEND_MESSAGE(A, B, A, 300); // session id 300
811 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
812 DO_SEND_MESSAGE(C, B, A, 400); // session id 400
813 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
814
815 /**
816 * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
817 * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
818 */
819 ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
820 ASSERT_NE(commBA, nullptr);
821 std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anon9f0e15bf0c02(const std::string &srcTarget, Message *inMsg) 822 commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
823 msgCallbackForBA.push_back({srcTarget, inMsg});
824 return E_OK;
825 }, nullptr);
826 commBA->Activate();
827 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
828 ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
829 EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
830 EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
831 EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
832 EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
833 for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
834 EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
835 delete msgCallbackForBA[i].second;
836 msgCallbackForBA[i].second = nullptr;
837 }
838
839 // Clean up
840 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
841 g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
842 g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
843 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
844 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
845 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
846 TearDownEnv(g_envDeviceC);
847 }
848
849 /**
850 * @tc.name: ReDeliver Message 003
851 * @tc.desc: For observe memory in unusual scenario
852 * @tc.type: FUNC
853 * @tc.require:
854 * @tc.author: xiaozhenjian
855 */
856 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
857 {
858 /**
859 * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
860 */
861 int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon9f0e15bf0d02(const LabelType &commLabel, const std::string &userId)862 const std::string &userId)->int {
863 return E_OK;
864 }, nullptr);
865 EXPECT_EQ(errCode, E_OK);
866
867 /**
868 * @tc.steps: step2. connect device A with device B
869 */
870 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
871 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
872
873 /**
874 * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
875 */
876 ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
877 ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
878 ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
879 ASSERT_NOT_NULL_AND_ACTIVATE(commAB, "");
880 ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
881 ASSERT_NOT_NULL_AND_ACTIVATE(commAC, "");
882 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
883
884 /**
885 * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
886 */
887 for (int turn = 0; turn < 11; turn++) { // Total 11 turns
888 DO_SEND_MESSAGE(A, B, A, 0);
889 DO_SEND_MESSAGE(A, B, B, 0);
890 DO_SEND_MESSAGE(A, B, C, 0);
891 }
892
893 /**
894 * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
895 */
896 for (int turn = 0; turn < 5; turn++) { // Total 5 turns
897 DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
898 DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
899 DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
900 }
901 DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
902
903 /**
904 * @tc.steps: step6. wait a long time then send last frame
905 */
906 for (int sec = 0; sec < 15; sec++) { // Total 15 s
907 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
908 LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
909 }
910 DO_SEND_MESSAGE(A, B, A, 0);
911 std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
912
913 // Clean up
914 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
915 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
916 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
917 g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
918 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
919 }
920
921 namespace {
SetUpEnv(const std::string & localDev,const std::string & supportDev,const std::shared_ptr<DBStatusAdapter> & adapter,bool isSupport,EnvHandle & envDevice)922 void SetUpEnv(const std::string &localDev, const std::string &supportDev,
923 const std::shared_ptr<DBStatusAdapter> &adapter, bool isSupport, EnvHandle &envDevice)
924 {
925 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
926 adapter->SetDBInfoHandle(handle);
927 adapter->SetRemoteOptimizeCommunication(supportDev, isSupport);
928 SetUpEnv(envDevice, localDev, adapter);
929 }
930
InitCommunicator(DBInfo & dbInfo,EnvHandle & envDevice,OnOfflineDevice & onlineCallback,ICommunicator * & comm)931 void InitCommunicator(DBInfo &dbInfo, EnvHandle &envDevice, OnOfflineDevice &onlineCallback, ICommunicator *&comm)
932 {
933 dbInfo.userId = USER_ID;
934 dbInfo.appId = APP_ID;
935 dbInfo.storeId = STORE_ID_1;
936 dbInfo.isNeedSync = true;
937 dbInfo.syncDualTupleMode = false;
938 std::string label = DBCommon::GenerateHashLabel(dbInfo);
939 std::vector<uint8_t> commLabel(label.begin(), label.end());
940 int errorNo = E_OK;
941 comm = envDevice.commAggrHandle->AllocCommunicator(commLabel, errorNo);
942 ASSERT_NOT_NULL_AND_ACTIVATE(comm, "");
943 REG_CONNECT_CALLBACK(comm, onlineCallback);
944 }
945 }
946
947 /**
948 * @tc.name: CommunicationOptimization001
949 * @tc.desc: Test notify with isSupport true.
950 * @tc.type: FUNC
951 * @tc.require:
952 * @tc.author: zhangqiquan
953 */
954 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization001, TestSize.Level3)
955 {
956 auto *pAggregator = new VirtualCommunicatorAggregator();
957 ASSERT_NE(pAggregator, nullptr);
958 const std::string deviceA = "DEVICES_A";
959 const std::string deviceB = "DEVICES_B";
960 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
961 /**
962 * @tc.steps: step1. set up env
963 */
964 EnvHandle envDeviceA;
965 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
966 SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
967
968 EnvHandle envDeviceB;
969 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
970 SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
971
972 /**
973 * @tc.steps: step2. device alloc communicator using label and register callback
974 */
975 DBInfo dbInfo;
976 ICommunicator *commAA = nullptr;
977 OnOfflineDevice onlineForAA;
978 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
979
980 ICommunicator *commBB = nullptr;
981 OnOfflineDevice onlineForBB;
982 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
983
984 /**
985 * @tc.steps: step3. connect device A with device B
986 */
987 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
988
989 /**
990 * @tc.steps: step4. wait for label exchange
991 * @tc.expected: step4. both communicator has no callback;
992 */
993 std::this_thread::sleep_for(std::chrono::seconds(1));
994 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
995 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
996
997 /**
998 * @tc.steps: step5. both notify
999 * @tc.expected: step5. both has callback;
1000 */
1001 RuntimeConfig::NotifyDBInfos({ deviceB }, { dbInfo });
1002 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1003 adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
1004 std::this_thread::sleep_for(std::chrono::seconds(1));
1005
1006 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1007
1008 dbInfo.isNeedSync = false;
1009 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1010 adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
1011 std::this_thread::sleep_for(std::chrono::seconds(1));
1012 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1013
1014 // Clean up and disconnect
1015 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1016 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1017 std::this_thread::sleep_for(std::chrono::seconds(1));
1018
1019 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1020
1021 TearDownEnv(envDeviceA);
1022 TearDownEnv(envDeviceB);
1023 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1024 }
1025
1026 /**
1027 * @tc.name: CommunicationOptimization002
1028 * @tc.desc: Test notify with isSupport true and can offline by device change.
1029 * @tc.type: FUNC
1030 * @tc.require:
1031 * @tc.author: zhangqiquan
1032 */
1033 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization002, TestSize.Level3)
1034 {
1035 auto *pAggregator = new VirtualCommunicatorAggregator();
1036 ASSERT_NE(pAggregator, nullptr);
1037 const std::string deviceA = "DEVICES_A";
1038 const std::string deviceB = "DEVICES_B";
1039 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1040 /**
1041 * @tc.steps: step1. set up env
1042 */
1043 EnvHandle envDeviceA;
1044 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1045 SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
1046
1047 EnvHandle envDeviceB;
1048 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1049 SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
1050
1051 /**
1052 * @tc.steps: step2. device alloc communicator using label and register callback
1053 */
1054 DBInfo dbInfo;
1055 ICommunicator *commAA = nullptr;
1056 OnOfflineDevice onlineForAA;
1057 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1058 /**
1059 * @tc.steps: step3. connect device A with device B
1060 */
1061 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1062 std::this_thread::sleep_for(std::chrono::seconds(1));
1063
1064 /**
1065 * @tc.steps: step5. A notify remote
1066 * @tc.expected: step5. A has callback;
1067 */
1068 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1069 std::this_thread::sleep_for(std::chrono::seconds(1));
1070 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1071
1072 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1073 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1074
1075 // Clean up and disconnect
1076 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1077 std::this_thread::sleep_for(std::chrono::seconds(1));
1078
1079 TearDownEnv(envDeviceA);
1080 TearDownEnv(envDeviceB);
1081 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1082 }
1083
1084 /**
1085 * @tc.name: CommunicationOptimization003
1086 * @tc.desc: Test notify with isSupport false.
1087 * @tc.type: FUNC
1088 * @tc.require:
1089 * @tc.author: zhangqiquan
1090 */
1091 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization003, TestSize.Level3)
1092 {
1093 auto *pAggregator = new VirtualCommunicatorAggregator();
1094 ASSERT_NE(pAggregator, nullptr);
1095 const std::string deviceA = "DEVICES_A";
1096 const std::string deviceB = "DEVICES_B";
1097 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1098 /**
1099 * @tc.steps: step1. set up env
1100 */
1101 EnvHandle envDeviceA;
1102 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1103 SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1104 EnvHandle envDeviceB;
1105 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1106 SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1107 /**
1108 * @tc.steps: step2. device alloc communicator using label and register callback
1109 */
1110 DBInfo dbInfo;
1111 ICommunicator *commAA = nullptr;
1112 OnOfflineDevice onlineForAA;
1113 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1114 ICommunicator *commBB = nullptr;
1115 OnOfflineDevice onlineForBB;
1116 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1117 /**
1118 * @tc.steps: step3. connect device A with device B
1119 */
1120 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1121 /**
1122 * @tc.steps: step4. wait for label exchange
1123 * @tc.expected: step4. both communicator has no callback;
1124 */
1125 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1126 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1127 /**
1128 * @tc.steps: step5. A notify remote
1129 * @tc.expected: step5. B has no callback;
1130 */
1131 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1132 std::this_thread::sleep_for(std::chrono::seconds(1));
1133 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1134 /**
1135 * @tc.steps: step6. A notify local
1136 * @tc.expected: step6. B has no callback;
1137 */
1138 dbInfo.isNeedSync = false;
1139 onlineForAA.onlineDevices.clear();
1140 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1141 std::this_thread::sleep_for(std::chrono::seconds(1));
1142 EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1143 // Clean up and disconnect
1144 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1145 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1146 std::this_thread::sleep_for(std::chrono::seconds(1));
1147 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1148 TearDownEnv(envDeviceA);
1149 TearDownEnv(envDeviceB);
1150 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1151 }
1152
1153 /**
1154 * @tc.name: CommunicationOptimization004
1155 * @tc.desc: Test notify with isSupport false and it be will changed by communication.
1156 * @tc.type: FUNC
1157 * @tc.require:
1158 * @tc.author: zhangqiquan
1159 */
1160 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization004, TestSize.Level3)
1161 {
1162 const std::string deviceA = "DEVICES_A";
1163 const std::string deviceB = "DEVICES_B";
1164 /**
1165 * @tc.steps: step1. set up env
1166 */
1167 EnvHandle envDeviceA;
1168 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1169 SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1170 RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1171 EnvHandle envDeviceB;
1172 std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1173 SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1174 /**
1175 * @tc.steps: step2. device alloc communicator using label and register callback
1176 */
1177 DBInfo dbInfo;
1178 ICommunicator *commAA = nullptr;
1179 OnOfflineDevice onlineForAA;
1180 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1181 ICommunicator *commBB = nullptr;
1182 OnOfflineDevice onlineForBB;
1183 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1184 /**
1185 * @tc.steps: step3. connect device A with device B
1186 */
1187 EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1188 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1189 std::this_thread::sleep_for(std::chrono::seconds(1));
1190 EXPECT_EQ(adapterA->IsSupport(deviceB), true);
1191 // Clean up and disconnect
1192 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1193 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1194 std::this_thread::sleep_for(std::chrono::seconds(1));
1195 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1196 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1197 envDeviceA.commAggrHandle = nullptr;
1198 TearDownEnv(envDeviceA);
1199 TearDownEnv(envDeviceB);
1200 }
1201
1202 /**
1203 * @tc.name: CommunicationOptimization005
1204 * @tc.desc: Test notify with isSupport false and send label exchange.
1205 * @tc.type: FUNC
1206 * @tc.require:
1207 * @tc.author: zhangqiquan
1208 */
1209 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization005, TestSize.Level3)
1210 {
1211 const std::string deviceA = "DEVICES_A";
1212 const std::string deviceB = "DEVICES_B";
1213 /**
1214 * @tc.steps: step1. set up env
1215 */
1216 EnvHandle envDeviceA;
1217 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1218 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1219 handle->SetLocalIsSupport(false);
1220 adapterA->SetDBInfoHandle(handle);
1221 SetUpEnv(envDeviceA, deviceA, adapterA);
1222 RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1223 EnvHandle envDeviceB;
1224 SetUpEnv(envDeviceB, deviceB, nullptr);
1225 /**
1226 * @tc.steps: step2. connect device A with device B
1227 */
1228 EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1229 AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1230 std::this_thread::sleep_for(std::chrono::seconds(1));
1231 /**
1232 * @tc.steps: step3. device alloc communicator using label and register callback
1233 */
1234 DBInfo dbInfo;
1235 ICommunicator *commAA = nullptr;
1236 OnOfflineDevice onlineForAA;
1237 InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1238 ICommunicator *commBB = nullptr;
1239 OnOfflineDevice onlineForBB;
1240 InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1241 std::this_thread::sleep_for(std::chrono::seconds(1));
1242 EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
1243
1244 // Clean up and disconnect
1245 envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1246 envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1247 std::this_thread::sleep_for(std::chrono::seconds(1));
1248 AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1249 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1250 envDeviceA.commAggrHandle = nullptr;
1251 TearDownEnv(envDeviceA);
1252 TearDownEnv(envDeviceB);
1253 }
1254
1255 /**
1256 * @tc.name: DbStatusAdapter001
1257 * @tc.desc: Test notify with isSupport false.
1258 * @tc.type: FUNC
1259 * @tc.require:
1260 * @tc.author: zhangqiquan
1261 */
1262 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter001, TestSize.Level1)
1263 {
1264 auto *pAggregator = new VirtualCommunicatorAggregator();
1265 ASSERT_NE(pAggregator, nullptr);
1266 const std::string deviceA = "DEVICES_A";
1267 const std::string deviceB = "DEVICES_B";
1268 RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1269
1270 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1271 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1272 adapterA->SetDBInfoHandle(handle);
1273 adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1274 std::string actualRemoteDevInfo;
1275 size_t remoteInfoCount = 0u;
1276 size_t localCount = 0u;
1277 DBInfo dbInfo;
1278 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1279
1280 dbInfo = {
1281 USER_ID,
1282 APP_ID,
1283 STORE_ID_1,
1284 true,
1285 false
1286 };
1287 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1288 dbInfo.isNeedSync = false;
1289 adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1290 adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1291
1292 size_t remoteChangeCount = 0u;
1293 adapterA->SetDBStatusChangeCallback(
__anon9f0e15bf0f02(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1294 [&actualRemoteDevInfo, &remoteInfoCount](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
1295 actualRemoteDevInfo = devInfo;
1296 remoteInfoCount = dbInfos.size();
1297 },
__anon9f0e15bf1002() 1298 [&localCount]() {
1299 localCount++;
1300 },
__anon9f0e15bf1102(const std::string &dev) 1301 [&remoteChangeCount, deviceB](const std::string &dev) {
1302 remoteChangeCount++;
1303 EXPECT_EQ(dev, deviceB);
1304 });
1305 std::this_thread::sleep_for(std::chrono::seconds(1));
1306 EXPECT_EQ(actualRemoteDevInfo, deviceB);
1307 EXPECT_EQ(remoteInfoCount, 1u);
1308 adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1309 std::this_thread::sleep_for(std::chrono::seconds(1));
1310 EXPECT_EQ(remoteChangeCount, 1u);
1311 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1312 }
1313
1314 /**
1315 * @tc.name: DbStatusAdapter002
1316 * @tc.desc: Test adapter clear cache.
1317 * @tc.type: FUNC
1318 * @tc.require:
1319 * @tc.author: zhangqiquan
1320 */
1321 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter002, TestSize.Level1) {
1322 const std::string deviceB = "DEVICES_B";
1323 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1324 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1325 adapterA->SetDBInfoHandle(handle);
1326 adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1327 EXPECT_TRUE(adapterA->IsSupport(deviceB));
1328 adapterA->SetDBInfoHandle(handle);
1329 adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1330 EXPECT_FALSE(adapterA->IsSupport(deviceB));
1331 }
1332
1333 /**
1334 * @tc.name: DbStatusAdapter003
1335 * @tc.desc: Test adapter get local dbInfo.
1336 * @tc.type: FUNC
1337 * @tc.require:
1338 * @tc.author: zhangqiquan
1339 */
1340 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter003, TestSize.Level1) {
1341 const std::string deviceB = "DEVICES_B";
1342 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1343 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1344 adapterA->SetDBInfoHandle(handle);
1345 handle->SetLocalIsSupport(true);
1346 std::vector<DBInfo> dbInfos;
1347 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1348 handle->SetLocalIsSupport(false);
1349 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1350 adapterA->SetDBInfoHandle(handle);
1351 EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), -E_NOT_SUPPORT);
1352 }
1353
1354 /**
1355 * @tc.name: DbStatusAdapter004
1356 * @tc.desc: Test adapter clear cache will get callback.
1357 * @tc.type: FUNC
1358 * @tc.require:
1359 * @tc.author: zhangqiquan
1360 */
1361 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter004, TestSize.Level1)
1362 {
1363 const std::string deviceB = "DEVICES_B";
1364 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1365 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1366 adapterA->SetDBInfoHandle(handle);
1367 handle->SetLocalIsSupport(true);
1368 std::vector<DBInfo> dbInfos;
1369 DBInfo dbInfo = {
1370 USER_ID,
1371 APP_ID,
1372 STORE_ID_1,
1373 true,
1374 true
1375 };
1376 dbInfos.push_back(dbInfo);
1377 dbInfo.storeId = STORE_ID_2;
1378 dbInfos.push_back(dbInfo);
1379 dbInfo.storeId = STORE_ID_3;
1380 dbInfo.isNeedSync = false;
1381 dbInfos.push_back(dbInfo);
1382 adapterA->NotifyDBInfos({"dev"}, dbInfos);
1383 std::this_thread::sleep_for(std::chrono::seconds(1));
1384 size_t notifyCount = 0u;
1385 adapterA->SetDBStatusChangeCallback([¬ifyCount](const std::string &devInfo,
__anon9f0e15bf1202(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1386 const std::vector<DBInfo> &dbInfos) {
1387 LOGD("on callback");
1388 for (const auto &dbInfo: dbInfos) {
1389 EXPECT_FALSE(dbInfo.isNeedSync);
1390 }
1391 notifyCount = dbInfos.size();
1392 }, nullptr, nullptr);
1393 adapterA->SetDBInfoHandle(nullptr);
1394 EXPECT_EQ(notifyCount, 2u); // 2 dbInfo is need sync now it should be offline
1395 }
1396
1397 /**
1398 * @tc.name: DbStatusAdapter005
1399 * @tc.desc: Test adapter is need auto sync.
1400 * @tc.type: FUNC
1401 * @tc.require:
1402 * @tc.author: zhangqiquan
1403 */
1404 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter005, TestSize.Level1)
1405 {
1406 const std::string deviceB = "DEVICES_B";
1407 std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1408 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1409 adapterA->SetDBInfoHandle(handle);
1410 handle->SetLocalIsSupport(true);
1411 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1412 handle->SetNeedAutoSync(false);
1413 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1414 handle->SetLocalIsSupport(false);
1415 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1416 adapterA->SetDBInfoHandle(handle);
1417 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1418 adapterA->SetDBInfoHandle(nullptr);
1419 EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1420 }
1421
1422 /**
1423 * @tc.name: AbnormalCommunicatorTest001
1424 * @tc.desc: Test communicator abnormal.
1425 * @tc.type: FUNC
1426 * @tc.require:
1427 * @tc.author: caihaoting
1428 */
1429 HWTEST_F(DistributedDBCommunicatorTest, AbnormalCommunicatorTest001, TestSize.Level1)
1430 {
1431 ICommunicator *commAA = nullptr;
1432 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1433 EXPECT_EQ(commAA, nullptr);
1434 int errCode = E_OK;
1435 commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
1436 SendConfig conf = {true, false, true, 0};
1437 Message *msgForAA = nullptr;
1438 errCode = commAA->SendMessage(DEVICE_NAME_A, msgForAA, conf);
1439 EXPECT_NE(errCode, E_OK);
1440 msgForAA = BuildRegedTinyMessage();
1441 ASSERT_NE(msgForAA, nullptr);
1442 errCode = commAA->SendMessage("", msgForAA, conf);
1443 EXPECT_NE(errCode, E_OK);
1444 errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
1445 EXPECT_EQ(errCode, E_OK);
1446 EXPECT_EQ(commAA->GetTargetUserId({}), std::string(""));
1447 g_envDeviceA.commAggrHandle->ClearOnlineLabel();
1448 std::vector<uint8_t> label;
1449 auto aggregator = std::make_shared<CommunicatorAggregator>();
1450 ASSERT_NE(aggregator, nullptr);
1451 auto communicator = std::make_shared<Communicator>(aggregator.get(), label);
1452 ASSERT_NE(communicator, nullptr);
1453 EXPECT_EQ(communicator->GetTargetUserId({}), DBConstant::DEFAULT_USER);
1454 g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1455 }
1456
1457 /**
1458 * @tc.name: CheckInFragmentNoTest001
1459 * @tc.desc: Test CheckInFragmentNo func.
1460 * @tc.type: FUNC
1461 * @tc.require:
1462 * @tc.author: tiansimiao
1463 */
1464 HWTEST_F(DistributedDBCommunicatorTest, CheckInFragmentNoTest001, TestSize.Level1)
1465 {
1466 CombineStatus status;
1467 /**
1468 * @tc.steps: step1. set fragmentCount by inFragCount
1469 */
1470 uint16_t inFragCount = 5;
1471 status.SetFragmentCount(inFragCount);
1472 /**
1473 * @tc.steps: step2. make inFragNo greater than fragmentCount
1474 */
1475 uint16_t inFragNo = inFragCount + 1;
1476 status.CheckInFragmentNo(inFragNo);
1477 EXPECT_FALSE(status.IsFragNoAlreadyExist(inFragNo));
1478 }
1479
1480 /**
1481 * @tc.name: GetRemoteCommunicatorVersionTest001
1482 * @tc.desc: Test GetRemoteCommunicatorVersion func.
1483 * @tc.type: FUNC
1484 * @tc.require:
1485 * @tc.author: tiansimiao
1486 */
1487 HWTEST_F(DistributedDBCommunicatorTest, GetRemoteCommunicatorVersionTest001, TestSize.Level1)
1488 {
1489 auto aggregator = std::make_unique<CommunicatorAggregator>();
1490 ASSERT_NE(aggregator, nullptr);
1491 uint16_t outVersion = 0;
1492 EXPECT_EQ(aggregator->GetRemoteCommunicatorVersion("notExistTarget", outVersion), -E_NOT_FOUND);
1493 }
1494 }