1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16 #include <pthread.h>
17 #include <sys/eventfd.h>
18
19 #include "begetctl.h"
20 #include "init.h"
21 #include "init_hashmap.h"
22 #include "init_param.h"
23 #include "init_utils.h"
24 #include "le_epoll.h"
25 #include "le_loop.h"
26 #include "le_socket.h"
27 #include "le_task.h"
28 #include "loop_event.h"
29 #include "param_manager.h"
30 #include "param_message.h"
31 #include "param_utils.h"
32 #include "trigger_manager.h"
33
34 using namespace testing::ext;
35 using namespace std;
36
37 using HashTab = struct {
38 HashNodeCompare nodeCompare;
39 HashKeyCompare keyCompare;
40 HashNodeFunction nodeHash;
41 HashKeyFunction keyHash;
42 HashNodeOnFree nodeFree;
43 int maxBucket;
44 uint32_t tableId;
45 HashNode *buckets[0];
46 };
47
48 extern "C" {
49 void OnClose(ParamTaskPtr client);
50 }
TestHandleTaskEvent(const LoopHandle loop,const TaskHandle task,uint32_t oper)51 static LE_STATUS TestHandleTaskEvent(const LoopHandle loop, const TaskHandle task, uint32_t oper)
52 {
53 return LE_SUCCESS;
54 }
55
OnReceiveRequest(const TaskHandle task,const uint8_t * buffer,uint32_t nread)56 static void OnReceiveRequest(const TaskHandle task, const uint8_t *buffer, uint32_t nread)
57 {
58 UNUSED(task);
59 UNUSED(buffer);
60 UNUSED(nread);
61 }
62
ProcessAsyncEvent(const TaskHandle taskHandle,uint64_t eventId,const uint8_t * buffer,uint32_t buffLen)63 static void ProcessAsyncEvent(const TaskHandle taskHandle, uint64_t eventId, const uint8_t *buffer, uint32_t buffLen)
64 {
65 UNUSED(taskHandle);
66 UNUSED(eventId);
67 UNUSED(buffer);
68 UNUSED(buffLen);
69 }
70
IncomingConnect(LoopHandle loop,TaskHandle server)71 static int IncomingConnect(LoopHandle loop, TaskHandle server)
72 {
73 UNUSED(loop);
74 UNUSED(server);
75 return 0;
76 }
77
ProcessWatchEventTest(WatcherHandle taskHandle,int fd,uint32_t * events,const void * context)78 static void ProcessWatchEventTest(WatcherHandle taskHandle, int fd, uint32_t *events, const void *context)
79 {
80 UNUSED(taskHandle);
81 UNUSED(fd);
82 UNUSED(events);
83 UNUSED(context);
84 }
85
86 namespace init_ut {
87 class LoopEventUnittest : public testing::Test {
88 public:
LoopEventUnittest()89 LoopEventUnittest() {};
~LoopEventUnittest()90 virtual ~LoopEventUnittest() {};
SetUpTestCase(void)91 static void SetUpTestCase(void) {};
TearDownTestCase(void)92 static void TearDownTestCase(void) {};
SetUp()93 void SetUp() {};
TearDown()94 void TearDown() {};
TestBody(void)95 void TestBody(void) {};
CreateServerTask()96 int CreateServerTask()
97 {
98 CheckTaskFlags(nullptr, Event_Write);
99 ParamStreamInfo info = {};
100 info.server = const_cast<char *>(PIPE_NAME);
101 info.close = NULL;
102 info.recvMessage = NULL;
103 info.incomingConnect = OnIncomingConnect;
104 return ParamServerCreate(&serverTask_, &info);
105 }
StreamTaskTest()106 void StreamTaskTest ()
107 {
108 LE_StreamInfo streamInfo = {};
109 streamInfo.recvMessage = OnReceiveRequest;
110 streamInfo.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_CONNECT | TASK_TEST;
111 streamInfo.server = (char *)"/data/testpipea";
112 TaskHandle clientTaskHandle = nullptr;
113 LE_AcceptStreamClient(LE_GetDefaultLoop(), (TaskHandle)serverTask_, &clientTaskHandle, &streamInfo);
114 if (clientTaskHandle == nullptr) {
115 return;
116 }
117 ((StreamConnectTask *)clientTaskHandle)->stream.base.handleEvent(LE_GetDefaultLoop(),
118 (TaskHandle)clientTaskHandle, Event_Read);
119 ((StreamConnectTask *)clientTaskHandle)->stream.base.handleEvent(LE_GetDefaultLoop(),
120 (TaskHandle)clientTaskHandle, Event_Write);
121
122 ((StreamConnectTask *)clientTaskHandle)->stream.base.handleEvent(LE_GetDefaultLoop(),
123 (TaskHandle)clientTaskHandle, 0);
124
125 streamInfo.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER;
126 streamInfo.server = (char *)"/data/testpipeb";
127 TaskHandle clientTaskHandleb = nullptr;
128 LE_CreateStreamClient(LE_GetDefaultLoop(), &clientTaskHandleb, &streamInfo);
129 if (clientTaskHandleb == nullptr) {
130 return;
131 }
132 ((StreamClientTask *)clientTaskHandleb)->stream.base.handleEvent(LE_GetDefaultLoop(),
133 clientTaskHandleb, Event_Read);
134 ((StreamClientTask *)clientTaskHandleb)->stream.base.handleEvent(LE_GetDefaultLoop(),
135 clientTaskHandleb, Event_Write);
136 ((StreamClientTask *)clientTaskHandleb)->stream.base.innerClose(LE_GetDefaultLoop(), clientTaskHandleb);
137
138 TaskHandle clientTaskHandlec = nullptr;
139 streamInfo.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_SERVER;
140 streamInfo.server = (char *)"0.0.0.0:10110";
141 LE_CreateStreamClient(LE_GetDefaultLoop(), &clientTaskHandlec, &streamInfo);
142 if (clientTaskHandlec == nullptr) {
143 return;
144 }
145 TaskHandle clientTaskHandled = nullptr;
146 streamInfo.baseInfo.flags = TASK_STREAM | TASK_TCP | TASK_CONNECT;
147 streamInfo.server = (char *)"127.0.0.1:10111";
148 LE_CreateStreamClient(LE_GetDefaultLoop(), &clientTaskHandled, &streamInfo);
149 if (clientTaskHandled == nullptr) {
150 return;
151 }
152 }
LeTaskTest()153 void LeTaskTest()
154 {
155 LE_StreamServerInfo info = {};
156 info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_SERVER | TASK_TEST;
157 info.server = (char *)"/data/testpipe";
158 info.baseInfo.close = OnClose;
159 info.incommingConnect = IncomingConnect;
160 LE_CreateStreamServer(LE_GetDefaultLoop(), &serverTask_, &info);
161 if (serverTask_ == nullptr) {
162 return;
163 }
164 ((StreamServerTask *)serverTask_)->base.handleEvent(LE_GetDefaultLoop(), serverTask_, Event_Read);
165
166 uint64_t eventId = 0;
167 ParamStreamInfo paramStreamInfo = {};
168 paramStreamInfo.flags = PARAM_TEST_FLAGS;
169 paramStreamInfo.server = NULL;
170 paramStreamInfo.close = OnClose;
171 paramStreamInfo.recvMessage = ProcessMessage;
172 paramStreamInfo.incomingConnect = NULL;
173 ParamTaskPtr client = NULL;
174 int ret = ParamStreamCreate(&client, serverTask_, ¶mStreamInfo, sizeof(ParamWatcher));
175 PARAM_CHECK(ret == 0, return, "Failed to create client");
176
177 BufferHandle handle = LE_CreateBuffer(LE_GetDefaultLoop(), 1 + sizeof(eventId));
178 LE_Buffer *buffer = (LE_Buffer *)handle;
179 AddBuffer((StreamTask *)client, buffer);
180 ((StreamConnectTask *)client)->stream.base.handleEvent(LE_GetDefaultLoop(), (TaskHandle)(client), Event_Write);
181
182 ParamMessage *request = (ParamMessage *)CreateParamMessage(MSG_SET_PARAM, "name", sizeof(ParamMessage));
183 ((StreamConnectTask *)client)->recvMessage(LE_GetDefaultLoop(), reinterpret_cast<uint8_t *>(request),
184 sizeof(ParamMessage));
185
186 LE_Buffer *next = nullptr;
187 EXPECT_EQ(GetNextBuffer((StreamTask *)client, next), nullptr);
188 ParamWatcher *watcher = (ParamWatcher *)ParamGetTaskUserData(client);
189 PARAM_CHECK(watcher != nullptr, return, "Failed to get watcher");
190 OH_ListInit(&watcher->triggerHead);
191 OnClose(client);
192 LE_FreeBuffer(LE_GetDefaultLoop(), (TaskHandle)client, nullptr);
193 return;
194 }
ProcessEventTest()195 void ProcessEventTest()
196 {
197 ProcessEvent((EventLoop *)LE_GetDefaultLoop(), 1, Event_Read);
198 LE_BaseInfo info = {TASK_EVENT, NULL};
199 int testfd = 65535; // 65535 is not exist fd
200 BaseTask *task = CreateTask(LE_GetDefaultLoop(), testfd, &info, sizeof(StreamClientTask));
201 if (task != nullptr) {
202 task->handleEvent = TestHandleTaskEvent;
203 ProcessEvent((EventLoop *)LE_GetDefaultLoop(), testfd, Event_Read);
204 ((HashTab *)(((EventLoop *)LE_GetDefaultLoop())->taskMap))->nodeFree(&task->hashNode);
205 }
206 }
ProcessasynEvent()207 void ProcessasynEvent()
208 {
209 TaskHandle asynHandle = nullptr;
210 LE_CreateAsyncTask(LE_GetDefaultLoop(), &asynHandle, ProcessAsyncEvent);
211 if (asynHandle == nullptr) {
212 return;
213 }
214 ((AsyncEventTask *)asynHandle)->stream.base.handleEvent(LE_GetDefaultLoop(), asynHandle, Event_Read);
215 ((AsyncEventTask *)asynHandle)->stream.base.handleEvent(LE_GetDefaultLoop(), asynHandle, Event_Write);
216 LE_StopAsyncTask(LE_GetDefaultLoop(), asynHandle);
217 }
ProcessWatcherTask()218 void ProcessWatcherTask()
219 {
220 WatcherHandle handle = nullptr;
221 LE_WatchInfo info = {};
222 info.fd = -1;
223 info.flags = WATCHER_ONCE;
224 info.events = Event_Read;
225 info.processEvent = ProcessWatchEventTest;
226 LE_StartWatcher(LE_GetDefaultLoop(), &handle, &info, nullptr);
227 if (handle == nullptr) {
228 return;
229 }
230 ((WatcherTask *)handle)->base.handleEvent(LE_GetDefaultLoop(), (TaskHandle)handle, Event_Read);
231 ((WatcherTask *)handle)->base.handleEvent(LE_GetDefaultLoop(), (TaskHandle)handle, 0);
232 ((WatcherTask *)handle)->base.flags = WATCHER_ONCE;
233 ((WatcherTask *)handle)->base.handleEvent(LE_GetDefaultLoop(), (TaskHandle)handle, Event_Read);
234 }
CreateSocketTest()235 void CreateSocketTest()
236 {
237 ParamTaskPtr serverTask = nullptr;
238 LE_StreamServerInfo info = {};
239 info.baseInfo.flags = TASK_PIPE | TASK_CONNECT | TASK_TEST;
240 info.server = (char *)"/data/testpipe";
241 info.baseInfo.close = OnClose;
242 info.incommingConnect = IncomingConnect;
243 info.socketId = 1111; // 1111 is test fd
244 LE_CreateStreamServer(LE_GetDefaultLoop(), &serverTask, &info);
245 EXPECT_NE(serverTask, nullptr);
246 if (serverTask == nullptr) {
247 return;
248 }
249 ((StreamServerTask *)serverTask)->base.taskId.fd = -1;
250 OnIncomingConnect(LE_GetDefaultLoop(), serverTask);
251 LE_GetSocketFd(serverTask);
252 AcceptSocket(-1, TASK_PIPE);
253 AcceptSocket(-1, TASK_TCP);
254 AcceptSocket(-1, TASK_TEST);
255 }
256
257 private:
258 ParamTaskPtr serverTask_ = NULL;
259 };
260
261 HWTEST_F(LoopEventUnittest, StreamTaskTest, TestSize.Level1)
262 {
263 LoopEventUnittest loopevtest = LoopEventUnittest();
264 loopevtest.CreateServerTask();
265 loopevtest.StreamTaskTest();
266 LE_StreamInfo streamInfo = {};
267 streamInfo.recvMessage = OnReceiveRequest;
268 streamInfo.baseInfo.flags = TASK_PIPE | TASK_CONNECT;
269 streamInfo.server = (char *)PIPE_NAME;
270 TaskHandle clientTaskHandlec = nullptr;
271 LE_CreateStreamClient(LE_GetDefaultLoop(), &clientTaskHandlec, &streamInfo);
272 EXPECT_NE(clientTaskHandlec, nullptr);
273 }
274
275 HWTEST_F(LoopEventUnittest, LeTaskTest, TestSize.Level1)
276 {
277 LoopEventUnittest loopevtest = LoopEventUnittest();
278 loopevtest.LeTaskTest();
279 }
280 HWTEST_F(LoopEventUnittest, runServerTest, TestSize.Level1)
281 {
282 LoopEventUnittest loopevtest = LoopEventUnittest();
283 loopevtest.ProcessEventTest();
284 }
285 HWTEST_F(LoopEventUnittest, ProcessasynEvent, TestSize.Level1)
286 {
287 LoopEventUnittest loopevtest = LoopEventUnittest();
288 loopevtest.ProcessasynEvent();
289 }
290 HWTEST_F(LoopEventUnittest, CreateSocketTest, TestSize.Level1)
291 {
292 LoopEventUnittest loopevtest = LoopEventUnittest();
293 loopevtest.CreateSocketTest();
294 }
295 HWTEST_F(LoopEventUnittest, ProcessWatcherTask, TestSize.Level1)
296 {
297 LoopEventUnittest loopevtest = LoopEventUnittest();
298 loopevtest.ProcessWatcherTask();
299 }
300
301 static LoopHandle g_loop = nullptr;
302 static int g_timeCount = 0;
Test_ProcessTimer(const TimerHandle taskHandle,void * context)303 static void Test_ProcessTimer(const TimerHandle taskHandle, void *context)
304 {
305 g_timeCount++;
306 printf("Test_ProcessTimer %d\n", g_timeCount);
307 if (g_timeCount > 1) {
308 LE_StopLoop(g_loop);
309 }
310 }
311
312 HWTEST_F(LoopEventUnittest, LoopRunTest, TestSize.Level1)
313 {
314 ASSERT_EQ(LE_CreateLoop(&g_loop), 0);
315 TimerHandle timer = nullptr;
316 int ret = LE_CreateTimer(g_loop, &timer, Test_ProcessTimer, nullptr);
317 ASSERT_EQ(ret, 0);
318 ret = LE_StartTimer(g_loop, timer, 500, 2);
319 ASSERT_EQ(ret, 0);
320 LE_CloseLoop(g_loop);
321 LE_RunLoop(g_loop);
322 LE_CloseLoop(g_loop);
323 }
324 } // namespace init_ut
325