• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <gtest/gtest.h>
16 #include <gmock/gmock.h>
17 #ifdef RUN_AS_ROOT
18 #include <sys/time.h>
19 #endif
20 #include <thread>
21 
22 #include "db_common.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "generic_single_ver_kv_entry.h"
25 #include "message.h"
26 #include "mock_auto_launch.h"
27 #include "mock_communicator.h"
28 #include "mock_meta_data.h"
29 #include "mock_remote_executor.h"
30 #include "mock_single_ver_data_sync.h"
31 #include "mock_single_ver_state_machine.h"
32 #include "mock_sync_engine.h"
33 #include "mock_sync_task_context.h"
34 #include "mock_time_sync.h"
35 #include "remote_executor_packet.h"
36 #include "single_ver_kv_syncer.h"
37 #include "single_ver_relational_sync_task_context.h"
38 #include "virtual_communicator_aggregator.h"
39 #include "virtual_single_ver_sync_db_Interface.h"
40 #ifdef DATA_SYNC_CHECK_003
41 #include "virtual_relational_ver_sync_db_interface.h"
42 #endif
43 
44 using namespace testing::ext;
45 using namespace testing;
46 using namespace DistributedDB;
47 using namespace DistributedDBUnitTest;
48 
49 namespace {
50 const uint32_t MESSAGE_COUNT = 10u;
51 const uint32_t EXECUTE_COUNT = 2u;
Init(MockSingleVerStateMachine & stateMachine,MockSyncTaskContext & syncTaskContext,MockCommunicator & communicator,VirtualSingleVerSyncDBInterface & dbSyncInterface)52 void Init(MockSingleVerStateMachine &stateMachine, MockSyncTaskContext &syncTaskContext,
53     MockCommunicator &communicator, VirtualSingleVerSyncDBInterface &dbSyncInterface)
54 {
55     std::shared_ptr<Metadata> metadata = std::make_shared<Metadata>();
56     (void)syncTaskContext.Initialize("device", &dbSyncInterface, metadata, &communicator);
57     (void)stateMachine.Initialize(&syncTaskContext, &dbSyncInterface, metadata, &communicator);
58 }
59 
Init(MockSingleVerStateMachine & stateMachine,MockSyncTaskContext * syncTaskContext,MockCommunicator & communicator,VirtualSingleVerSyncDBInterface * dbSyncInterface)60 void Init(MockSingleVerStateMachine &stateMachine, MockSyncTaskContext *syncTaskContext,
61     MockCommunicator &communicator, VirtualSingleVerSyncDBInterface *dbSyncInterface)
62 {
63     std::shared_ptr<Metadata> metadata = std::make_shared<Metadata>();
64     (void)syncTaskContext->Initialize("device", dbSyncInterface, metadata, &communicator);
65     (void)stateMachine.Initialize(syncTaskContext, dbSyncInterface, metadata, &communicator);
66 }
67 
68 #ifdef RUN_AS_ROOT
ChangeTime(int sec)69 void ChangeTime(int sec)
70 {
71     timeval time;
72     gettimeofday(&time, nullptr);
73     time.tv_sec += sec;
74     settimeofday(&time, nullptr);
75 }
76 #endif
77 
BuildRemoteQueryMsg(DistributedDB::Message * & message)78 int BuildRemoteQueryMsg(DistributedDB::Message *&message)
79 {
80     auto packet = RemoteExecutorRequestPacket::Create();
81     if (packet == nullptr) {
82         return -E_OUT_OF_MEMORY;
83     }
84     message = new (std::nothrow) DistributedDB::Message(static_cast<uint32_t>(MessageId::REMOTE_EXECUTE_MESSAGE));
85     if (message == nullptr) {
86         RemoteExecutorRequestPacket::Release(packet);
87         return -E_OUT_OF_MEMORY;
88     }
89     message->SetMessageType(TYPE_REQUEST);
90     packet->SetNeedResponse();
91     message->SetExternalObject(packet);
92     return E_OK;
93 }
94 
ConstructPacel(Parcel & parcel,uint32_t conditionCount,const std::string & key,const std::string & value)95 void ConstructPacel(Parcel &parcel, uint32_t conditionCount, const std::string &key, const std::string &value)
96 {
97     parcel.WriteUInt32(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V2); // version
98     parcel.WriteUInt32(1); // flag
99     parcel.WriteInt(1); // current_version
100     parcel.WriteInt(1); // opcode
101     parcel.WriteString("sql"); // sql
102     parcel.WriteInt(1); // bandArgs_
103     parcel.WriteString("condition");
104     parcel.EightByteAlign();
105 
106     parcel.WriteUInt32(conditionCount);
107     if (key.empty()) {
108         return;
109     }
110     parcel.WriteString(key);
111     parcel.WriteString(value);
112 }
113 }
114 
115 class DistributedDBMockSyncModuleTest : public testing::Test {
116 public:
117     static void SetUpTestCase(void);
118     static void TearDownTestCase(void);
119     void SetUp();
120     void TearDown();
121 };
122 
SetUpTestCase(void)123 void DistributedDBMockSyncModuleTest::SetUpTestCase(void)
124 {
125 }
126 
TearDownTestCase(void)127 void DistributedDBMockSyncModuleTest::TearDownTestCase(void)
128 {
129 }
130 
SetUp(void)131 void DistributedDBMockSyncModuleTest::SetUp(void)
132 {
133     DistributedDBToolsUnitTest::PrintTestCaseInfo();
134 }
135 
TearDown(void)136 void DistributedDBMockSyncModuleTest::TearDown(void)
137 {
138 }
139 
140 /**
141  * @tc.name: StateMachineCheck001
142  * @tc.desc: Test machine do timeout when has same timerId.
143  * @tc.type: FUNC
144  * @tc.require: AR000CCPOM
145  * @tc.author: zhangqiquan
146  */
147 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck001, TestSize.Level1)
148 {
149     MockSingleVerStateMachine stateMachine;
150     MockSyncTaskContext syncTaskContext;
151     MockCommunicator communicator;
152     VirtualSingleVerSyncDBInterface dbSyncInterface;
153     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
154 
155     TimerId expectId = 0;
156     TimerId actualId = expectId;
157     EXPECT_CALL(syncTaskContext, GetTimerId()).WillOnce(Return(expectId));
158     EXPECT_CALL(stateMachine, SwitchStateAndStep(_)).WillOnce(Return());
159 
160     stateMachine.CallStepToTimeout(actualId);
161 }
162 
163 /**
164  * @tc.name: StateMachineCheck002
165  * @tc.desc: Test machine do timeout when has diff timerId.
166  * @tc.type: FUNC
167  * @tc.require: AR000CCPOM
168  * @tc.author: zhangqiquan
169  */
170 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck002, TestSize.Level1)
171 {
172     MockSingleVerStateMachine stateMachine;
173     MockSyncTaskContext syncTaskContext;
174     MockCommunicator communicator;
175     VirtualSingleVerSyncDBInterface dbSyncInterface;
176     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
177 
178     TimerId expectId = 0;
179     TimerId actualId = 1;
180     EXPECT_CALL(syncTaskContext, GetTimerId()).WillOnce(Return(expectId));
181     EXPECT_CALL(stateMachine, SwitchStateAndStep(_)).Times(0);
182 
183     stateMachine.CallStepToTimeout(actualId);
184 }
185 
186 /**
187  * @tc.name: StateMachineCheck003
188  * @tc.desc: Test machine exec next task when queue not empty.
189  * @tc.type: FUNC
190  * @tc.require: AR000CCPOM
191  * @tc.author: zhangqiquan
192  */
193 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck003, TestSize.Level1)
194 {
195     MockSingleVerStateMachine stateMachine;
196     MockSyncTaskContext syncTaskContext;
197     MockCommunicator communicator;
198     VirtualSingleVerSyncDBInterface dbSyncInterface;
199     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
200 
201     EXPECT_CALL(stateMachine, PrepareNextSyncTask()).WillOnce(Return(E_OK));
202 
203     EXPECT_CALL(syncTaskContext, IsTargetQueueEmpty()).WillRepeatedly(Return(false));
204     EXPECT_CALL(syncTaskContext, MoveToNextTarget()).WillRepeatedly(Return());
205     EXPECT_CALL(syncTaskContext, IsCurrentSyncTaskCanBeSkipped())
206         .WillOnce(Return(true))
207         .WillOnce(Return(false));
208     // we expect machine don't change context status when queue not empty
209     EXPECT_CALL(syncTaskContext, SetOperationStatus(_)).WillOnce(Return());
210     EXPECT_CALL(syncTaskContext, SetTaskExecStatus(_)).Times(0);
211 
212     EXPECT_EQ(stateMachine.CallExecNextTask(), E_OK);
213 }
214 
215 /**
216  * @tc.name: StateMachineCheck004
217  * @tc.desc: Test machine deal time sync ack failed.
218  * @tc.type: FUNC
219  * @tc.require: AR000CCPOM
220  * @tc.author: zhangqiquan
221  */
222 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck004, TestSize.Level1)
223 {
224     MockSingleVerStateMachine stateMachine;
225     MockSyncTaskContext syncTaskContext;
226     MockCommunicator communicator;
227     VirtualSingleVerSyncDBInterface dbSyncInterface;
228     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
229 
230     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
231     ASSERT_NE(message, nullptr);
232     message->SetMessageType(TYPE_RESPONSE);
233     message->SetSessionId(1u);
234     EXPECT_CALL(syncTaskContext, GetRequestSessionId()).WillRepeatedly(Return(1u));
235     EXPECT_EQ(stateMachine.CallTimeMarkSyncRecv(message), -E_INVALID_ARGS);
236     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_INVALID_ARGS);
237     delete message;
238 }
239 
240 /**
241  * @tc.name: StateMachineCheck005
242  * @tc.desc: Test machine recv errCode.
243  * @tc.type: FUNC
244  * @tc.require: AR000CCPOM
245  * @tc.author: zhangqiquan
246  */
247 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck005, TestSize.Level1)
248 {
249     MockSingleVerStateMachine stateMachine;
250     MockSyncTaskContext syncTaskContext;
251     MockCommunicator communicator;
252     VirtualSingleVerSyncDBInterface dbSyncInterface;
253     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
254     EXPECT_CALL(stateMachine, SwitchStateAndStep(_)).WillRepeatedly(Return());
255     EXPECT_CALL(syncTaskContext, GetRequestSessionId()).WillRepeatedly(Return(0u));
256 
257     std::initializer_list<int> testCode = {-E_DISTRIBUTED_SCHEMA_CHANGED, -E_DISTRIBUTED_SCHEMA_NOT_FOUND};
258     for (int errCode : testCode) {
259         stateMachine.DataRecvErrCodeHandle(0, errCode);
260         EXPECT_EQ(syncTaskContext.GetTaskErrCode(), errCode);
261         stateMachine.CallDataAckRecvErrCodeHandle(errCode, true);
262         EXPECT_EQ(syncTaskContext.GetTaskErrCode(), errCode);
263     }
264 }
265 
266 /**
267  * @tc.name: StateMachineCheck006
268  * @tc.desc: Test machine exec next task when queue not empty to empty.
269  * @tc.type: FUNC
270  * @tc.require: AR000CCPOM
271  * @tc.author: zhangqiquan
272  */
273 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck006, TestSize.Level1)
274 {
275     MockSingleVerStateMachine stateMachine;
276     MockSyncTaskContext syncTaskContext;
277     MockCommunicator communicator;
278     VirtualSingleVerSyncDBInterface dbSyncInterface;
279     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
280 
281     syncTaskContext.CallSetSyncMode(QUERY_PUSH);
282     EXPECT_CALL(syncTaskContext, IsTargetQueueEmpty())
283         .WillOnce(Return(false))
284         .WillOnce(Return(true));
285     EXPECT_CALL(syncTaskContext, IsCurrentSyncTaskCanBeSkipped())
286         .WillRepeatedly(Return(syncTaskContext.CallIsCurrentSyncTaskCanBeSkipped()));
287     EXPECT_CALL(syncTaskContext, MoveToNextTarget()).WillOnce(Return());
288     // we expect machine don't change context status when queue not empty
289     EXPECT_CALL(syncTaskContext, SetOperationStatus(_)).WillOnce(Return());
290     EXPECT_CALL(syncTaskContext, SetTaskExecStatus(_)).WillOnce(Return());
291     EXPECT_CALL(syncTaskContext, Clear()).WillOnce(Return());
292 
293     EXPECT_EQ(stateMachine.CallExecNextTask(), -E_NO_SYNC_TASK);
294 }
295 
296 /**
297  * @tc.name: StateMachineCheck007
298  * @tc.desc: Test machine DoSaveDataNotify in another thread.
299  * @tc.type: FUNC
300  * @tc.require: AR000CCPOM
301  * @tc.author: zhangqiquan
302  */
303 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck007, TestSize.Level3)
304 {
305     MockSingleVerStateMachine stateMachine;
306     uint8_t callCount = 0;
307     EXPECT_CALL(stateMachine, DoSaveDataNotify(_, _, _))
__anon3ac059ba0202(uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) 308         .WillRepeatedly([&callCount](uint32_t sessionId, uint32_t sequenceId, uint32_t inMsgId) {
309             (void) sessionId;
310             (void) sequenceId;
311             (void) inMsgId;
312             callCount++;
313             std::this_thread::sleep_for(std::chrono::seconds(4)); // sleep 4s
314         });
315     stateMachine.CallStartSaveDataNotify(0, 0, 0);
316     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s
317     stateMachine.CallStopSaveDataNotify();
318     // timer is called once in 2s, we sleep 5s timer call twice
319     EXPECT_EQ(callCount, 2);
320     std::this_thread::sleep_for(std::chrono::seconds(10)); // sleep 10s to wait all thread exit
321 }
322 
323 /**
324  * @tc.name: StateMachineCheck008
325  * @tc.desc: test machine process when last sync task send packet failed.
326  * @tc.type: FUNC
327  * @tc.require: AR000CCPOM
328  * @tc.author: zhuwentao
329  */
330 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck008, TestSize.Level1)
331 {
332     MockSingleVerStateMachine stateMachine;
333     MockSyncTaskContext syncTaskContext;
334     MockCommunicator communicator;
335     VirtualSingleVerSyncDBInterface dbSyncInterface;
336     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
337     syncTaskContext.CallCommErrHandlerFuncInner(-E_PERIPHERAL_INTERFACE_FAIL, 1u);
338     EXPECT_EQ(syncTaskContext.IsCommNormal(), true);
339 }
340 
341 /**
342  * @tc.name: StateMachineCheck009
343  * @tc.desc: test machine process when last sync task send packet failed.
344  * @tc.type: FUNC
345  * @tc.require: AR000CCPOM
346  * @tc.author: zhuwentao
347  */
348 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck009, TestSize.Level1)
349 {
350     MockSingleVerStateMachine stateMachine;
351     MockSyncTaskContext syncTaskContext;
352     MockCommunicator communicator;
353     VirtualSingleVerSyncDBInterface dbSyncInterface;
354     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
355     stateMachine.CallSwitchMachineState(1u); // START_SYNC_EVENT
356     stateMachine.CommErrAbort(1u);
357     EXPECT_EQ(stateMachine.GetCurrentState(), 1u);
358 }
359 
360 /**
361  * @tc.name: StateMachineCheck010
362  * @tc.desc: test machine process when error happened in response pull.
363  * @tc.type: FUNC
364  * @tc.require: AR000CCPOM
365  * @tc.author: zhangqiquan
366  */
367 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck010, TestSize.Level1)
368 {
369     MockSingleVerStateMachine stateMachine;
370     MockSyncTaskContext syncTaskContext;
371     MockCommunicator communicator;
372     VirtualSingleVerSyncDBInterface dbSyncInterface;
373     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
374     EXPECT_CALL(stateMachine, SwitchStateAndStep(_)).WillOnce(Return());
375     stateMachine.CallResponsePullError(-E_BUSY, false);
376     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_BUSY);
377 }
378 
379 /**
380  * @tc.name: StateMachineCheck011
381  * @tc.desc: test machine process when error happened in response pull.
382  * @tc.type: FUNC
383  * @tc.require: AR000CCPOM
384  * @tc.author: zhangqiquan
385  */
386 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck011, TestSize.Level1)
387 {
388     MockSingleVerStateMachine stateMachine;
389     MockSyncTaskContext syncTaskContext;
390     MockCommunicator communicator;
391     VirtualSingleVerSyncDBInterface dbSyncInterface;
392     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
393     syncTaskContext.CallSetTaskExecStatus(SyncTaskContext::RUNNING);
394     EXPECT_CALL(syncTaskContext, GetRequestSessionId()).WillOnce(Return(1u));
395     syncTaskContext.ClearAllSyncTask();
396     EXPECT_EQ(syncTaskContext.IsCommNormal(), false);
397 }
398 
399 /**
400  * @tc.name: StateMachineCheck013
401  * @tc.desc: test kill syncTaskContext.
402  * @tc.type: FUNC
403  * @tc.require: AR000CCPOM
404  * @tc.author: zhangqiquan
405  */
406 HWTEST_F(DistributedDBMockSyncModuleTest, StateMachineCheck013, TestSize.Level1)
407 {
408     MockSingleVerStateMachine stateMachine;
409     auto *syncTaskContext = new(std::nothrow) MockSyncTaskContext();
410     auto *dbSyncInterface = new(std::nothrow) VirtualSingleVerSyncDBInterface();
411     ASSERT_NE(syncTaskContext, nullptr);
412     EXPECT_NE(dbSyncInterface, nullptr);
413     if (dbSyncInterface == nullptr) {
414         RefObject::KillAndDecObjRef(syncTaskContext);
415         return;
416     }
417     MockCommunicator communicator;
418     Init(stateMachine, syncTaskContext, communicator, dbSyncInterface);
419     EXPECT_CALL(*syncTaskContext, Clear()).WillOnce(Return());
__anon3ac059ba0302() 420     syncTaskContext->RegForkGetDeviceIdFunc([]() {
421         std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s
422     });
423     int token = 1;
424     int *tokenPtr = &token;
425     syncTaskContext->SetContinueToken(tokenPtr);
426     RefObject::KillAndDecObjRef(syncTaskContext);
427     delete dbSyncInterface;
428     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s and wait for task exist
429     tokenPtr = nullptr;
430 }
431 
432 /**
433  * @tc.name: DataSyncCheck001
434  * @tc.desc: Test dataSync recv error ack.
435  * @tc.type: FUNC
436  * @tc.require: AR000CCPOM
437  * @tc.author: zhangqiquan
438  */
439 HWTEST_F(DistributedDBMockSyncModuleTest, DataSyncCheck001, TestSize.Level1)
440 {
441     SingleVerDataSync dataSync;
442     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
443     ASSERT_TRUE(message != nullptr);
444     message->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
445     EXPECT_EQ(dataSync.AckPacketIdCheck(message), true);
446     delete message;
447 }
448 
449 /**
450  * @tc.name: DataSyncCheck002
451  * @tc.desc: Test dataSync recv notify ack.
452  * @tc.type: FUNC
453  * @tc.require: AR000CCPOM
454  * @tc.author: zhangqiquan
455  */
456 HWTEST_F(DistributedDBMockSyncModuleTest, DataSyncCheck002, TestSize.Level1)
457 {
458     SingleVerDataSync dataSync;
459     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
460     ASSERT_TRUE(message != nullptr);
461     message->SetMessageType(TYPE_NOTIFY);
462     EXPECT_EQ(dataSync.AckPacketIdCheck(message), true);
463     delete message;
464 }
465 #ifdef DATA_SYNC_CHECK_003
466 /**
467  * @tc.name: DataSyncCheck003
468  * @tc.desc: Test dataSync recv notify ack.
469  * @tc.type: FUNC
470  * @tc.require: AR000CCPOM
471  * @tc.author: zhangqiquan
472  */
473 HWTEST_F(DistributedDBMockSyncModuleTest, DataSyncCheck003, TestSize.Level1)
474 {
475     MockSingleVerDataSync mockDataSync;
476     MockSyncTaskContext mockSyncTaskContext;
477     auto mockMetadata = std::make_shared<MockMetadata>();
478     SyncTimeRange dataTimeRange = {1, 0, 1, 0};
479     mockDataSync.CallUpdateSendInfo(dataTimeRange, &mockSyncTaskContext);
480 
481     VirtualRelationalVerSyncDBInterface storage;
482     MockCommunicator communicator;
483     std::shared_ptr<Metadata> metadata = std::static_pointer_cast<Metadata>(mockMetadata);
484     mockDataSync.Initialize(&storage, &communicator, metadata, "deviceId");
485 
486     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
487     ASSERT_TRUE(message != nullptr);
488     DataAckPacket packet;
489     message->SetSequenceId(1);
490     message->SetCopiedObject(packet);
491     mockSyncTaskContext.SetQuerySync(true);
492 
493     EXPECT_CALL(*mockMetadata, GetLastQueryTime(_, _, _)).WillOnce(Return(E_OK));
494     EXPECT_CALL(*mockMetadata, SetLastQueryTime(_, _, _)).WillOnce([&dataTimeRange](const std::string &queryIdentify,
__anon3ac059ba0402(const std::string &queryIdentify, const std::string &deviceId, const Timestamp &timestamp) 495         const std::string &deviceId, const Timestamp &timestamp) {
496         EXPECT_EQ(timestamp, dataTimeRange.endTime);
497         return E_OK;
498     });
499     EXPECT_CALL(mockSyncTaskContext, SetOperationStatus(_)).WillOnce(Return());
500     EXPECT_EQ(mockDataSync.TryContinueSync(&mockSyncTaskContext, message), -E_FINISHED);
501     delete message;
502 }
503 #endif
504 /**
505  * @tc.name: AutoLaunchCheck001
506  * @tc.desc: Test autoLaunch close connection.
507  * @tc.type: FUNC
508  * @tc.require: AR000CCPOM
509  * @tc.author: zhangqiquan
510  */
511 HWTEST_F(DistributedDBMockSyncModuleTest, AutoLaunchCheck001, TestSize.Level1)
512 {
513     MockAutoLaunch mockAutoLaunch;
514     /**
515      * @tc.steps: step1. put AutoLaunchItem in cache to simulate a connection was auto launched
516      */
517     std::string id = "TestAutoLaunch";
518     std::string userId = "userId";
519     AutoLaunchItem item;
520     mockAutoLaunch.SetAutoLaunchItem(id, userId, item);
521     EXPECT_CALL(mockAutoLaunch, TryCloseConnection(_)).WillOnce(Return());
522     /**
523      * @tc.steps: step2. send close signal to simulate a connection was unused in 1 min
524      * @tc.expected: 10 thread try to close the connection and one thread close success
525      */
526     const int loopCount = 10;
527     int finishCount = 0;
528     std::mutex mutex;
529     std::unique_lock<std::mutex> lock(mutex);
530     std::condition_variable cv;
531     for (int i = 0; i < loopCount; i++) {
__anon3ac059ba0502null532         std::thread t = std::thread([&finishCount, &mockAutoLaunch, &id, &userId, &mutex, &cv] {
533             mockAutoLaunch.CallExtConnectionLifeCycleCallbackTask(id, userId);
534             finishCount++;
535             if (finishCount == loopCount) {
536                 std::unique_lock<std::mutex> lockInner(mutex);
537                 cv.notify_one();
538             }
539         });
540         t.detach();
541     }
__anon3ac059ba0602() 542     cv.wait(lock, [&finishCount, &loopCount]() {
543         return finishCount == loopCount;
544     });
545 }
546 
547 /**
548  * @tc.name: SyncDataSync001
549  * @tc.desc: Test request start when RemoveDeviceDataIfNeed failed.
550  * @tc.type: FUNC
551  * @tc.require: AR000CCPOM
552  * @tc.author: zhangqiquan
553  */
554 HWTEST_F(DistributedDBMockSyncModuleTest, SyncDataSync001, TestSize.Level1)
555 {
556     MockSyncTaskContext syncTaskContext;
557     MockSingleVerDataSync dataSync;
558 
559     EXPECT_CALL(dataSync, RemoveDeviceDataIfNeed(_)).WillRepeatedly(Return(-E_BUSY));
560     EXPECT_EQ(dataSync.CallRequestStart(&syncTaskContext, PUSH), -E_BUSY);
561     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_BUSY);
562 }
563 
564 /**
565  * @tc.name: SyncDataSync002
566  * @tc.desc: Test pull request start when RemoveDeviceDataIfNeed failed.
567  * @tc.type: FUNC
568  * @tc.require: AR000CCPOM
569  * @tc.author: zhangqiquan
570  */
571 HWTEST_F(DistributedDBMockSyncModuleTest, SyncDataSync002, TestSize.Level1)
572 {
573     MockSyncTaskContext syncTaskContext;
574     MockSingleVerDataSync dataSync;
575 
576     EXPECT_CALL(dataSync, RemoveDeviceDataIfNeed(_)).WillRepeatedly(Return(-E_BUSY));
577     EXPECT_EQ(dataSync.CallPullRequestStart(&syncTaskContext), -E_BUSY);
578     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_BUSY);
579 }
580 
581 /**
582  * @tc.name: SyncDataSync003
583  * @tc.desc: Test call RemoveDeviceDataIfNeed in diff thread.
584  * @tc.type: FUNC
585  * @tc.require: AR000CCPOM
586  * @tc.author: zhangqiquan
587  */
588 HWTEST_F(DistributedDBMockSyncModuleTest, SyncDataSync003, TestSize.Level1)
589 {
590     MockSyncTaskContext syncTaskContext;
591     MockSingleVerDataSync dataSync;
592 
593     VirtualSingleVerSyncDBInterface storage;
594     MockCommunicator communicator;
595     std::shared_ptr<MockMetadata> mockMetadata = std::make_shared<MockMetadata>();
596     std::shared_ptr<Metadata> metadata = std::static_pointer_cast<Metadata>(mockMetadata);
597     metadata->Initialize(&storage);
598     const std::string deviceId = "deviceId";
599     dataSync.Initialize(&storage, &communicator, metadata, deviceId);
600     syncTaskContext.SetRemoteSoftwareVersion(SOFTWARE_VERSION_CURRENT);
601     syncTaskContext.Initialize(deviceId, &storage, metadata, &communicator);
602     syncTaskContext.EnableClearRemoteStaleData(true);
603 
604     /**
605      * @tc.steps: step1. set diff db createtime for rebuild label in meta
606      */
607     metadata->SetDbCreateTime(deviceId, 1, true); // 1 is old db createTime
608     metadata->SetDbCreateTime(deviceId, 2, true); // 1 is new db createTime
609 
610     DistributedDB::Key k1 = {'k', '1'};
611     DistributedDB::Value v1 = {'v', '1'};
612     DistributedDB::Key k2 = {'k', '2'};
613     DistributedDB::Value v2 = {'v', '2'};
614 
615     /**
616      * @tc.steps: step2. call RemoveDeviceDataIfNeed in diff thread and then put data
617      */
__anon3ac059ba0702() 618     std::thread thread1([&syncTaskContext, &storage, &dataSync, &deviceId, &k1, &v1]() {
619         (void)dataSync.CallRemoveDeviceDataIfNeed(&syncTaskContext);
620         storage.PutDeviceData(deviceId, k1, v1);
621         LOGD("PUT FINISH");
622     });
__anon3ac059ba0802() 623     std::thread thread2([&syncTaskContext, &storage, &dataSync, &deviceId, &k2, &v2]() {
624         (void)dataSync.CallRemoveDeviceDataIfNeed(&syncTaskContext);
625         storage.PutDeviceData(deviceId, k2, v2);
626         LOGD("PUT FINISH");
627     });
628     thread1.join();
629     thread2.join();
630 
631     DistributedDB::Value actualValue;
632     storage.GetDeviceData(deviceId, k1, actualValue);
633     EXPECT_EQ(v1, actualValue);
634     storage.GetDeviceData(deviceId, k2, actualValue);
635     EXPECT_EQ(v2, actualValue);
636 }
637 
638 /**
639  * @tc.name: AbilitySync001
640  * @tc.desc: Test abilitySync abort when recv error.
641  * @tc.type: FUNC
642  * @tc.require: AR000CCPOM
643  * @tc.author: zhangqiquan
644  */
645 HWTEST_F(DistributedDBMockSyncModuleTest, AbilitySync001, TestSize.Level1)
646 {
647     MockSyncTaskContext syncTaskContext;
648     AbilitySync abilitySync;
649 
650     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
651     ASSERT_TRUE(message != nullptr);
652     AbilitySyncAckPacket packet;
653     packet.SetAckCode(-E_BUSY);
654     message->SetCopiedObject(packet);
655     EXPECT_EQ(abilitySync.AckRecv(message, &syncTaskContext), -E_BUSY);
656     delete message;
657     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_BUSY);
658 }
659 
660 /**
661  * @tc.name: AbilitySync002
662  * @tc.desc: Test abilitySync abort when save meta failed.
663  * @tc.type: FUNC
664  * @tc.require: AR000CCPOM
665  * @tc.author: zhangqiquan
666  */
667 HWTEST_F(DistributedDBMockSyncModuleTest, AbilitySync002, TestSize.Level1)
668 {
669     MockSyncTaskContext syncTaskContext;
670     AbilitySync abilitySync;
671     MockCommunicator comunicator;
672     VirtualSingleVerSyncDBInterface syncDBInterface;
673     std::shared_ptr<Metadata> metaData = std::make_shared<Metadata>();
674     metaData->Initialize(&syncDBInterface);
675     abilitySync.Initialize(&comunicator, &syncDBInterface, metaData, "deviceId");
676 
677     /**
678      * @tc.steps: step1. set AbilitySyncAckPacket ackCode is E_OK for pass the ack check
679      */
680     DistributedDB::Message *message = new(std::nothrow) DistributedDB::Message();
681     ASSERT_TRUE(message != nullptr);
682     AbilitySyncAckPacket packet;
683     packet.SetAckCode(E_OK);
684     packet.SetSoftwareVersion(SOFTWARE_VERSION_CURRENT);
685     message->SetCopiedObject(packet);
686     /**
687      * @tc.steps: step2. set syncDBInterface busy for save data return -E_BUSY
688      */
689     syncDBInterface.SetBusy(true);
690     SyncStrategy mockStrategy = {true, false, false};
691     EXPECT_CALL(syncTaskContext, GetSyncStrategy(_)).WillOnce(Return(mockStrategy));
692     EXPECT_EQ(abilitySync.AckRecv(message, &syncTaskContext), -E_BUSY);
693     delete message;
694     EXPECT_EQ(syncTaskContext.GetTaskErrCode(), -E_BUSY);
695 }
696 
697 /**
698  * @tc.name: AbilitySync002
699  * @tc.desc: Test abilitySync when offline.
700  * @tc.type: FUNC
701  * @tc.require: AR000CCPOM
702  * @tc.author: zhangqiquan
703  */
704 HWTEST_F(DistributedDBMockSyncModuleTest, AbilitySync003, TestSize.Level1)
705 {
706     /**
707      * @tc.steps: step1. set table TEST is permitSync
708      */
709     SingleVerRelationalSyncTaskContext *context = new (std::nothrow) SingleVerRelationalSyncTaskContext();
710     ASSERT_NE(context, nullptr);
711     RelationalSyncStrategy strategy;
712     const std::string tableName = "TEST";
713     strategy[tableName] = {true, true, true};
714     context->SetRelationalSyncStrategy(strategy);
715     QuerySyncObject query;
716     query.SetTableName(tableName);
717     /**
718      * @tc.steps: step2. set table is need reset ability sync but it still permit sync
719      */
720     context->SetIsNeedResetAbilitySync(true);
721     EXPECT_EQ(context->GetSyncStrategy(query).permitSync, true);
722     /**
723      * @tc.steps: step3. set table is schema change now it don't permit sync
724      */
725     context->SchemaChange();
726     EXPECT_EQ(context->GetSyncStrategy(query).permitSync, false);
727     RefObject::KillAndDecObjRef(context);
728 }
729 
730 /**
731  * @tc.name: AbilitySync004
732  * @tc.desc: Test abilitySync when offline.
733  * @tc.type: FUNC
734  * @tc.require: AR000CCPOM
735  * @tc.author: zhangqiquan
736  */
737 HWTEST_F(DistributedDBMockSyncModuleTest, AbilitySync004, TestSize.Level1)
738 {
739     /**
740      * @tc.steps: step1. set table TEST is permitSync
741      */
742     auto *context = new (std::nothrow) SingleVerKvSyncTaskContext();
743     ASSERT_NE(context, nullptr);
744     /**
745      * @tc.steps: step2. test context recv dbAbility in diff thread
746      */
747     const int loopCount = 1000;
748     std::atomic<int> finishCount = 0;
749     std::mutex mutex;
750     std::unique_lock<std::mutex> lock(mutex);
751     std::condition_variable cv;
752     for (int i = 0; i < loopCount; i++) {
__anon3ac059ba0902null753         std::thread t = std::thread([&context, &finishCount, &loopCount, &cv] {
754             DbAbility dbAbility;
755             context->SetDbAbility(dbAbility);
756             finishCount++;
757             if (finishCount == loopCount) {
758                 cv.notify_one();
759             }
760         });
761         t.detach();
762     }
__anon3ac059ba0a02() 763     cv.wait(lock, [&]() { return finishCount == loopCount; });
764     RefObject::KillAndDecObjRef(context);
765 }
766 
767 /**
768  * @tc.name: SyncLifeTest001
769  * @tc.desc: Test syncer alive when thread still exist.
770  * @tc.type: FUNC
771  * @tc.require: AR000CCPOM
772  * @tc.author: zhangqiquan
773  */
774 HWTEST_F(DistributedDBMockSyncModuleTest, SyncLifeTest001, TestSize.Level3)
775 {
776     std::shared_ptr<SingleVerKVSyncer> syncer = std::make_shared<SingleVerKVSyncer>();
777     VirtualCommunicatorAggregator *virtualCommunicatorAggregator = new VirtualCommunicatorAggregator();
778     RuntimeContext::GetInstance()->SetCommunicatorAggregator(virtualCommunicatorAggregator);
779     VirtualSingleVerSyncDBInterface *syncDBInterface = new VirtualSingleVerSyncDBInterface();
780     syncer->Initialize(syncDBInterface, true);
781     syncer->EnableAutoSync(true);
782     for (int i = 0; i < 1000; i++) { // trigger 1000 times auto sync check
783         syncer->LocalDataChanged(SQLITE_GENERAL_NS_PUT_EVENT);
784     }
785     syncer = nullptr;
786     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
787     delete syncDBInterface;
788 }
789 
790 /**
791  * @tc.name: MessageScheduleTest001
792  * @tc.desc: Test MessageSchedule stop timer when no message.
793  * @tc.type: FUNC
794  * @tc.require: AR000CCPOM
795  * @tc.author: zhangqiquan
796  */
797 HWTEST_F(DistributedDBMockSyncModuleTest, MessageScheduleTest001, TestSize.Level1)
798 {
799     MockSyncTaskContext *context = new MockSyncTaskContext();
800     context->SetRemoteSoftwareVersion(SOFTWARE_VERSION_CURRENT);
801     bool last = false;
__anon3ac059ba0b02() 802     context->OnLastRef([&last]() {
803         last = true;
804     });
805     SingleVerDataMessageSchedule schedule;
806     bool isNeedHandle = false;
807     bool isNeedContinue = false;
808     schedule.MoveNextMsg(context, isNeedHandle, isNeedContinue);
809     RefObject::KillAndDecObjRef(context);
810     std::this_thread::sleep_for(std::chrono::seconds(1));
811     EXPECT_TRUE(last);
812 }
813 
814 /**
815  * @tc.name: SyncEngineTest001
816  * @tc.desc: Test SyncEngine receive message when closing.
817  * @tc.type: FUNC
818  * @tc.require: AR000CCPOM
819  * @tc.author: zhangqiquan
820  */
821 HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest001, TestSize.Level1)
822 {
823     std::unique_ptr<MockSyncEngine> enginePtr = std::make_unique<MockSyncEngine>();
824     EXPECT_CALL(*enginePtr, CreateSyncTaskContext()).WillRepeatedly(Return(nullptr));
825     VirtualCommunicatorAggregator *virtualCommunicatorAggregator = new VirtualCommunicatorAggregator();
826     VirtualSingleVerSyncDBInterface syncDBInterface;
827     std::shared_ptr<Metadata> metaData = std::make_shared<Metadata>();
828     ASSERT_NE(virtualCommunicatorAggregator, nullptr);
829     RuntimeContext::GetInstance()->SetCommunicatorAggregator(virtualCommunicatorAggregator);
830     EXPECT_EQ(enginePtr->Initialize(nullptr, metaData, nullptr, nullptr, nullptr), -E_INVALID_ARGS);
831     std::shared_ptr<Metadata> nullMetaData = nullptr;
832     EXPECT_EQ(enginePtr->Initialize(&syncDBInterface, nullMetaData, nullptr, nullptr, nullptr), -E_INVALID_ARGS);
833     enginePtr->Initialize(&syncDBInterface, metaData, nullptr, nullptr, nullptr);
834     auto communicator =
835         static_cast<VirtualCommunicator *>(virtualCommunicatorAggregator->GetCommunicator("real_device"));
836     RefObject::IncObjRef(communicator);
__anon3ac059ba0c02() 837     std::thread thread1([&communicator]() {
838         if (communicator == nullptr) {
839             return;
840         }
841         for (int count = 0; count < 100; count++) { // loop 100 times
842             auto *message = new(std::nothrow) DistributedDB::Message();
843             communicator->CallbackOnMessage("src", message);
844         }
845     });
__anon3ac059ba0d02() 846     std::thread thread2([&enginePtr]() {
847         enginePtr->Close();
848     });
849     thread1.join();
850     thread2.join();
851 
852     LOGD("FINISHED");
853     RefObject::KillAndDecObjRef(communicator);
854     communicator = nullptr;
855     enginePtr = nullptr;
856     metaData = nullptr;
857     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
858     virtualCommunicatorAggregator = nullptr;
859 }
860 
861 /**
862  * @tc.name: SyncEngineTest003
863  * @tc.desc: Test SyncEngine add block sync operation.
864  * @tc.type: FUNC
865  * @tc.require: AR000CCPOM
866  * @tc.author: zhangqiquan
867  */
868 HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest003, TestSize.Level1)
869 {
870     auto *enginePtr = new (std::nothrow) MockSyncEngine();
871     ASSERT_NE(enginePtr, nullptr);
872     std::vector<std::string> devices = {
873         "DEVICES_A", "DEVICES_B"
874     };
875     const int syncId = 1;
876     auto operation = new (std::nothrow) SyncOperation(syncId, devices, 0, nullptr, true);
877     ASSERT_NE(operation, nullptr);
878     operation->Initialize();
879     enginePtr->AddSyncOperation(operation);
880     for (const auto &device: devices) {
881         EXPECT_EQ(operation->GetStatus(device), static_cast<int>(SyncOperation::OP_BUSY_FAILURE));
882     }
883     RefObject::KillAndDecObjRef(operation);
884     RefObject::KillAndDecObjRef(enginePtr);
885 }
886 
887 /**
888 * @tc.name: remote query packet 001
889 * @tc.desc: Test RemoteExecutorRequestPacket Serialization And DeSerialization
890 * @tc.type: FUNC
891 * @tc.require: AR000GK58G
892 * @tc.author: zhangqiquan
893 */
894 HWTEST_F(DistributedDBMockSyncModuleTest, RemoteQueryPacket001, TestSize.Level1)
895 {
896     /**
897      * @tc.steps: step1. create remoteExecutorRequestPacket
898      */
899     RemoteExecutorRequestPacket packet;
900     std::map<std::string, std::string> extraCondition = { { "test", "testsql" } };
901     packet.SetExtraConditions(extraCondition);
902     packet.SetNeedResponse();
903     packet.SetVersion(SOFTWARE_VERSION_RELEASE_6_0);
904 
905     /**
906      * @tc.steps: step2. serialization to parcel
907      */
908     std::vector<uint8_t> buffer(packet.CalculateLen());
909     Parcel parcel(buffer.data(), buffer.size());
910     ASSERT_EQ(packet.Serialization(parcel), E_OK);
911     ASSERT_FALSE(parcel.IsError());
912 
913     /**
914      * @tc.steps: step3. deserialization from parcel
915      */
916     RemoteExecutorRequestPacket targetPacket;
917     Parcel targetParcel(buffer.data(), buffer.size());
918     ASSERT_EQ(targetPacket.DeSerialization(targetParcel), E_OK);
919     ASSERT_FALSE(parcel.IsError());
920 
921     /**
922      * @tc.steps: step4. check packet is equal
923      */
924     EXPECT_EQ(packet.GetVersion(), targetPacket.GetVersion());
925     EXPECT_EQ(packet.GetFlag(), targetPacket.GetFlag());
926 }
927 
928 /**
929 * @tc.name: remote query packet 002
930 * @tc.desc: Test RemoteExecutorAckPacket Serialization And DeSerialization
931 * @tc.type: FUNC
932 * @tc.require: AR000GK58G
933 * @tc.author: zhangqiquan
934 */
935 HWTEST_F(DistributedDBMockSyncModuleTest, RemoteQueryPacket002, TestSize.Level1)
936 {
937     /**
938      * @tc.steps: step1. create remoteExecutorRequestPacket
939      */
940     RemoteExecutorAckPacket packet;
941     packet.SetLastAck();
942     packet.SetAckCode(-E_INTERNAL_ERROR);
943     packet.SetVersion(SOFTWARE_VERSION_RELEASE_6_0);
944 
945     /**
946      * @tc.steps: step2. serialization to parcel
947      */
948     std::vector<uint8_t> buffer(packet.CalculateLen());
949     Parcel parcel(buffer.data(), buffer.size());
950     ASSERT_EQ(packet.Serialization(parcel), E_OK);
951     ASSERT_FALSE(parcel.IsError());
952 
953     /**
954      * @tc.steps: step3. deserialization from parcel
955      */
956     RemoteExecutorAckPacket targetPacket;
957     Parcel targetParcel(buffer.data(), buffer.size());
958     ASSERT_EQ(targetPacket.DeSerialization(targetParcel), E_OK);
959     ASSERT_FALSE(parcel.IsError());
960 
961     /**
962      * @tc.steps: step4. check packet is equal
963      */
964     EXPECT_EQ(packet.GetVersion(), targetPacket.GetVersion());
965     EXPECT_EQ(packet.GetFlag(), targetPacket.GetFlag());
966     EXPECT_EQ(packet.GetAckCode(), targetPacket.GetAckCode());
967 }
968 
969 /**
970 * @tc.name: remote query packet 003
971 * @tc.desc: Test RemoteExecutorRequestPacket Serialization with invalid args
972 * @tc.type: FUNC
973 * @tc.require: AR000GK58G
974 * @tc.author: zhangshijie
975 */
976 HWTEST_F(DistributedDBMockSyncModuleTest, RemoteQueryPacket003, TestSize.Level1)
977 {
978     /**
979      * @tc.steps: step1. check E_INVALID_ARGS
980      */
981     RemoteExecutorRequestPacket packet;
982     packet.SetNeedResponse();
983     packet.SetVersion(SOFTWARE_VERSION_RELEASE_6_0);
984 
985     std::vector<uint8_t> buffer(packet.CalculateLen());
986     Parcel parcel(buffer.data(), buffer.size());
987 
988     ASSERT_EQ(packet.Serialization(parcel), E_OK);
989     std::map<std::string, std::string> extraCondition = { { "test", "testsql" } };
990     packet.SetExtraConditions(extraCondition);
991     EXPECT_EQ(packet.Serialization(parcel), -E_INVALID_ARGS);
992 
993     std::string sql = "testsql";
994     for (uint32_t i = 0; i < DBConstant::MAX_CONDITION_COUNT; i++) {
995         extraCondition[std::to_string(i)] = sql;
996     }
997     packet.SetExtraConditions(extraCondition);
998 
999     std::vector<uint8_t> buffer2(packet.CalculateLen());
1000     Parcel parcel2(buffer2.data(), buffer2.size());
1001     Parcel targetParcel2(buffer2.data(), buffer2.size());
1002     EXPECT_EQ(packet.Serialization(parcel2), -E_INVALID_ARGS);
1003 
1004     extraCondition.erase("0");
1005     extraCondition.erase("1");
1006     extraCondition.erase("2");
1007     std::string bigKey(DBConstant::MAX_CONDITION_KEY_LEN + 1, 'a');
1008     extraCondition[bigKey] = sql;
1009     packet.SetExtraConditions(extraCondition);
1010     std::vector<uint8_t> buffer3(packet.CalculateLen());
1011     Parcel parcel3(buffer3.data(), buffer3.size());
1012     EXPECT_EQ(packet.Serialization(parcel3), -E_INVALID_ARGS);
1013 
1014     std::string bigValue(DBConstant::MAX_CONDITION_VALUE_LEN + 1, 'a');
1015     extraCondition["1"] = bigValue;
1016     packet.SetExtraConditions(extraCondition);
1017     std::vector<uint8_t> buffer4(packet.CalculateLen());
1018     Parcel parcel4(buffer4.data(), buffer4.size());
1019     EXPECT_EQ(packet.Serialization(parcel4), -E_INVALID_ARGS);
1020 }
1021 
1022 /**
1023 * @tc.name: remote query packet 004
1024 * @tc.desc: Test RemoteExecutorRequestPacket Deserialization with invalid args
1025 * @tc.type: FUNC
1026 * @tc.require: AR000GK58G
1027 * @tc.author: zhangshijie
1028 */
1029 HWTEST_F(DistributedDBMockSyncModuleTest, RemoteQueryPacket004, TestSize.Level1)
1030 {
1031     RemoteExecutorRequestPacket packet;
1032     packet.SetNeedResponse();
1033     packet.SetVersion(SOFTWARE_VERSION_RELEASE_6_0);
1034 
1035     std::vector<uint8_t> buffer(packet.CalculateLen());
1036     RemoteExecutorRequestPacket targetPacket;
1037     Parcel targetParcel(buffer.data(), 3); // 3 is invalid len for deserialization
1038     EXPECT_EQ(targetPacket.DeSerialization(targetParcel), -E_INVALID_ARGS);
1039 
1040     std::vector<uint8_t> buffer1(1024); // 1024 is buffer len for serialization
1041     Parcel parcel(buffer1.data(), buffer1.size());
1042     ConstructPacel(parcel, DBConstant::MAX_CONDITION_COUNT + 1, "", "");
1043     Parcel desParcel(buffer1.data(), buffer1.size());
1044     EXPECT_EQ(targetPacket.DeSerialization(desParcel), -E_INVALID_ARGS);
1045 
1046     Parcel parcel2(buffer1.data(), buffer1.size());
1047     std::string bigKey(DBConstant::MAX_CONDITION_KEY_LEN + 1, 'a');
1048     ConstructPacel(parcel2, 1, bigKey, "");
1049     Parcel desParcel2(buffer1.data(), buffer1.size());
1050     EXPECT_EQ(targetPacket.DeSerialization(desParcel2), -E_INVALID_ARGS);
1051 
1052     Parcel parcel3(buffer1.data(), buffer1.size());
1053     std::string bigValue(DBConstant::MAX_CONDITION_VALUE_LEN + 1, 'a');
1054     ConstructPacel(parcel3, 1, "1", bigValue);
1055     Parcel desParcel3(buffer1.data(), buffer1.size());
1056     EXPECT_EQ(targetPacket.DeSerialization(desParcel3), -E_INVALID_ARGS);
1057 
1058     Parcel parcel4(buffer1.data(), buffer1.size());
1059     ConstructPacel(parcel4, 1, "1", "1");
1060     Parcel desParcel4(buffer1.data(), buffer1.size());
1061     EXPECT_EQ(targetPacket.DeSerialization(desParcel4), E_OK);
1062 
1063     Parcel parcel5(buffer1.data(), buffer1.size());
1064     ConstructPacel(parcel5, 0, "", "");
1065     Parcel desParcel5(buffer1.data(), buffer1.size());
1066     EXPECT_EQ(targetPacket.DeSerialization(desParcel5), E_OK);
1067 }
1068 
1069 /**
1070  * @tc.name: SingleVerKvEntryTest001
1071  * @tc.desc: Test SingleVerKvEntry Serialize and DeSerialize.
1072  * @tc.type: FUNC
1073  * @tc.require: AR000CCPOM
1074  * @tc.author: zhangqiquan
1075  */
1076 HWTEST_F(DistributedDBMockSyncModuleTest, SingleVerKvEntryTest001, TestSize.Level1)
1077 {
1078     std::vector<SingleVerKvEntry *> kvEntries;
1079     size_t len = 0u;
1080     for (size_t i = 0; i < DBConstant::MAX_NORMAL_PACK_ITEM_SIZE + 1; ++i) {
1081         auto entryPtr = new GenericSingleVerKvEntry();
1082         kvEntries.push_back(entryPtr);
1083         len += entryPtr->CalculateLen(SOFTWARE_VERSION_CURRENT);
1084         len = BYTE_8_ALIGN(len);
1085     }
1086     std::vector<uint8_t> srcData(len, 0);
1087     Parcel parcel(srcData.data(), srcData.size());
1088     EXPECT_EQ(GenericSingleVerKvEntry::SerializeDatas(kvEntries, parcel, SOFTWARE_VERSION_CURRENT), E_OK);
1089     parcel = Parcel(srcData.data(), srcData.size());
1090     EXPECT_EQ(GenericSingleVerKvEntry::DeSerializeDatas(kvEntries, parcel), 0);
1091 }
1092 
1093 /**
1094 * @tc.name: mock remote query 001
1095 * @tc.desc: Test RemoteExecutor receive msg when closing
1096 * @tc.type: FUNC
1097 * @tc.require: AR000GK58G
1098 * @tc.author: zhangqiquan
1099 */
1100 HWTEST_F(DistributedDBMockSyncModuleTest, MockRemoteQuery001, TestSize.Level3)
1101 {
1102     MockRemoteExecutor *executor = new(std::nothrow) MockRemoteExecutor();
1103     ASSERT_NE(executor, nullptr);
1104     uint32_t count = 0u;
1105     EXPECT_CALL(*executor, ParseOneRequestMessage).WillRepeatedly(
__anon3ac059ba0e02(const std::string &device, DistributedDB::Message *inMsg) 1106         [&count](const std::string &device, DistributedDB::Message *inMsg) {
1107         std::this_thread::sleep_for(std::chrono::seconds(5)); // mock one msg execute 5 s
1108         count++;
1109     });
1110     EXPECT_CALL(*executor, IsPacketValid).WillRepeatedly(Return(true));
1111     for (uint32_t i = 0; i < MESSAGE_COUNT; i++) {
1112         DistributedDB::Message *message = nullptr;
1113         EXPECT_EQ(BuildRemoteQueryMsg(message), E_OK);
1114         executor->ReceiveMessage("DEVICE", message);
1115     }
1116     std::this_thread::sleep_for(std::chrono::seconds(1));
1117     executor->Close();
1118     EXPECT_EQ(count, EXECUTE_COUNT);
1119     RefObject::KillAndDecObjRef(executor);
1120 }
1121 
1122 /**
1123 * @tc.name: mock remote query 002
1124 * @tc.desc: Test RemoteExecutor response failed when closing
1125 * @tc.type: FUNC
1126 * @tc.require: AR000GK58G
1127 * @tc.author: zhangqiquan
1128 */
1129 HWTEST_F(DistributedDBMockSyncModuleTest, MockRemoteQuery002, TestSize.Level3)
1130 {
1131     MockRemoteExecutor *executor = new(std::nothrow) MockRemoteExecutor();
1132     ASSERT_NE(executor, nullptr);
1133     executor->CallResponseFailed(0, 0, 0, "DEVICE");
1134     RefObject::KillAndDecObjRef(executor);
1135 }
1136 
1137 /**
1138  * @tc.name: SyncTaskContextCheck001
1139  * @tc.desc: test context check task can be skipped in push mode.
1140  * @tc.type: FUNC
1141  * @tc.require: AR000CCPOM
1142  * @tc.author: zhangqiquan
1143  */
1144 HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck001, TestSize.Level1)
1145 {
1146     MockSyncTaskContext syncTaskContext;
1147     MockCommunicator communicator;
1148     VirtualSingleVerSyncDBInterface dbSyncInterface;
1149     std::shared_ptr<Metadata> metadata = std::make_shared<Metadata>();
1150     (void)syncTaskContext.Initialize("device", &dbSyncInterface, metadata, &communicator);
1151     syncTaskContext.SetLastFullSyncTaskStatus(SyncOperation::Status::OP_FINISHED_ALL);
1152     syncTaskContext.CallSetSyncMode(static_cast<int>(SyncModeType::PUSH));
1153     EXPECT_EQ(syncTaskContext.CallIsCurrentSyncTaskCanBeSkipped(), true);
1154 }
1155 
1156 /**
1157  * @tc.name: SyncTaskContextCheck002
1158  * @tc.desc: test context check task can be skipped in push mode.
1159  * @tc.type: FUNC
1160  * @tc.require: AR000CCPOM
1161  * @tc.author: zhangqiquan
1162  */
1163 HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck002, TestSize.Level1)
1164 {
1165     /**
1166      * @tc.steps: step1. create context and operation
1167      */
1168     auto syncTaskContext = new(std::nothrow) MockSyncTaskContext();
1169     ASSERT_NE(syncTaskContext, nullptr);
1170     auto operation = new SyncOperation(1u, {}, static_cast<int>(SyncModeType::QUERY_PUSH), nullptr, false);
1171     ASSERT_NE(operation, nullptr);
1172     QuerySyncObject querySyncObject;
1173     operation->SetQuery(querySyncObject);
1174     syncTaskContext->SetSyncOperation(operation);
1175     syncTaskContext->SetLastFullSyncTaskStatus(SyncOperation::Status::OP_FAILED);
1176     syncTaskContext->CallSetSyncMode(static_cast<int>(SyncModeType::QUERY_PUSH));
1177     EXPECT_CALL(*syncTaskContext, IsTargetQueueEmpty()).WillRepeatedly(Return(false));
1178 
1179     const int loopCount = 1000;
1180     /**
1181      * @tc.steps: step2. loop 1000 times for writing data into lastQuerySyncTaskStatusMap_ async
1182      */
__anon3ac059ba0f02() 1183     std::thread writeThread([&syncTaskContext]() {
1184         for (int i = 0; i < loopCount; ++i) {
1185             syncTaskContext->SaveLastPushTaskExecStatus(static_cast<int>(SyncOperation::Status::OP_FAILED));
1186         }
1187     });
1188     /**
1189      * @tc.steps: step3. loop 100000 times for clear lastQuerySyncTaskStatusMap_ async
1190      */
__anon3ac059ba1002() 1191     std::thread clearThread([&syncTaskContext]() {
1192         for (int i = 0; i < 100000; ++i) { // loop 100000 times
1193             syncTaskContext->ResetLastPushTaskStatus();
1194         }
1195     });
1196     /**
1197      * @tc.steps: step4. loop 1000 times for read data from lastQuerySyncTaskStatusMap_ async
1198      */
__anon3ac059ba1102() 1199     std::thread readThread([&syncTaskContext]() {
1200         for (int i = 0; i < loopCount; ++i) {
1201             EXPECT_EQ(syncTaskContext->CallIsCurrentSyncTaskCanBeSkipped(), false);
1202         }
1203     });
1204     writeThread.join();
1205     clearThread.join();
1206     readThread.join();
1207     RefObject::KillAndDecObjRef(operation);
1208     syncTaskContext->SetSyncOperation(nullptr);
1209     RefObject::KillAndDecObjRef(syncTaskContext);
1210 }
1211 
1212 #ifdef RUN_AS_ROOT
1213 /**
1214  * @tc.name: TimeChangeListenerTest001
1215  * @tc.desc: Test RegisterTimeChangedLister.
1216  * @tc.type: FUNC
1217  * @tc.require: AR000CCPOM
1218  * @tc.author: zhangqiquan
1219  */
1220 HWTEST_F(DistributedDBMockSyncModuleTest, TimeChangeListenerTest001, TestSize.Level1)
1221 {
1222     SingleVerKVSyncer syncer;
1223     VirtualSingleVerSyncDBInterface syncDBInterface;
1224     KvDBProperties dbProperties;
1225     dbProperties.SetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, true);
1226     syncDBInterface.SetDbProperties(dbProperties);
1227     EXPECT_EQ(syncer.Initialize(&syncDBInterface, false), -E_NO_NEED_ACTIVE);
1228     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s wait for time tick
1229     const std::string LOCAL_TIME_OFFSET_KEY = "localTimeOffset";
1230     std::vector<uint8_t> key;
1231     DBCommon::StringToVector(LOCAL_TIME_OFFSET_KEY, key);
1232     std::vector<uint8_t> beforeOffset;
1233     EXPECT_EQ(syncDBInterface.GetMetaData(key, beforeOffset), E_OK);
1234     ChangeTime(2); // increase 2s
1235     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s wait for time tick
1236     std::vector<uint8_t> afterOffset;
1237     EXPECT_EQ(syncDBInterface.GetMetaData(key, afterOffset), E_OK);
1238     EXPECT_NE(beforeOffset, afterOffset);
1239     ChangeTime(-2); // decrease 2s
1240 }
1241 #endif
1242 
1243 /**
1244  * @tc.name: TimeSync001
1245  * @tc.desc: Test syncer call set sync retry before init.
1246  * @tc.type: FUNC
1247  * @tc.require: AR000CCPOM
1248  * @tc.author: zhangqiquan
1249  */
1250 HWTEST_F(DistributedDBMockSyncModuleTest, TimeSync001, TestSize.Level1)
1251 {
1252     auto *communicator = new(std::nothrow) MockCommunicator();
1253     ASSERT_NE(communicator, nullptr);
1254     auto *storage = new(std::nothrow) VirtualSingleVerSyncDBInterface();
1255     ASSERT_NE(storage, nullptr);
1256     std::shared_ptr<Metadata> metadata = std::make_shared<Metadata>();
1257 
1258     EXPECT_CALL(*communicator, SendMessage(_, _, _, _)).WillRepeatedly(Return(DB_ERROR));
1259     const int loopCount = 100;
1260     const int timeDriverMs = 100;
1261     for (int i = 0; i < loopCount; ++i) {
1262         MockTimeSync timeSync;
1263         timeSync.Initialize(communicator, metadata, storage, "DEVICES_A");
1264         timeSync.ModifyTimer(timeDriverMs);
1265         std::this_thread::sleep_for(std::chrono::milliseconds(timeDriverMs));
1266         timeSync.Close();
1267     }
1268     std::this_thread::sleep_for(std::chrono::seconds(1));
1269     metadata = nullptr;
1270     delete storage;
1271     delete communicator;
1272 }
1273