1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18 #include <sys/eventfd.h>
19 #include <cstdarg>
20
21 #include "begetctl.h"
22 #include "cJSON.h"
23 #include "init.h"
24 #include "init_hashmap.h"
25 #include "init_param.h"
26 #include "init_utils.h"
27 #include "le_epoll.h"
28 #include "le_loop.h"
29 #include "le_socket.h"
30 #include "le_task.h"
31 #include "loop_event.h"
32 #include "param_manager.h"
33 #include "param_message.h"
34 #include "param_utils.h"
35 #include "trigger_manager.h"
36
37 using namespace testing::ext;
38 using namespace std;
39
40 namespace init_ut {
41 const std::string TCP_SERVER = "127.0.0.1:7777";
42 const std::string PIPE_SERVER = STARTUP_INIT_UT_PATH "/dev/unix/socket/testsocket";
43 const std::string WATCHER_FILE = STARTUP_INIT_UT_PATH "/test_watcher_file";
44 const std::string FORMAT_STR = "{ \"cmd\":%d, \"message\":\"%s\" }";
45
46 static LoopHandle g_loopClient_ = nullptr;
47 static LoopHandle g_loopServer_ = nullptr;
48 static int g_maxCount = 0;
49 static int g_timeCount = 0;
50 static int g_cmd = 2;
DecodeMessage(const char * buffer,size_t nread,uint32_t & cmd)51 static void DecodeMessage(const char *buffer, size_t nread, uint32_t &cmd)
52 {
53 cJSON *root = cJSON_ParseWithLength(buffer, nread);
54 if (root == nullptr) {
55 EXPECT_NE(root, nullptr);
56 printf("Invalid message %s \n", buffer);
57 return;
58 }
59 printf("Message: %s \n", cJSON_GetStringValue(cJSON_GetObjectItem(root, "message")));
60 cmd = cJSON_GetNumberValue(cJSON_GetObjectItem(root, "cmd"));
61 printf("cmd: %d \n", cmd);
62 cJSON_Delete(root);
63 return;
64 }
65
SendMessage(const LoopHandle loopHandle,const TaskHandle taskHandle,const char * message,...)66 static void SendMessage(const LoopHandle loopHandle, const TaskHandle taskHandle, const char *message, ...)
67 {
68 uint32_t bufferSize = 1024; // 1024 buffer size
69 BufferHandle handle = LE_CreateBuffer(loopHandle, bufferSize);
70 char *buffer = (char *)LE_GetBufferInfo(handle, nullptr, &bufferSize);
71
72 va_list vargs;
73 va_start(vargs, message);
74 if (vsnprintf_s(buffer, bufferSize, bufferSize - 1, message, vargs) == -1) {
75 LE_FreeBuffer(loopHandle, taskHandle, handle);
76 va_end(vargs);
77 EXPECT_EQ(1, 0);
78 return;
79 }
80 va_end(vargs);
81 int ret = LE_Send(loopHandle, taskHandle, handle, bufferSize);
82 EXPECT_EQ(ret, 0);
83 }
84
TestOnClose(const TaskHandle taskHandle)85 static void TestOnClose(const TaskHandle taskHandle)
86 {
87 }
88
TestHandleTaskEvent(const LoopHandle loop,const TaskHandle task,uint32_t oper)89 static LE_STATUS TestHandleTaskEvent(const LoopHandle loop, const TaskHandle task, uint32_t oper)
90 {
91 return LE_SUCCESS;
92 }
93
TestOnReceiveRequest(const TaskHandle task,const uint8_t * buffer,uint32_t nread)94 static void TestOnReceiveRequest(const TaskHandle task, const uint8_t *buffer, uint32_t nread)
95 {
96 EXPECT_NE(buffer, nullptr);
97 if (buffer == nullptr) {
98 return;
99 }
100 printf("Server receive message %s \n", reinterpret_cast<const char *>(buffer));
101 uint32_t cmd = 0;
102 DecodeMessage(reinterpret_cast<const char *>(buffer), nread, cmd);
103 SendMessage(g_loopServer_, task, reinterpret_cast<const char *>(buffer));
104 }
105
TestClientOnReceiveRequest(const TaskHandle task,const uint8_t * buffer,uint32_t nread)106 static void TestClientOnReceiveRequest(const TaskHandle task, const uint8_t *buffer, uint32_t nread)
107 {
108 printf("Client receive message %s \n", reinterpret_cast<const char *>(buffer));
109 EXPECT_NE(buffer, nullptr);
110 if (buffer == nullptr) {
111 return;
112 }
113 uint32_t cmd = 0;
114 DecodeMessage(reinterpret_cast<const char *>(buffer), nread, cmd);
115 if (cmd == 5 || cmd == 2) { // 2 5 close server
116 LE_StopLoop(g_loopClient_);
117 }
118 }
119
ProcessAsyncEvent(const TaskHandle taskHandle,uint64_t eventId,const uint8_t * buffer,uint32_t buffLen)120 static void ProcessAsyncEvent(const TaskHandle taskHandle, uint64_t eventId, const uint8_t *buffer, uint32_t buffLen)
121 {
122 UNUSED(taskHandle);
123 UNUSED(eventId);
124 UNUSED(buffer);
125 UNUSED(buffLen);
126 }
127
TestSendMessageComplete(const TaskHandle taskHandle,BufferHandle handle)128 static void TestSendMessageComplete(const TaskHandle taskHandle, BufferHandle handle)
129 {
130 printf("SendMessage result %d \n", LE_GetSendResult(handle));
131 uint32_t bufferSize = 1024; // 1024 buffer size
132 char *buffer = (char *)LE_GetBufferInfo(handle, nullptr, &bufferSize);
133 uint32_t cmd = 0;
134 DecodeMessage(reinterpret_cast<const char *>(buffer), bufferSize, cmd);
135 if (cmd == 5) { // 5 close server
136 LE_StopLoop(g_loopServer_);
137 }
138 }
139
TestTcpIncomingConnect(LoopHandle loop,TaskHandle server)140 static int TestTcpIncomingConnect(LoopHandle loop, TaskHandle server)
141 {
142 PARAM_CHECK(server != nullptr, return -1, "Error server");
143 printf("Tcp connect incoming \n");
144 TaskHandle stream;
145 LE_StreamInfo info = {};
146 info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_CONNECT;
147 info.baseInfo.close = TestOnClose;
148 info.baseInfo.userDataSize = 0;
149 info.disConnectComplete = nullptr;
150 info.sendMessageComplete = TestSendMessageComplete;
151 info.recvMessage = TestOnReceiveRequest;
152 LE_STATUS ret = LE_AcceptStreamClient(loop, server, &stream, &info);
153 EXPECT_EQ(ret, 0);
154 return 0;
155 }
156
TestPipIncomingConnect(LoopHandle loop,TaskHandle server)157 static int TestPipIncomingConnect(LoopHandle loop, TaskHandle server)
158 {
159 PARAM_CHECK(server != nullptr, return -1, "Error server");
160 printf("Pipe connect incoming \n");
161 TaskHandle stream;
162 LE_StreamInfo info = {};
163 info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_CONNECT;
164 info.baseInfo.close = TestOnClose;
165 info.baseInfo.userDataSize = 0;
166 info.disConnectComplete = nullptr;
167 info.sendMessageComplete = TestSendMessageComplete;
168 info.recvMessage = TestOnReceiveRequest;
169 LE_STATUS ret = LE_AcceptStreamClient(loop, server, &stream, &info);
170 EXPECT_EQ(ret, 0);
171 return 0;
172 }
173
TestConnectComplete(const TaskHandle client)174 static void TestConnectComplete(const TaskHandle client)
175 {
176 printf("Connect complete \n");
177 }
178
TestDisConnectComplete(const TaskHandle client)179 static void TestDisConnectComplete(const TaskHandle client)
180 {
181 printf("DisConnect complete \n");
182 LE_StopLoop(g_loopClient_);
183 }
184
TestProcessTimer(const TimerHandle taskHandle,void * context)185 static void TestProcessTimer(const TimerHandle taskHandle, void *context)
186 {
187 g_timeCount++;
188 printf("ProcessTimer %d\n", g_timeCount);
189 if (g_maxCount == 2) { // 2 stop
190 if (g_timeCount >= g_maxCount) {
191 LE_StopLoop(g_loopClient_);
192 }
193 }
194 if (g_maxCount == 3) { // 3 stop timer
195 if (g_timeCount >= g_maxCount) {
196 LE_StopTimer(g_loopClient_, taskHandle);
197 LE_StopLoop(g_loopClient_);
198 }
199 }
200 if (g_maxCount == 10) { // 10 write watcher file
201 FILE *tmpFile = fopen(WATCHER_FILE.c_str(), "wr");
202 if (tmpFile != nullptr) {
203 fprintf(tmpFile, "%s", "test watcher file 22222222222");
204 (void)fflush(tmpFile);
205 fclose(tmpFile);
206 }
207 LE_StopTimer(g_loopClient_, taskHandle);
208 LE_StopLoop(g_loopClient_);
209 }
210 }
211
ProcessWatchEventTest(WatcherHandle taskHandle,int fd,uint32_t * events,const void * context)212 static void ProcessWatchEventTest(WatcherHandle taskHandle, int fd, uint32_t *events, const void *context)
213 {
214 UNUSED(taskHandle);
215 UNUSED(fd);
216 UNUSED(events);
217 UNUSED(context);
218 printf("Process watcher event \n");
219 LE_StopLoop(g_loopClient_);
220 }
221
222 class LoopServerUnitTest : public testing::Test {
223 public:
LoopServerUnitTest()224 LoopServerUnitTest() {};
~LoopServerUnitTest()225 virtual ~LoopServerUnitTest() {};
SetUpTestCase(void)226 static void SetUpTestCase(void) {};
TearDownTestCase(void)227 static void TearDownTestCase(void) {};
SetUp()228 void SetUp() {};
TearDown()229 void TearDown() {};
TestBody(void)230 void TestBody(void) {};
231
232 // for thread to create tcp\pipe server
RunServer(void)233 void RunServer(void)
234 {
235 TaskHandle tcpServer = nullptr;
236 TaskHandle pipeServer = nullptr;
237 LE_STATUS ret = LE_CreateLoop(&g_loopServer_);
238 EXPECT_EQ(ret, 0);
239 // create server for tcp
240 LE_StreamServerInfo info = {};
241 info.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
242 info.socketId = -1;
243 info.server = const_cast<char *>(TCP_SERVER.c_str());
244 info.baseInfo.close = TestOnClose;
245 info.incommingConnect = TestTcpIncomingConnect;
246 ret = LE_CreateStreamServer(g_loopServer_, &tcpServer, &info);
247 EXPECT_EQ(ret, 0);
248
249 info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
250 info.socketId = -1;
251 info.server = const_cast<char *>(PIPE_SERVER.c_str());
252 info.baseInfo.close = TestOnClose;
253 info.incommingConnect = TestPipIncomingConnect;
254 ret = LE_CreateStreamServer(g_loopServer_, &pipeServer, &info);
255 EXPECT_EQ(ret, 0);
256
257 printf("Run server pipeServer_ \n");
258 // run loop for server
259 LE_RunLoop(g_loopServer_);
260
261 printf("Run server pipeServer_ \n");
262 LE_CloseStreamTask(g_loopServer_, pipeServer);
263 pipeServer = nullptr;
264 LE_CloseStreamTask(g_loopServer_, tcpServer);
265 tcpServer = nullptr;
266 LE_CloseLoop(g_loopServer_);
267 g_loopServer_ = nullptr;
268 }
269
StartServer()270 void StartServer()
271 {
272 std::thread(&LoopServerUnitTest::RunServer, this).detach();
273 sleep(1);
274 }
275
CreateConnect(const char * tcpServer,uint32_t flags)276 TaskHandle CreateConnect(const char *tcpServer, uint32_t flags)
277 {
278 if (g_loopClient_ == nullptr) {
279 LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
280 EXPECT_EQ(ret, 0);
281 }
282
283 TaskHandle task = nullptr;
284 LE_StreamInfo info = {};
285 info.baseInfo.flags = TASK_STREAM | flags | TASK_CONNECT;
286 info.server = const_cast<char *>(tcpServer);
287 info.baseInfo.userDataSize = 0;
288 info.baseInfo.close = TestOnClose;
289 info.disConnectComplete = TestDisConnectComplete;
290 info.connectComplete = TestConnectComplete;
291 info.sendMessageComplete = nullptr;
292 info.recvMessage = TestClientOnReceiveRequest;
293 LE_STATUS status = LE_CreateStreamClient(g_loopClient_, &task, &info);
294 EXPECT_EQ(status, 0);
295 return task;
296 }
297
CreateWatcherTask(int fd,const char * fileName)298 WatcherHandle CreateWatcherTask(int fd, const char *fileName)
299 {
300 if (g_loopClient_ == nullptr) {
301 LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
302 EXPECT_EQ(ret, 0);
303 }
304 WatcherHandle handle = nullptr;
305 LE_WatchInfo info = {};
306 info.fd = fd;
307 info.flags = WATCHER_ONCE;
308 info.events = Event_Read | Event_Write;
309 info.processEvent = ProcessWatchEventTest;
310 LE_STATUS status = LE_StartWatcher(g_loopClient_, &handle, &info, nullptr);
311 EXPECT_EQ(status, 0);
312 return handle;
313 }
314
CreateTimerTask(int repeat)315 TimerHandle CreateTimerTask(int repeat)
316 {
317 if (g_loopClient_ == nullptr) {
318 LE_STATUS ret = LE_CreateLoop(&g_loopClient_);
319 EXPECT_EQ(ret, 0);
320 }
321 TimerHandle timer = nullptr;
322 int ret = LE_CreateTimer(g_loopClient_, &timer, TestProcessTimer, nullptr);
323 EXPECT_EQ(ret, 0);
324 ret = LE_StartTimer(g_loopClient_, timer, 500, repeat); // 500 ms
325 EXPECT_EQ(ret, 0);
326 return timer;
327 }
328 private:
329 std::thread *serverThread_ = nullptr;
330 };
331
332 HWTEST_F(LoopServerUnitTest, TestRunServer, TestSize.Level1)
333 {
334 LoopServerUnitTest test;
335 test.StartServer();
336 }
337
338 HWTEST_F(LoopServerUnitTest, TestPipConnect, TestSize.Level1)
339 {
340 g_cmd = 2; // 2 only close client
341 LoopServerUnitTest test;
342 TaskHandle pipe = test.CreateConnect(PIPE_SERVER.c_str(), TASK_PIPE);
343 EXPECT_NE(pipe, nullptr);
344 SendMessage(g_loopClient_, pipe, FORMAT_STR.c_str(), g_cmd, "connect success");
345 LE_RunLoop(g_loopClient_);
346 LE_CloseStreamTask(g_loopClient_, pipe);
347 LE_CloseLoop(g_loopClient_);
348 g_loopClient_ = nullptr;
349 }
350
351 HWTEST_F(LoopServerUnitTest, TestTcpConnect, TestSize.Level1)
352 {
353 g_cmd = 2; // 2 only close client
354 LoopServerUnitTest test;
355 TaskHandle tcp = test.CreateConnect(TCP_SERVER.c_str(), TASK_TCP);
356 EXPECT_NE(tcp, nullptr);
357 SendMessage(g_loopClient_, tcp, FORMAT_STR.c_str(), g_cmd, "connect success");
358 LE_RunLoop(g_loopClient_);
359 LE_CloseStreamTask(g_loopClient_, tcp);
360 LE_CloseLoop(g_loopClient_);
361 g_loopClient_ = nullptr;
362 }
363
364 HWTEST_F(LoopServerUnitTest, TestTimer, TestSize.Level1)
365 {
366 LoopServerUnitTest test;
367 g_maxCount = 2; // 2 stop
368 TimerHandle timer = test.CreateTimerTask(2);
369 EXPECT_NE(timer, nullptr);
370 LE_RunLoop(g_loopClient_);
371 LE_CloseLoop(g_loopClient_);
372 g_loopClient_ = nullptr;
373 }
374
375 HWTEST_F(LoopServerUnitTest, TestTimer2, TestSize.Level1)
376 {
377 LoopServerUnitTest test;
378 g_maxCount = 3; // 3 stop timer
379 TimerHandle timer = test.CreateTimerTask(3);
380 EXPECT_NE(timer, nullptr);
381 LE_RunLoop(g_loopClient_);
382 LE_CloseLoop(g_loopClient_);
383 g_loopClient_ = nullptr;
384 }
385
386 HWTEST_F(LoopServerUnitTest, TestWatcher, TestSize.Level1)
387 {
388 int fd = open(WATCHER_FILE.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
389 if (fd >= 0) {
390 write(fd, WATCHER_FILE.c_str(), WATCHER_FILE.size());
391 }
392 EXPECT_GE(fd, 0);
393 printf("Watcher fd %d \n", fd);
394 LoopServerUnitTest test;
395 WatcherHandle watcher = test.CreateWatcherTask(3, WATCHER_FILE.c_str());
396 EXPECT_NE(watcher, nullptr);
397 g_maxCount = 10; // 10 write watcher file
398 TimerHandle timer = test.CreateTimerTask(1);
399 EXPECT_NE(timer, nullptr);
400
401 LE_RunLoop(g_loopClient_);
402 LE_RemoveWatcher(g_loopClient_, watcher);
403 close(fd);
404 LE_CloseLoop(g_loopClient_);
405 g_loopClient_ = nullptr;
406 }
407
408 HWTEST_F(LoopServerUnitTest, TestStopServer, TestSize.Level1)
409 {
410 g_cmd = 5; // 5 close server
411 LoopServerUnitTest test;
412 TaskHandle pip = test.CreateConnect(PIPE_SERVER.c_str(), TASK_PIPE);
413 EXPECT_NE(pip, nullptr);
414 SendMessage(g_loopClient_, pip, FORMAT_STR.c_str(), g_cmd, "connect success");
415 LE_RunLoop(g_loopClient_);
416 LE_CloseStreamTask(g_loopClient_, pip);
417 LE_CloseLoop(g_loopClient_);
418 g_loopClient_ = nullptr;
419 }
420 } // namespace init_ut
421