• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "combine_status.h"
20 #include "communicator.h"
21 #include "communicator_aggregator.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "distributeddb_communicator_common.h"
25 #include "distributeddb_data_generate_unit_test.h"
26 #include "distributeddb_tools_unit_test.h"
27 #include "endian_convert.h"
28 #include "log_print.h"
29 #include "runtime_config.h"
30 #include "runtime_context_impl.h"
31 #include "thread_pool_test_stub.h"
32 #include "virtual_communicator_aggregator.h"
33 
34 using namespace std;
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 
39 namespace {
40     EnvHandle g_envDeviceA;
41     EnvHandle g_envDeviceB;
42     EnvHandle g_envDeviceC;
43 
HandleConnectChange(OnOfflineDevice & onlines,const std::string & target,bool isConnect)44 static void HandleConnectChange(OnOfflineDevice &onlines, const std::string &target, bool isConnect)
45 {
46     if (isConnect) {
47         onlines.onlineDevices.insert(target);
48         onlines.latestOnlineDevice = target;
49         onlines.latestOfflineDevice.clear();
50     } else {
51         onlines.onlineDevices.erase(target);
52         onlines.latestOnlineDevice.clear();
53         onlines.latestOfflineDevice = target;
54     }
55 }
56 
57 class DistributedDBCommunicatorTest : public testing::Test {
58 public:
59     static void SetUpTestCase(void);
60     static void TearDownTestCase(void);
61     void SetUp();
62     void TearDown();
63 };
64 
SetUpTestCase(void)65 void DistributedDBCommunicatorTest::SetUpTestCase(void)
66 {
67     /**
68      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
69      */
70     LOGI("[UT][Test][SetUpTestCase] Enter.");
71     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
72     ASSERT_EQ(errCode, true);
73     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
74     ASSERT_EQ(errCode, true);
75     DoRegTransformFunction();
76     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
77 }
78 
TearDownTestCase(void)79 void DistributedDBCommunicatorTest::TearDownTestCase(void)
80 {
81     /**
82      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
83      */
84     LOGI("[UT][Test][TearDownTestCase] Enter.");
85     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
86     TearDownEnv(g_envDeviceA);
87     TearDownEnv(g_envDeviceB);
88     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
89 }
90 
SetUp()91 void DistributedDBCommunicatorTest::SetUp()
92 {
93     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
94 }
95 
TearDown()96 void DistributedDBCommunicatorTest::TearDown()
97 {
98     /**
99      * @tc.teardown: Wait 100 ms to make sure all thread quiet
100      */
101     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
102 }
103 
104 /**
105  * @tc.name: Communicator Management 001
106  * @tc.desc: Test alloc and release communicator
107  * @tc.type: FUNC
108  * @tc.require:
109  * @tc.author: xiaozhenjian
110  */
111 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement001, TestSize.Level1)
112 {
113     /**
114      * @tc.steps: step1. alloc communicator A using label A
115      * @tc.expected: step1. alloc return OK.
116      */
117     int errorNo = E_OK;
118     ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
119     EXPECT_EQ(errorNo, E_OK);
120     EXPECT_NE(commA, nullptr);
121 
122     /**
123      * @tc.steps: step2. alloc communicator B using label B
124      * @tc.expected: step2. alloc return OK.
125      */
126     errorNo = E_OK;
127     ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
128     EXPECT_EQ(errorNo, E_OK);
129     EXPECT_NE(commA, nullptr);
130 
131     /**
132      * @tc.steps: step3. alloc communicator C using label A
133      * @tc.expected: step3. alloc return not OK.
134      */
135     errorNo = E_OK;
136     ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
137     EXPECT_NE(errorNo, E_OK);
138     EXPECT_EQ(commC, nullptr);
139 
140     /**
141      * @tc.steps: step4. release communicator A and communicator B
142      */
143     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA);
144     commA = nullptr;
145     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB);
146     commB = nullptr;
147 
148     /**
149      * @tc.steps: step5. alloc communicator D using label A
150      * @tc.expected: step5. alloc return OK.
151      */
152     errorNo = E_OK;
153     ICommunicator *commD = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
154     EXPECT_EQ(errorNo, E_OK);
155     EXPECT_NE(commD, nullptr);
156 
157     /**
158      * @tc.steps: step6. release communicator D
159      */
160     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commD);
161     commD = nullptr;
162 }
163 
164 /**
165  * @tc.name: Communicator Management 002
166  * @tc.desc: Test alloc and release communicator with different userId
167  * @tc.type: FUNC
168  * @tc.require:
169  * @tc.author: liaoyonghuang
170  */
171 HWTEST_F(DistributedDBCommunicatorTest, CommunicatorManagement002, TestSize.Level1)
172 {
173     /**
174      * @tc.steps: step1. alloc communicator A using label A with USER_ID_1
175      * @tc.expected: step1. alloc return OK.
176      */
177     int errorNo = E_OK;
178     ICommunicator *commA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_1);
179     commA->Activate(USER_ID_1);
180     EXPECT_EQ(errorNo, E_OK);
181     EXPECT_NE(commA, nullptr);
182 
183     /**
184      * @tc.steps: step2. alloc communicator B using label A with USER_ID_2
185      * @tc.expected: step2. alloc return OK, and commA == commB is TRUE
186      */
187     errorNo = E_OK;
188     ICommunicator *commB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_2);
189     commB->Activate(USER_ID_2);
190     EXPECT_EQ(errorNo, E_OK);
191     EXPECT_NE(commA, nullptr);
192 
193     /**
194      * @tc.steps: step3. alloc communicator C using label A with USER_ID_1
195      * @tc.expected: step3. alloc return not OK.
196      */
197     errorNo = E_OK;
198     ICommunicator *commC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo, USER_ID_1);
199     EXPECT_NE(errorNo, E_OK);
200     EXPECT_EQ(commC, nullptr);
201 
202     /**
203      * @tc.steps: step4. release communicator A and communicator B
204      * @tc.expected: step4. OK.
205      */
206     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commA, USER_ID_1);
207     commA = nullptr;
208     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commB, USER_ID_2);
209     commB = nullptr;
210 }
211 
ConnectWaitDisconnect()212 static void ConnectWaitDisconnect()
213 {
214     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
215     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
216     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
217 }
218 
219 /**
220  * @tc.name: Online And Offline 001
221  * @tc.desc: Test functionality triggered by physical devices online and offline
222  * @tc.type: FUNC
223  * @tc.require:
224  * @tc.author: wudongxing
225  */
226 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline001, TestSize.Level1)
227 {
228     /**
229      * @tc.steps: step1. device A alloc communicator AA using label A and register callback
230      * @tc.expected: step1. no callback.
231      */
232     int errorNo = E_OK;
233     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
234     ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
235     OnOfflineDevice onlineForAA;
__anon9f0e15bf0202(const std::string &target, bool isConnect) 236     commAA->RegOnConnectCallback([&onlineForAA](const std::string &target, bool isConnect) {
237         HandleConnectChange(onlineForAA, target, isConnect);}, nullptr);
238     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
239 
240     /**
241      * @tc.steps: step2. connect device A with device B and then disconnect
242      * @tc.expected: step2. no callback.
243      */
244     ConnectWaitDisconnect();
245     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
246 
247     /**
248      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
249      * @tc.expected: step3. no callback.
250      */
251     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
252     ASSERT_NOT_NULL_AND_ACTIVATE(commBB, "");
253     OnOfflineDevice onlineForBB;
__anon9f0e15bf0302(const std::string &target, bool isConnect) 254     commBB->RegOnConnectCallback([&onlineForBB](const std::string &target, bool isConnect) {
255         HandleConnectChange(onlineForBB, target, isConnect);}, nullptr);
256     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
257     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
258 
259     /**
260      * @tc.steps: step4. connect device A with device B and then disconnect
261      * @tc.expected: step4. no callback.
262      */
263     ConnectWaitDisconnect();
264     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
265     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
266 
267     /**
268      * @tc.steps: step5. device B alloc communicator BA using label A and register callback
269      * @tc.expected: step5. no callback.
270      */
271     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
272     ASSERT_NOT_NULL_AND_ACTIVATE(commBA, "");
273     OnOfflineDevice onlineForBA;
__anon9f0e15bf0402(const std::string &target, bool isConnect) 274     commBA->RegOnConnectCallback([&onlineForBA](const std::string &target, bool isConnect) {
275         HandleConnectChange(onlineForBA, target, isConnect);}, nullptr);
276     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
277     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
278     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
279 
280     /**
281      * @tc.steps: step6. connect device A with device B
282      * @tc.expected: step6. communicator AA has callback of device B online;
283      *                      communicator BA has callback of device A online;
284      *                      communicator BB no callback
285      */
286     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
287     std::this_thread::sleep_for(std::chrono::milliseconds(100));
288     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
289     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
290     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
291     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
292     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
293 
294     /**
295      * @tc.steps: step7. disconnect device A with device B
296      * @tc.expected: step7. communicator AA has callback of device B offline;
297      *                      communicator BA has callback of device A offline;
298      *                      communicator BB no callback
299      */
300     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
301     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
302     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
303     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
304     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
305     EXPECT_EQ(onlineForAA.latestOfflineDevice, DEVICE_NAME_B);
306     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
307 
308     // Clean up
309     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
310     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
311     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
312 }
313 
314 #define REG_CONNECT_CALLBACK(communicator, online) \
315 { \
316     communicator->RegOnConnectCallback([&online](const std::string &target, bool isConnect) { \
317         HandleConnectChange(online, target, isConnect); \
318     }, nullptr); \
319 }
320 
321 #define CONNECT_AND_WAIT(waitTime) \
322 { \
323     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle); \
324     std::this_thread::sleep_for(std::chrono::milliseconds(waitTime)); \
325 }
326 
327 /**
328  * @tc.name: Online And Offline 002
329  * @tc.desc: Test functionality triggered by alloc and release communicator
330  * @tc.type: FUNC
331  * @tc.require:
332  * @tc.author: wudongxing
333  */
334 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline002, TestSize.Level1)
335 {
336     /**
337      * @tc.steps: step1. connect device A with device B
338      */
339     CONNECT_AND_WAIT(200); // Sleep 200 ms
340 
341     /**
342      * @tc.steps: step2. device A alloc communicator AA using label A and register callback
343      * @tc.expected: step2. no callback.
344      */
345     int errorNo = E_OK;
346     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
347     ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
348     OnOfflineDevice onlineForAA;
349     REG_CONNECT_CALLBACK(commAA, onlineForAA);
350     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
351 
352     /**
353      * @tc.steps: step3. device B alloc communicator BB using label B and register callback
354      * @tc.expected: step3. no callback.
355      */
356     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
357     ASSERT_NOT_NULL_AND_ACTIVATE(commBB, "");
358     OnOfflineDevice onlineForBB;
359     REG_CONNECT_CALLBACK(commBB, onlineForBB);
360     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
361     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
362 
363     /**
364      * @tc.steps: step4. device B alloc communicator BA using label A and register callback
365      * @tc.expected: step4. communicator AA has callback of device B online;
366      *                      communicator BA has callback of device A online;
367      *                      communicator BB no callback.
368      */
369     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
370     ASSERT_NOT_NULL_AND_ACTIVATE(commBA, "");
371     OnOfflineDevice onlineForBA;
372     REG_CONNECT_CALLBACK(commBA, onlineForBA);
373     std::this_thread::sleep_for(std::chrono::milliseconds(100));
374     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
375     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
376     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(1));
377     EXPECT_EQ(onlineForAA.latestOnlineDevice, DEVICE_NAME_B);
378     EXPECT_EQ(onlineForBA.latestOnlineDevice, DEVICE_NAME_A);
379 
380     /**
381      * @tc.steps: step5. device A alloc communicator AB using label B and register callback
382      * @tc.expected: step5. communicator AB has callback of device B online;
383      *                      communicator BB has callback of device A online;
384      */
385     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
386     ASSERT_NOT_NULL_AND_ACTIVATE(commAB, "");
387     OnOfflineDevice onlineForAB;
388     REG_CONNECT_CALLBACK(commAB, onlineForAB);
389     std::this_thread::sleep_for(std::chrono::milliseconds(100));
390     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
391     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
392     EXPECT_EQ(onlineForAB.latestOnlineDevice, DEVICE_NAME_B);
393     EXPECT_EQ(onlineForBB.latestOnlineDevice, DEVICE_NAME_A);
394 
395     /**
396      * @tc.steps: step6. device A release communicator AA
397      * @tc.expected: step6. communicator BA has callback of device A offline;
398      *                      communicator AB and BB no callback;
399      */
400     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
401     std::this_thread::sleep_for(std::chrono::milliseconds(100));
402     EXPECT_EQ(onlineForBA.onlineDevices.size(), static_cast<size_t>(0));
403     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
404     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
405     EXPECT_EQ(onlineForBA.latestOfflineDevice, DEVICE_NAME_A);
406 
407     /**
408      * @tc.steps: step7. device B release communicator BA
409      * @tc.expected: step7. communicator AB and BB no callback;
410      */
411     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
412     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(1));
413     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
414 
415     /**
416      * @tc.steps: step8. device B release communicator BB
417      * @tc.expected: step8. communicator AB has callback of device B offline;
418      */
419     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
420     std::this_thread::sleep_for(std::chrono::milliseconds(100));
421     EXPECT_EQ(onlineForAB.onlineDevices.size(), static_cast<size_t>(0));
422     EXPECT_EQ(onlineForAB.latestOfflineDevice, DEVICE_NAME_B);
423 
424     // Clean up
425     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
426     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
427 }
428 
TestRemoteRestart()429 void TestRemoteRestart()
430 {
431     /**
432      * @tc.steps: step1. connect device D with device E
433      */
434     EnvHandle envDeviceD;
435     EnvHandle envDeviceE;
436     SetUpEnv(envDeviceD, "DEVICE_D");
437     SetUpEnv(envDeviceE, "DEVICE_E");
438 
439     /**
440      * @tc.steps: step2. device D alloc communicator DD using label A and register callback
441      */
442     int errorNo = E_OK;
443     ICommunicator *commDD = envDeviceD.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
444     ASSERT_NOT_NULL_AND_ACTIVATE(commDD, "");
445     OnOfflineDevice onlineForDD;
446     REG_CONNECT_CALLBACK(commDD, onlineForDD);
447 
448     /**
449      * @tc.steps: step3. device E alloc communicator EE using label A and register callback
450      */
451     ICommunicator *commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
452     ASSERT_NOT_NULL_AND_ACTIVATE(commEE, "");
453     OnOfflineDevice onlineForEE;
454     REG_CONNECT_CALLBACK(commEE, onlineForEE);
455     /**
456      * @tc.steps: step4. deviceD connect to deviceE
457      * @tc.expected: step4. both communicator has callback;
458      */
459     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
460     std::this_thread::sleep_for(std::chrono::seconds(1));
461     EXPECT_EQ(onlineForDD.onlineDevices.size(), static_cast<size_t>(1));
462     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
463 
464     /**
465      * @tc.steps: step5. device E restart
466      */
467     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
468     std::this_thread::sleep_for(std::chrono::seconds(1));
469     TearDownEnv(envDeviceE);
470     SetUpEnv(envDeviceE, "DEVICE_E");
471 
472     commEE = envDeviceE.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
473     ASSERT_NOT_NULL_AND_ACTIVATE(commEE, "");
474     REG_CONNECT_CALLBACK(commEE, onlineForEE);
475     onlineForEE.onlineDevices.clear();
476     /**
477      * @tc.steps: step6. deviceD connect to deviceE again
478      * @tc.expected: step6. communicatorE has callback;
479      */
480     AdapterStub::ConnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
481     int reTryTimes = 5;
482     while (onlineForEE.onlineDevices.size() != 1 && reTryTimes > 0) {
483         std::this_thread::sleep_for(std::chrono::seconds(1));
484         reTryTimes--;
485     }
486     EXPECT_EQ(onlineForEE.onlineDevices.size(), static_cast<size_t>(1));
487     // Clean up and disconnect
488     envDeviceD.commAggrHandle->ReleaseCommunicator(commDD);
489     envDeviceE.commAggrHandle->ReleaseCommunicator(commEE);
490     std::this_thread::sleep_for(std::chrono::seconds(1));
491 
492     AdapterStub::DisconnectAdapterStub(envDeviceD.adapterHandle, envDeviceE.adapterHandle);
493     TearDownEnv(envDeviceD);
494     TearDownEnv(envDeviceE);
495 }
496 /**
497  * @tc.name: Online And Offline 003
498  * @tc.desc: Test functionality triggered by remote restart
499  * @tc.type: FUNC
500  * @tc.require:
501  * @tc.author: zhangqiquan
502  */
503 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline003, TestSize.Level1)
504 {
505     TestRemoteRestart();
506 }
507 
508 /**
509  * @tc.name: Online And Offline 004
510  * @tc.desc: Test functionality triggered by remote restart with thread pool
511  * @tc.type: FUNC
512  * @tc.require:
513  * @tc.author: zhangqiquan
514  */
515 HWTEST_F(DistributedDBCommunicatorTest, OnlineAndOffline004, TestSize.Level1)
516 {
517     auto threadPool = std::make_shared<ThreadPoolTestStub>();
518     RuntimeContext::GetInstance()->SetThreadPool(threadPool);
519     TestRemoteRestart();
520     RuntimeContext::GetInstance()->SetThreadPool(nullptr);
521 }
522 
523 /**
524  * @tc.name: Report Device Connect Change 001
525  * @tc.desc: Test CommunicatorAggregator support report device connect change event
526  * @tc.type: FUNC
527  * @tc.require:
528  * @tc.author: xiaozhenjian
529  */
530 HWTEST_F(DistributedDBCommunicatorTest, ReportDeviceConnectChange001, TestSize.Level1)
531 {
532     /**
533      * @tc.steps: step1. device A and device B register connect callback to CommunicatorAggregator
534      */
535     OnOfflineDevice onlineForA;
536     int errCode = g_envDeviceA.commAggrHandle->RegOnConnectCallback(
__anon9f0e15bf0502(const std::string &target, bool isConnect) 537         [&onlineForA](const std::string &target, bool isConnect) {
538             HandleConnectChange(onlineForA, target, isConnect);
539         }, nullptr);
540     EXPECT_EQ(errCode, E_OK);
541     OnOfflineDevice onlineForB;
542     errCode = g_envDeviceB.commAggrHandle->RegOnConnectCallback(
__anon9f0e15bf0602(const std::string &target, bool isConnect) 543         [&onlineForB](const std::string &target, bool isConnect) {
544             HandleConnectChange(onlineForB, target, isConnect);
545         }, nullptr);
546     EXPECT_EQ(errCode, E_OK);
547 
548     /**
549      * @tc.steps: step2. connect device A with device B
550      * @tc.expected: step2. device A callback B online; device B callback A online;
551      */
552     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
553     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
554     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(1));
555     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(1));
556     EXPECT_EQ(onlineForA.latestOnlineDevice, DEVICE_NAME_B);
557     EXPECT_EQ(onlineForB.latestOnlineDevice, DEVICE_NAME_A);
558 
559     /**
560      * @tc.steps: step3. connect device A with device B
561      * @tc.expected: step3. device A callback B offline; device B callback A offline;
562      */
563     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
564     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
565     EXPECT_EQ(onlineForA.onlineDevices.size(), static_cast<size_t>(0));
566     EXPECT_EQ(onlineForB.onlineDevices.size(), static_cast<size_t>(0));
567     EXPECT_EQ(onlineForA.latestOfflineDevice, DEVICE_NAME_B);
568     EXPECT_EQ(onlineForB.latestOfflineDevice, DEVICE_NAME_A);
569 
570     // Clean up
571     g_envDeviceA.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
572     g_envDeviceB.commAggrHandle->RegOnConnectCallback(nullptr, nullptr);
573 }
574 
575 namespace {
ToLabelType(uint64_t commLabel)576 LabelType ToLabelType(uint64_t commLabel)
577 {
578     uint64_t netOrderLabel = HostToNet(commLabel);
579     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
580     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
581     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
582         realLabel[i] = eachByte[i];
583     }
584     return realLabel;
585 }
586 }
587 
588 /**
589  * @tc.name: Report Communicator Not Found 001
590  * @tc.desc: Test CommunicatorAggregator support report communicator not found event
591  * @tc.type: FUNC
592  * @tc.require:
593  * @tc.author: xiaozhenjian
594  */
595 HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.Level1)
596 {
597     /**
598      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
599      */
600     std::vector<LabelType> lackLabels;
601     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon9f0e15bf0802(const LabelType &commLabel, const std::string &userId)602         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
603             lackLabels.push_back(commLabel);
604             return -E_NOT_FOUND;
605         }, nullptr);
606     EXPECT_EQ(errCode, E_OK);
607 
608     /**
609      * @tc.steps: step2. connect device A with device B
610      */
611     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
612     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
613 
614     /**
615      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
616      * @tc.expected: step3. device B callback that label A not found.
617      */
618     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
619     ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
620     Message *msgForAA = BuildRegedTinyMessage();
621     ASSERT_NE(msgForAA, nullptr);
622     SendConfig conf = {true, false, true, 0};
623     errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
624     EXPECT_EQ(errCode, E_OK);
625     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
626     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
627     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
628 
629     /**
630      * @tc.steps: step4. device B alloc communicator BA using label A and register message callback
631      * @tc.expected: step4. communicator BA will not receive message.
632      */
633     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
634     ASSERT_NE(commBA, nullptr);
635     Message *recvMsgForBA = nullptr;
__anon9f0e15bf0902(const std::string &srcTarget, Message *inMsg) 636     commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
637         recvMsgForBA = inMsg;
638         return E_OK;
639     }, nullptr);
640     commBA->Activate(USER_ID_1);
641     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
642     EXPECT_EQ(recvMsgForBA, nullptr);
643 
644     // Clean up
645     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
646     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
647     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
648     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
649 }
650 
651 #define DO_SEND_MESSAGE(src, dst, label, session) \
652 { \
653     Message *msgFor##src##label = BuildRegedTinyMessage(); \
654     ASSERT_NE(msgFor##src##label, nullptr); \
655     msgFor##src##label->SetSessionId(session); \
656     SendConfig conf = {true, false, true, 0}; \
657     errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
658     EXPECT_EQ(errCode, E_OK); \
659 }
660 
661 #define DO_SEND_GIANT_MESSAGE(src, dst, label, size) \
662 { \
663     Message *msgFor##src##label = BuildRegedGiantMessage(size); \
664     ASSERT_NE(msgFor##src##label, nullptr); \
665     SendConfig conf = {false, false, true, 0}; \
666     errCode = comm##src##label->SendMessage(DEVICE_NAME_##dst, msgFor##src##label, conf); \
667     EXPECT_EQ(errCode, E_OK); \
668 }
669 
670 #define ALLOC_AND_SEND_MESSAGE(src, dst, label, session) \
671     ICommunicator *comm##src##label = g_envDevice##src.commAggrHandle->AllocCommunicator(LABEL_##label, errCode); \
672     ASSERT_NOT_NULL_AND_ACTIVATE(comm##src##label, ""); \
673     DO_SEND_MESSAGE(src, dst, label, session)
674 
675 #define REG_MESSAGE_CALLBACK(src, label) \
676     string srcTargetFor##src##label; \
677     Message *recvMsgFor##src##label = nullptr; \
678     comm##src##label->RegOnMessageCallback( \
679         [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
680         srcTargetFor##src##label = srcTarget; \
681         recvMsgFor##src##label = inMsg; \
682         return E_OK; \
683     }, nullptr);
684 
685 /**
686  * @tc.name: ReDeliver Message 001
687  * @tc.desc: Test CommunicatorAggregator support redeliver message
688  * @tc.type: FUNC
689  * @tc.require:
690  * @tc.author: xiaozhenjian
691  */
692 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage001, TestSize.Level1)
693 {
694     /**
695      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
696      */
697     std::vector<LabelType> lackLabels;
698     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(
__anon9f0e15bf0a02(const LabelType &commLabel, const std::string &userId)699         [&lackLabels](const LabelType &commLabel, const std::string &userId)->int {
700             lackLabels.push_back(commLabel);
701             return E_OK;
702         }, nullptr);
703     EXPECT_EQ(errCode, E_OK);
704 
705     /**
706      * @tc.steps: step2. connect device A with device B
707      */
708     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
709     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
710 
711     /**
712      * @tc.steps: step3. device A alloc communicator AA using label A and send message to B
713      * @tc.expected: step3. device B callback that label A not found.
714      */
715     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
716     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
717     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(1));
718     EXPECT_EQ(lackLabels[0], ToLabelType(LABEL_A));
719 
720     /**
721      * @tc.steps: step4. device A alloc communicator AB using label B and send message to B
722      * @tc.expected: step4. device B callback that label B not found.
723      */
724     ALLOC_AND_SEND_MESSAGE(A, B, B, 200); // session id 200
725     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
726     ASSERT_EQ(lackLabels.size(), static_cast<size_t>(2));
727     EXPECT_EQ(lackLabels[1], ToLabelType(LABEL_B)); // 1 for second element
728 
729     /**
730      * @tc.steps: step5. device B alloc communicator BA using label A and register message callback
731      * @tc.expected: step5. communicator BA will receive message.
732      */
733     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
734     ASSERT_NE(commBA, nullptr);
735     REG_MESSAGE_CALLBACK(B, A);
736     commBA->Activate();
737     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
738     EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
739     ASSERT_NE(recvMsgForBA, nullptr);
740     EXPECT_EQ(recvMsgForBA->GetSessionId(), 100U); // session id 100
741     delete recvMsgForBA;
742     recvMsgForBA = nullptr;
743 
744     /**
745      * @tc.steps: step6. device B alloc communicator BB using label B and register message callback
746      * @tc.expected: step6. communicator BB will receive message.
747      */
748     ICommunicator *commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
749     ASSERT_NE(commBB, nullptr);
750     REG_MESSAGE_CALLBACK(B, B);
751     commBB->Activate();
752     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
753     EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
754     ASSERT_NE(recvMsgForBB, nullptr);
755     EXPECT_EQ(recvMsgForBB->GetSessionId(), 200U); // session id 200
756     delete recvMsgForBB;
757     recvMsgForBB = nullptr;
758 
759     // Clean up
760     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
761     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
762     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
763     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
764     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
765     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
766 }
767 
768 /**
769  * @tc.name: ReDeliver Message 002
770  * @tc.desc: Test CommunicatorAggregator support redeliver message by order
771  * @tc.type: FUNC
772  * @tc.require:
773  * @tc.author: xiaozhenjian
774  */
775 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
776 {
777     /**
778      * @tc.steps: step1. device C create CommunicatorAggregator and initialize
779      */
780     bool step1 = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
781     ASSERT_EQ(step1, true);
782 
783     /**
784      * @tc.steps: step2. device B register communicator not found callback to CommunicatorAggregator
785      */
786     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon9f0e15bf0b02(const LabelType &commLabel, const std::string &userId)787         const std::string &userId)->int {
788         return E_OK;
789     }, nullptr);
790     EXPECT_EQ(errCode, E_OK);
791 
792     /**
793      * @tc.steps: step3. connect device A with device B, then device B with device C
794      */
795     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
796     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
797     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
798 
799     /**
800      * @tc.steps: step4. device A alloc communicator AA using label A and send message to B
801      */
802     ALLOC_AND_SEND_MESSAGE(A, B, A, 100); // session id 100
803     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
804 
805     /**
806      * @tc.steps: step5. device C alloc communicator CA using label A and send message to B
807      */
808     ALLOC_AND_SEND_MESSAGE(C, B, A, 200); // session id 200
809     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
810     DO_SEND_MESSAGE(A, B, A, 300); // session id 300
811     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
812     DO_SEND_MESSAGE(C, B, A, 400); // session id 400
813     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
814 
815     /**
816      * @tc.steps: step6. device B alloc communicator BA using label A and register message callback
817      * @tc.expected: step6. communicator BA will receive message in order of sessionid 100, 200, 300, 400.
818      */
819     ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
820     ASSERT_NE(commBA, nullptr);
821     std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
__anon9f0e15bf0c02(const std::string &srcTarget, Message *inMsg) 822     commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
823         msgCallbackForBA.push_back({srcTarget, inMsg});
824         return E_OK;
825     }, nullptr);
826     commBA->Activate();
827     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
828     ASSERT_EQ(msgCallbackForBA.size(), static_cast<size_t>(4)); // total 4 callback
829     EXPECT_EQ(msgCallbackForBA[0].first, DEVICE_NAME_A); // the 0 order element
830     EXPECT_EQ(msgCallbackForBA[1].first, DEVICE_NAME_C); // the 1 order element
831     EXPECT_EQ(msgCallbackForBA[2].first, DEVICE_NAME_A); // the 2 order element
832     EXPECT_EQ(msgCallbackForBA[3].first, DEVICE_NAME_C); // the 3 order element
833     for (uint32_t i = 0; i < msgCallbackForBA.size(); i++) {
834         EXPECT_EQ(msgCallbackForBA[i].second->GetSessionId(), static_cast<uint32_t>((i + 1) * 100)); // 1 sessionid 100
835         delete msgCallbackForBA[i].second;
836         msgCallbackForBA[i].second = nullptr;
837     }
838 
839     // Clean up
840     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
841     g_envDeviceC.commAggrHandle->ReleaseCommunicator(commCA);
842     g_envDeviceB.commAggrHandle->ReleaseCommunicator(commBA);
843     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
844     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
845     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
846     TearDownEnv(g_envDeviceC);
847 }
848 
849 /**
850  * @tc.name: ReDeliver Message 003
851  * @tc.desc: For observe memory in unusual scenario
852  * @tc.type: FUNC
853  * @tc.require:
854  * @tc.author: xiaozhenjian
855  */
856 HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage003, TestSize.Level2)
857 {
858     /**
859      * @tc.steps: step1. device B register communicator not found callback to CommunicatorAggregator
860      */
861     int errCode = g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback([](const LabelType &commLabel,
__anon9f0e15bf0d02(const LabelType &commLabel, const std::string &userId)862         const std::string &userId)->int {
863         return E_OK;
864     }, nullptr);
865     EXPECT_EQ(errCode, E_OK);
866 
867     /**
868      * @tc.steps: step2. connect device A with device B
869      */
870     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
871     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
872 
873     /**
874      * @tc.steps: step3. device A alloc communicator AA,AB,AC using label A,B,C
875      */
876     ICommunicator *commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
877     ASSERT_NOT_NULL_AND_ACTIVATE(commAA, "");
878     ICommunicator *commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errCode);
879     ASSERT_NOT_NULL_AND_ACTIVATE(commAB, "");
880     ICommunicator *commAC = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_C, errCode);
881     ASSERT_NOT_NULL_AND_ACTIVATE(commAC, "");
882     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
883 
884     /**
885      * @tc.steps: step4. device A Continuously send tiny message to B using communicator AA,AB,AC
886      */
887     for (int turn = 0; turn < 11; turn++) { // Total 11 turns
888         DO_SEND_MESSAGE(A, B, A, 0);
889         DO_SEND_MESSAGE(A, B, B, 0);
890         DO_SEND_MESSAGE(A, B, C, 0);
891     }
892 
893     /**
894      * @tc.steps: step5. device A Continuously send giant message to B using communicator AA,AB,AC
895      */
896     for (int turn = 0; turn < 5; turn++) { // Total 5 turns
897         DO_SEND_GIANT_MESSAGE(A, B, A, (3 * 1024 * 1024)); // 3 MBytes, 1024 is scale
898         DO_SEND_GIANT_MESSAGE(A, B, B, (6 * 1024 * 1024)); // 6 MBytes, 1024 is scale
899         DO_SEND_GIANT_MESSAGE(A, B, C, (7 * 1024 * 1024)); // 7 MBytes, 1024 is scale
900     }
901     DO_SEND_GIANT_MESSAGE(A, B, A, (30 * 1024 * 1024)); // 30 MBytes, 1024 is scale
902 
903     /**
904      * @tc.steps: step6. wait a long time then send last frame
905      */
906     for (int sec = 0; sec < 15; sec++) { // Total 15 s
907         std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
908         LOGI("[UT][Test][ReDeliverMessage003] Sleep and wait=%d.", sec);
909     }
910     DO_SEND_MESSAGE(A, B, A, 0);
911     std::this_thread::sleep_for(std::chrono::seconds(1)); // Sleep 1 s
912 
913     // Clean up
914     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
915     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAB);
916     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAC);
917     g_envDeviceB.commAggrHandle->RegCommunicatorLackCallback(nullptr, nullptr);
918     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
919 }
920 
921 namespace {
SetUpEnv(const std::string & localDev,const std::string & supportDev,const std::shared_ptr<DBStatusAdapter> & adapter,bool isSupport,EnvHandle & envDevice)922     void SetUpEnv(const std::string &localDev, const std::string &supportDev,
923         const std::shared_ptr<DBStatusAdapter> &adapter, bool isSupport, EnvHandle &envDevice)
924     {
925         std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
926         adapter->SetDBInfoHandle(handle);
927         adapter->SetRemoteOptimizeCommunication(supportDev, isSupport);
928         SetUpEnv(envDevice, localDev, adapter);
929     }
930 
InitCommunicator(DBInfo & dbInfo,EnvHandle & envDevice,OnOfflineDevice & onlineCallback,ICommunicator * & comm)931     void InitCommunicator(DBInfo &dbInfo, EnvHandle &envDevice, OnOfflineDevice &onlineCallback, ICommunicator *&comm)
932     {
933         dbInfo.userId = USER_ID;
934         dbInfo.appId = APP_ID;
935         dbInfo.storeId = STORE_ID_1;
936         dbInfo.isNeedSync = true;
937         dbInfo.syncDualTupleMode = false;
938         std::string label = DBCommon::GenerateHashLabel(dbInfo);
939         std::vector<uint8_t> commLabel(label.begin(), label.end());
940         int errorNo = E_OK;
941         comm = envDevice.commAggrHandle->AllocCommunicator(commLabel, errorNo);
942         ASSERT_NOT_NULL_AND_ACTIVATE(comm, "");
943         REG_CONNECT_CALLBACK(comm, onlineCallback);
944     }
945 }
946 
947 /**
948   * @tc.name: CommunicationOptimization001
949   * @tc.desc: Test notify with isSupport true.
950   * @tc.type: FUNC
951   * @tc.require:
952   * @tc.author: zhangqiquan
953   */
954 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization001, TestSize.Level3)
955 {
956     auto *pAggregator = new VirtualCommunicatorAggregator();
957     ASSERT_NE(pAggregator, nullptr);
958     const std::string deviceA = "DEVICES_A";
959     const std::string deviceB = "DEVICES_B";
960     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
961     /**
962      * @tc.steps: step1. set up env
963      */
964     EnvHandle envDeviceA;
965     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
966     SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
967 
968     EnvHandle envDeviceB;
969     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
970     SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
971 
972     /**
973      * @tc.steps: step2. device alloc communicator using label and register callback
974      */
975     DBInfo dbInfo;
976     ICommunicator *commAA = nullptr;
977     OnOfflineDevice onlineForAA;
978     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
979 
980     ICommunicator *commBB = nullptr;
981     OnOfflineDevice onlineForBB;
982     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
983 
984     /**
985      * @tc.steps: step3. connect device A with device B
986      */
987     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
988 
989     /**
990      * @tc.steps: step4. wait for label exchange
991      * @tc.expected: step4. both communicator has no callback;
992      */
993     std::this_thread::sleep_for(std::chrono::seconds(1));
994     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
995     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
996 
997     /**
998      * @tc.steps: step5. both notify
999      * @tc.expected: step5. both has callback;
1000      */
1001     RuntimeConfig::NotifyDBInfos({ deviceB }, { dbInfo });
1002     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1003     adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
1004     std::this_thread::sleep_for(std::chrono::seconds(1));
1005 
1006     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1007 
1008     dbInfo.isNeedSync = false;
1009     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1010     adapterB->NotifyDBInfos({ deviceA }, { dbInfo });
1011     std::this_thread::sleep_for(std::chrono::seconds(1));
1012     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1013 
1014     // Clean up and disconnect
1015     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1016     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1017     std::this_thread::sleep_for(std::chrono::seconds(1));
1018 
1019     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1020 
1021     TearDownEnv(envDeviceA);
1022     TearDownEnv(envDeviceB);
1023     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1024 }
1025 
1026 /**
1027   * @tc.name: CommunicationOptimization002
1028   * @tc.desc: Test notify with isSupport true and can offline by device change.
1029   * @tc.type: FUNC
1030   * @tc.require:
1031   * @tc.author: zhangqiquan
1032   */
1033 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization002, TestSize.Level3)
1034 {
1035     auto *pAggregator = new VirtualCommunicatorAggregator();
1036     ASSERT_NE(pAggregator, nullptr);
1037     const std::string deviceA = "DEVICES_A";
1038     const std::string deviceB = "DEVICES_B";
1039     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1040     /**
1041      * @tc.steps: step1. set up env
1042      */
1043     EnvHandle envDeviceA;
1044     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1045     SetUpEnv(deviceA, deviceB, adapterA, true, envDeviceA);
1046 
1047     EnvHandle envDeviceB;
1048     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1049     SetUpEnv(deviceB, deviceA, adapterB, true, envDeviceB);
1050 
1051     /**
1052      * @tc.steps: step2. device alloc communicator using label and register callback
1053      */
1054     DBInfo dbInfo;
1055     ICommunicator *commAA = nullptr;
1056     OnOfflineDevice onlineForAA;
1057     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1058     /**
1059      * @tc.steps: step3. connect device A with device B
1060      */
1061     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1062     std::this_thread::sleep_for(std::chrono::seconds(1));
1063 
1064     /**
1065      * @tc.steps: step5. A notify remote
1066      * @tc.expected: step5. A has callback;
1067      */
1068     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1069     std::this_thread::sleep_for(std::chrono::seconds(1));
1070     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(1));
1071 
1072     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1073     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1074 
1075     // Clean up and disconnect
1076     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1077     std::this_thread::sleep_for(std::chrono::seconds(1));
1078 
1079     TearDownEnv(envDeviceA);
1080     TearDownEnv(envDeviceB);
1081     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1082 }
1083 
1084 /**
1085   * @tc.name: CommunicationOptimization003
1086   * @tc.desc: Test notify with isSupport false.
1087   * @tc.type: FUNC
1088   * @tc.require:
1089   * @tc.author: zhangqiquan
1090   */
1091 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization003, TestSize.Level3)
1092 {
1093     auto *pAggregator = new VirtualCommunicatorAggregator();
1094     ASSERT_NE(pAggregator, nullptr);
1095     const std::string deviceA = "DEVICES_A";
1096     const std::string deviceB = "DEVICES_B";
1097     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1098     /**
1099      * @tc.steps: step1. set up env
1100      */
1101     EnvHandle envDeviceA;
1102     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1103     SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1104     EnvHandle envDeviceB;
1105     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1106     SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1107     /**
1108      * @tc.steps: step2. device alloc communicator using label and register callback
1109      */
1110     DBInfo dbInfo;
1111     ICommunicator *commAA = nullptr;
1112     OnOfflineDevice onlineForAA;
1113     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1114     ICommunicator *commBB = nullptr;
1115     OnOfflineDevice onlineForBB;
1116     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1117     /**
1118      * @tc.steps: step3. connect device A with device B
1119      */
1120     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1121     /**
1122      * @tc.steps: step4. wait for label exchange
1123      * @tc.expected: step4. both communicator has no callback;
1124      */
1125     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1126     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1127     /**
1128      * @tc.steps: step5. A notify remote
1129      * @tc.expected: step5. B has no callback;
1130      */
1131     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1132     std::this_thread::sleep_for(std::chrono::seconds(1));
1133     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(0));
1134     /**
1135      * @tc.steps: step6. A notify local
1136      * @tc.expected: step6. B has no callback;
1137      */
1138     dbInfo.isNeedSync = false;
1139     onlineForAA.onlineDevices.clear();
1140     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1141     std::this_thread::sleep_for(std::chrono::seconds(1));
1142     EXPECT_EQ(onlineForAA.onlineDevices.size(), static_cast<size_t>(0));
1143     // Clean up and disconnect
1144     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1145     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1146     std::this_thread::sleep_for(std::chrono::seconds(1));
1147     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1148     TearDownEnv(envDeviceA);
1149     TearDownEnv(envDeviceB);
1150     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1151 }
1152 
1153 /**
1154   * @tc.name: CommunicationOptimization004
1155   * @tc.desc: Test notify with isSupport false and it be will changed by communication.
1156   * @tc.type: FUNC
1157   * @tc.require:
1158   * @tc.author: zhangqiquan
1159   */
1160 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization004, TestSize.Level3)
1161 {
1162     const std::string deviceA = "DEVICES_A";
1163     const std::string deviceB = "DEVICES_B";
1164     /**
1165      * @tc.steps: step1. set up env
1166      */
1167     EnvHandle envDeviceA;
1168     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1169     SetUpEnv(deviceA, deviceB, adapterA, false, envDeviceA);
1170     RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1171     EnvHandle envDeviceB;
1172     std::shared_ptr<DBStatusAdapter> adapterB = std::make_shared<DBStatusAdapter>();
1173     SetUpEnv(deviceB, deviceA, adapterB, false, envDeviceB);
1174     /**
1175      * @tc.steps: step2. device alloc communicator using label and register callback
1176      */
1177     DBInfo dbInfo;
1178     ICommunicator *commAA = nullptr;
1179     OnOfflineDevice onlineForAA;
1180     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1181     ICommunicator *commBB = nullptr;
1182     OnOfflineDevice onlineForBB;
1183     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1184     /**
1185      * @tc.steps: step3. connect device A with device B
1186      */
1187     EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1188     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1189     std::this_thread::sleep_for(std::chrono::seconds(1));
1190     EXPECT_EQ(adapterA->IsSupport(deviceB), true);
1191     // Clean up and disconnect
1192     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1193     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1194     std::this_thread::sleep_for(std::chrono::seconds(1));
1195     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1196     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1197     envDeviceA.commAggrHandle = nullptr;
1198     TearDownEnv(envDeviceA);
1199     TearDownEnv(envDeviceB);
1200 }
1201 
1202 /**
1203   * @tc.name: CommunicationOptimization005
1204   * @tc.desc: Test notify with isSupport false and send label exchange.
1205   * @tc.type: FUNC
1206   * @tc.require:
1207   * @tc.author: zhangqiquan
1208   */
1209 HWTEST_F(DistributedDBCommunicatorTest, CommunicationOptimization005, TestSize.Level3)
1210 {
1211     const std::string deviceA = "DEVICES_A";
1212     const std::string deviceB = "DEVICES_B";
1213     /**
1214      * @tc.steps: step1. set up env
1215      */
1216     EnvHandle envDeviceA;
1217     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1218     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1219     handle->SetLocalIsSupport(false);
1220     adapterA->SetDBInfoHandle(handle);
1221     SetUpEnv(envDeviceA, deviceA, adapterA);
1222     RuntimeContext::GetInstance()->SetCommunicatorAggregator(envDeviceA.commAggrHandle);
1223     EnvHandle envDeviceB;
1224     SetUpEnv(envDeviceB, deviceB, nullptr);
1225     /**
1226      * @tc.steps: step2. connect device A with device B
1227      */
1228     EXPECT_EQ(adapterA->IsSupport(deviceB), false);
1229     AdapterStub::ConnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1230     std::this_thread::sleep_for(std::chrono::seconds(1));
1231     /**
1232      * @tc.steps: step3. device alloc communicator using label and register callback
1233      */
1234     DBInfo dbInfo;
1235     ICommunicator *commAA = nullptr;
1236     OnOfflineDevice onlineForAA;
1237     InitCommunicator(dbInfo, envDeviceA, onlineForAA, commAA);
1238     ICommunicator *commBB = nullptr;
1239     OnOfflineDevice onlineForBB;
1240     InitCommunicator(dbInfo, envDeviceB, onlineForBB, commBB);
1241     std::this_thread::sleep_for(std::chrono::seconds(1));
1242     EXPECT_EQ(onlineForBB.onlineDevices.size(), static_cast<size_t>(1));
1243 
1244     // Clean up and disconnect
1245     envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1246     envDeviceB.commAggrHandle->ReleaseCommunicator(commBB);
1247     std::this_thread::sleep_for(std::chrono::seconds(1));
1248     AdapterStub::DisconnectAdapterStub(envDeviceA.adapterHandle, envDeviceB.adapterHandle);
1249     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1250     envDeviceA.commAggrHandle = nullptr;
1251     TearDownEnv(envDeviceA);
1252     TearDownEnv(envDeviceB);
1253 }
1254 
1255 /**
1256   * @tc.name: DbStatusAdapter001
1257   * @tc.desc: Test notify with isSupport false.
1258   * @tc.type: FUNC
1259   * @tc.require:
1260   * @tc.author: zhangqiquan
1261   */
1262 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter001, TestSize.Level1)
1263 {
1264     auto *pAggregator = new VirtualCommunicatorAggregator();
1265     ASSERT_NE(pAggregator, nullptr);
1266     const std::string deviceA = "DEVICES_A";
1267     const std::string deviceB = "DEVICES_B";
1268     RuntimeContext::GetInstance()->SetCommunicatorAggregator(pAggregator);
1269 
1270     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1271     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1272     adapterA->SetDBInfoHandle(handle);
1273     adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1274     std::string actualRemoteDevInfo;
1275     size_t remoteInfoCount = 0u;
1276     size_t localCount = 0u;
1277     DBInfo dbInfo;
1278     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1279 
1280     dbInfo = {
1281         USER_ID,
1282         APP_ID,
1283         STORE_ID_1,
1284         true,
1285         false
1286     };
1287     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1288     dbInfo.isNeedSync = false;
1289     adapterA->NotifyDBInfos({ deviceA }, { dbInfo });
1290     adapterA->NotifyDBInfos({ deviceB }, { dbInfo });
1291 
1292     size_t remoteChangeCount = 0u;
1293     adapterA->SetDBStatusChangeCallback(
__anon9f0e15bf0f02(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1294         [&actualRemoteDevInfo, &remoteInfoCount](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
1295             actualRemoteDevInfo = devInfo;
1296             remoteInfoCount = dbInfos.size();
1297         },
__anon9f0e15bf1002() 1298         [&localCount]() {
1299             localCount++;
1300         },
__anon9f0e15bf1102(const std::string &dev) 1301         [&remoteChangeCount, deviceB](const std::string &dev) {
1302             remoteChangeCount++;
1303             EXPECT_EQ(dev, deviceB);
1304         });
1305     std::this_thread::sleep_for(std::chrono::seconds(1));
1306     EXPECT_EQ(actualRemoteDevInfo, deviceB);
1307     EXPECT_EQ(remoteInfoCount, 1u);
1308     adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1309     std::this_thread::sleep_for(std::chrono::seconds(1));
1310     EXPECT_EQ(remoteChangeCount, 1u);
1311     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
1312 }
1313 
1314 /**
1315   * @tc.name: DbStatusAdapter002
1316   * @tc.desc: Test adapter clear cache.
1317   * @tc.type: FUNC
1318   * @tc.require:
1319   * @tc.author: zhangqiquan
1320   */
1321 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter002, TestSize.Level1) {
1322     const std::string deviceB = "DEVICES_B";
1323     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1324     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1325     adapterA->SetDBInfoHandle(handle);
1326     adapterA->SetRemoteOptimizeCommunication(deviceB, true);
1327     EXPECT_TRUE(adapterA->IsSupport(deviceB));
1328     adapterA->SetDBInfoHandle(handle);
1329     adapterA->SetRemoteOptimizeCommunication(deviceB, false);
1330     EXPECT_FALSE(adapterA->IsSupport(deviceB));
1331 }
1332 
1333 /**
1334   * @tc.name: DbStatusAdapter003
1335   * @tc.desc: Test adapter get local dbInfo.
1336   * @tc.type: FUNC
1337   * @tc.require:
1338   * @tc.author: zhangqiquan
1339   */
1340 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter003, TestSize.Level1) {
1341     const std::string deviceB = "DEVICES_B";
1342     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1343     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1344     adapterA->SetDBInfoHandle(handle);
1345     handle->SetLocalIsSupport(true);
1346     std::vector<DBInfo> dbInfos;
1347     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1348     handle->SetLocalIsSupport(false);
1349     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), E_OK);
1350     adapterA->SetDBInfoHandle(handle);
1351     EXPECT_EQ(adapterA->GetLocalDBInfos(dbInfos), -E_NOT_SUPPORT);
1352 }
1353 
1354 /**
1355   * @tc.name: DbStatusAdapter004
1356   * @tc.desc: Test adapter clear cache will get callback.
1357   * @tc.type: FUNC
1358   * @tc.require:
1359   * @tc.author: zhangqiquan
1360   */
1361 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter004, TestSize.Level1)
1362 {
1363     const std::string deviceB = "DEVICES_B";
1364     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1365     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1366     adapterA->SetDBInfoHandle(handle);
1367     handle->SetLocalIsSupport(true);
1368     std::vector<DBInfo> dbInfos;
1369     DBInfo dbInfo = {
1370         USER_ID,
1371         APP_ID,
1372         STORE_ID_1,
1373         true,
1374         true
1375     };
1376     dbInfos.push_back(dbInfo);
1377     dbInfo.storeId = STORE_ID_2;
1378     dbInfos.push_back(dbInfo);
1379     dbInfo.storeId = STORE_ID_3;
1380     dbInfo.isNeedSync = false;
1381     dbInfos.push_back(dbInfo);
1382     adapterA->NotifyDBInfos({"dev"}, dbInfos);
1383     std::this_thread::sleep_for(std::chrono::seconds(1));
1384     size_t notifyCount = 0u;
1385     adapterA->SetDBStatusChangeCallback([&notifyCount](const std::string &devInfo,
__anon9f0e15bf1202(const std::string &devInfo, const std::vector<DBInfo> &dbInfos) 1386         const std::vector<DBInfo> &dbInfos) {
1387         LOGD("on callback");
1388         for (const auto &dbInfo: dbInfos) {
1389             EXPECT_FALSE(dbInfo.isNeedSync);
1390         }
1391         notifyCount = dbInfos.size();
1392     }, nullptr, nullptr);
1393     adapterA->SetDBInfoHandle(nullptr);
1394     EXPECT_EQ(notifyCount, 2u); // 2 dbInfo is need sync now it should be offline
1395 }
1396 
1397 /**
1398   * @tc.name: DbStatusAdapter005
1399   * @tc.desc: Test adapter is need auto sync.
1400   * @tc.type: FUNC
1401   * @tc.require:
1402   * @tc.author: zhangqiquan
1403   */
1404 HWTEST_F(DistributedDBCommunicatorTest, DbStatusAdapter005, TestSize.Level1)
1405 {
1406     const std::string deviceB = "DEVICES_B";
1407     std::shared_ptr<DBStatusAdapter> adapterA = std::make_shared<DBStatusAdapter>();
1408     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1409     adapterA->SetDBInfoHandle(handle);
1410     handle->SetLocalIsSupport(true);
1411     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1412     handle->SetNeedAutoSync(false);
1413     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1414     handle->SetLocalIsSupport(false);
1415     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), false);
1416     adapterA->SetDBInfoHandle(handle);
1417     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1418     adapterA->SetDBInfoHandle(nullptr);
1419     EXPECT_EQ(adapterA->IsNeedAutoSync(USER_ID, APP_ID, STORE_ID_1, deviceB), true);
1420 }
1421 
1422 /**
1423   * @tc.name: AbnormalCommunicatorTest001
1424   * @tc.desc: Test communicator abnormal.
1425   * @tc.type: FUNC
1426   * @tc.require:
1427   * @tc.author: caihaoting
1428   */
1429 HWTEST_F(DistributedDBCommunicatorTest, AbnormalCommunicatorTest001, TestSize.Level1)
1430 {
1431     ICommunicator *commAA = nullptr;
1432     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1433     EXPECT_EQ(commAA, nullptr);
1434     int errCode = E_OK;
1435     commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
1436     SendConfig conf = {true, false, true, 0};
1437     Message *msgForAA = nullptr;
1438     errCode = commAA->SendMessage(DEVICE_NAME_A, msgForAA, conf);
1439     EXPECT_NE(errCode, E_OK);
1440     msgForAA = BuildRegedTinyMessage();
1441     ASSERT_NE(msgForAA, nullptr);
1442     errCode = commAA->SendMessage("", msgForAA, conf);
1443     EXPECT_NE(errCode, E_OK);
1444     errCode = commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
1445     EXPECT_EQ(errCode, E_OK);
1446     EXPECT_EQ(commAA->GetTargetUserId({}), std::string(""));
1447     g_envDeviceA.commAggrHandle->ClearOnlineLabel();
1448     std::vector<uint8_t> label;
1449     auto aggregator = std::make_shared<CommunicatorAggregator>();
1450     ASSERT_NE(aggregator, nullptr);
1451     auto communicator = std::make_shared<Communicator>(aggregator.get(), label);
1452     ASSERT_NE(communicator, nullptr);
1453     EXPECT_EQ(communicator->GetTargetUserId({}), DBConstant::DEFAULT_USER);
1454     g_envDeviceA.commAggrHandle->ReleaseCommunicator(commAA);
1455 }
1456 
1457 /**
1458   * @tc.name: CheckInFragmentNoTest001
1459   * @tc.desc: Test CheckInFragmentNo func.
1460   * @tc.type: FUNC
1461   * @tc.require:
1462   * @tc.author: tiansimiao
1463   */
1464 HWTEST_F(DistributedDBCommunicatorTest, CheckInFragmentNoTest001, TestSize.Level1)
1465 {
1466     CombineStatus status;
1467     /**
1468      * @tc.steps: step1. set fragmentCount by inFragCount
1469      */
1470     uint16_t inFragCount = 5;
1471     status.SetFragmentCount(inFragCount);
1472     /**
1473      * @tc.steps: step2. make inFragNo greater than fragmentCount
1474      */
1475     uint16_t inFragNo = inFragCount + 1;
1476     status.CheckInFragmentNo(inFragNo);
1477     EXPECT_FALSE(status.IsFragNoAlreadyExist(inFragNo));
1478 }
1479 
1480 /**
1481   * @tc.name: GetRemoteCommunicatorVersionTest001
1482   * @tc.desc: Test GetRemoteCommunicatorVersion func.
1483   * @tc.type: FUNC
1484   * @tc.require:
1485   * @tc.author: tiansimiao
1486   */
1487 HWTEST_F(DistributedDBCommunicatorTest, GetRemoteCommunicatorVersionTest001, TestSize.Level1)
1488 {
1489     auto aggregator = std::make_unique<CommunicatorAggregator>();
1490     ASSERT_NE(aggregator, nullptr);
1491     uint16_t outVersion = 0;
1492     EXPECT_EQ(aggregator->GetRemoteCommunicatorVersion("notExistTarget", outVersion), -E_NOT_FOUND);
1493 }
1494 }