1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "le_loop.h"
17
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21
22 #include "le_socket.h"
23 #include "le_task.h"
24
HandleSendMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26 const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28 EventLoop *loop = (EventLoop *)loopHandle;
29 StreamTask *stream = (StreamTask *)taskHandle;
30 LE_Buffer *buffer = GetFirstBuffer(stream);
31 while (buffer) {
32 int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33 if (strstr(((const char *)buffer->data + 12), "bootevent.boot.completed") != NULL) {
34 LE_LOGI("begin to send boot.completed to param_watcher, fd:%d, size:%u",
35 GetSocketFd(taskHandle), buffer->dataSize);
36 }
37 if (ret < 0 || (size_t)ret < buffer->dataSize) {
38 LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
39 buffer->dataSize, ret, errno);
40 }
41 LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
42 buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
43 if (complete != NULL) {
44 complete(taskHandle, buffer);
45 }
46 FreeBuffer(loopHandle, stream, buffer);
47 buffer = GetFirstBuffer(stream);
48 }
49 if (IsBufferEmpty(stream)) {
50 LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
51 loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
52 return LE_SUCCESS;
53 }
54 return LE_SUCCESS;
55 }
56
HandleRecvMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_RecvMessage recvMessage,const LE_HandleRecvMsg handleRecvMsg)57 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
58 const TaskHandle taskHandle, const LE_RecvMessage recvMessage, const LE_HandleRecvMsg handleRecvMsg)
59 {
60 LE_STATUS status = LE_SUCCESS;
61 LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
62 LE_CHECK(buffer != NULL, return LE_NO_MEMORY, "Failed to create buffer");
63 int readLen = 0;
64 while (1) {
65 if (handleRecvMsg != NULL) {
66 readLen = handleRecvMsg(taskHandle, buffer->data, LOOP_DEFAULT_BUFFER, 0);
67 } else {
68 readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
69 }
70 LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
71 if (readLen < 0) {
72 if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
73 continue;
74 }
75 status = LE_DIS_CONNECTED;
76 break;
77 } else if (readLen == 0) {
78 // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
79 status = LE_DIS_CONNECTED;
80 break;
81 } else {
82 break;
83 }
84 }
85 if (status != LE_SUCCESS) {
86 FreeBuffer(loopHandle, NULL, buffer);
87 return status;
88 }
89 if (recvMessage) {
90 recvMessage(taskHandle, buffer->data, readLen);
91 }
92 FreeBuffer(loopHandle, NULL, buffer);
93 return status;
94 }
95
HandleStreamEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)96 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
97 {
98 StreamConnectTask *stream = (StreamConnectTask *)handle;
99 LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
100
101 LE_STATUS status = LE_SUCCESS;
102 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
103 status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
104 }
105 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
106 status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage, stream->handleRecvMsg);
107 }
108 if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
109 if (stream->disConnectComplete) {
110 stream->disConnectComplete(handle);
111 }
112 LE_CloseStreamTask(loopHandle, handle);
113 }
114 return status;
115 }
116
HandleClientEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)117 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
118 {
119 StreamClientTask *client = (StreamClientTask *)handle;
120 LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
121
122 LE_STATUS status = LE_SUCCESS;
123 if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
124 LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
125 client->connected = 1;
126 status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
127 }
128 if (LE_TEST_FLAGS(oper, EVENT_READ)) {
129 status = HandleRecvMsg_(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
130 }
131 if (status == LE_DIS_CONNECTED) {
132 if (client->disConnectComplete) {
133 client->disConnectComplete(handle);
134 }
135 client->connected = 0;
136 LE_CloseStreamTask(loopHandle, handle);
137 }
138 return status;
139 }
140
HandleStreamTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)141 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
142 {
143 BaseTask *task = (BaseTask *)taskHandle;
144 DelTask((EventLoop *)loopHandle, task);
145 CloseTask(loopHandle, task);
146 if (task->taskId.fd > 0) {
147 close(task->taskId.fd);
148 }
149 }
150
DumpStreamServerTaskInfo_(const TaskHandle task)151 static void DumpStreamServerTaskInfo_(const TaskHandle task)
152 {
153 INIT_CHECK(task != NULL, return);
154 BaseTask *baseTask = (BaseTask *)task;
155 StreamServerTask *serverTask = (StreamServerTask *)baseTask;
156 printf("\tfd: %d \n", serverTask->base.taskId.fd);
157 printf("\t TaskType: %s \n", "ServerTask");
158 if (strlen(serverTask->server) > 0) {
159 printf("\t Server socket:%s \n", serverTask->server);
160 } else {
161 printf("\t Server socket:%s \n", "NULL");
162 }
163 }
164
DumpStreamConnectTaskInfo_(const TaskHandle task)165 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
166 {
167 INIT_CHECK(task != NULL, return);
168 BaseTask *baseTask = (BaseTask *)task;
169 StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
170 TaskHandle taskHandle = (TaskHandle)connectTask;
171 printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
172 printf("\t TaskType: %s \n", "ConnectTask");
173 printf("\t ServiceInfo: \n");
174 struct ucred cred = {-1, -1, -1};
175 socklen_t credSize = sizeof(struct ucred);
176 if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
177 printf("\t Service Pid: %d \n", cred.pid);
178 printf("\t Service Uid: %u \n", cred.uid);
179 printf("\t Service Gid: %u \n", cred.gid);
180 } else {
181 printf("\t Service Pid: %s \n", "NULL");
182 printf("\t Service Uid: %s \n", "NULL");
183 printf("\t Service Gid: %s \n", "NULL");
184 }
185 }
186
HandleServerEvent_(const LoopHandle loopHandle,const TaskHandle serverTask,uint32_t oper)187 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
188 {
189 LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
190 if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
191 return LE_FAILURE;
192 }
193 StreamServerTask *server = (StreamServerTask *)serverTask;
194 LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
195
196 int ret = server->incommingConnect(loopHandle, serverTask);
197 if (ret != LE_SUCCESS) {
198 LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
199 }
200 EventLoop *loop = (EventLoop *)loopHandle;
201 loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
202 return LE_SUCCESS;
203 }
204
LE_CreateStreamServer(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamServerInfo * info)205 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
206 TaskHandle *taskHandle, const LE_StreamServerInfo *info)
207 {
208 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
209 LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
210 LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
211 "Invalid parameters incommingConnect %s", info->server);
212
213 int fd = info->socketId;
214 int ret = 0;
215 if (info->socketId <= 0) {
216 fd = CreateSocket(info->baseInfo.flags, info->server);
217 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
218 } else {
219 ret = listenSocket(fd, info->baseInfo.flags, info->server);
220 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
221 }
222
223 EventLoop *loop = (EventLoop *)loopHandle;
224 StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
225 sizeof(StreamServerTask) + strlen(info->server) + 1);
226 LE_CHECK(task != NULL, close(fd);
227 return LE_NO_MEMORY, "Failed to create task");
228 task->base.handleEvent = HandleServerEvent_;
229 task->base.innerClose = HandleStreamTaskClose_;
230 task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
231 task->incommingConnect = info->incommingConnect;
232 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
233 ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
234 LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
235 *taskHandle = (TaskHandle)task;
236 return LE_SUCCESS;
237 }
238
LE_CreateStreamClient(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamInfo * info)239 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
240 TaskHandle *taskHandle, const LE_StreamInfo *info)
241 {
242 LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
243 LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
244
245 int fd = CreateSocket(info->baseInfo.flags, info->server);
246 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
247
248 StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
249 LE_CHECK(task != NULL, close(fd);
250 return LE_NO_MEMORY, "Failed to create task");
251 task->stream.base.handleEvent = HandleClientEvent_;
252 task->stream.base.innerClose = HandleStreamTaskClose_;
253 OH_ListInit(&task->stream.buffHead);
254 LoopMutexInit(&task->stream.mutex);
255
256 task->connectComplete = info->connectComplete;
257 task->sendMessageComplete = info->sendMessageComplete;
258 task->recvMessage = info->recvMessage;
259 task->disConnectComplete = info->disConnectComplete;
260 task->handleRecvMsg = info->handleRecvMsg;
261 EventLoop *loop = (EventLoop *)loopHandle;
262 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
263 *taskHandle = (TaskHandle)task;
264 return LE_SUCCESS;
265 }
266
LE_AcceptStreamClient(const LoopHandle loopHandle,const TaskHandle server,TaskHandle * taskHandle,const LE_StreamInfo * info)267 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
268 TaskHandle *taskHandle, const LE_StreamInfo *info)
269 {
270 LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
271 LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
272 LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
273 int fd = -1;
274 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
275 fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
276 LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
277 }
278 StreamConnectTask *task = (StreamConnectTask *)CreateTask(
279 loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
280 LE_CHECK(task != NULL, close(fd);
281 return LE_NO_MEMORY, "Failed to create task");
282 task->stream.base.handleEvent = HandleStreamEvent_;
283 task->stream.base.innerClose = HandleStreamTaskClose_;
284 task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
285 task->disConnectComplete = info->disConnectComplete;
286 task->sendMessageComplete = info->sendMessageComplete;
287 task->recvMessage = info->recvMessage;
288 task->serverTask = (StreamServerTask *)server;
289 task->handleRecvMsg = info->handleRecvMsg;
290 OH_ListInit(&task->stream.buffHead);
291 LoopMutexInit(&task->stream.mutex);
292 if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
293 EventLoop *loop = (EventLoop *)loopHandle;
294 loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
295 }
296 *taskHandle = (TaskHandle)task;
297 return 0;
298 }
299
LE_CloseStreamTask(const LoopHandle loopHandle,const TaskHandle taskHandle)300 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
301 {
302 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
303 LE_CloseTask(loopHandle, taskHandle);
304 }
305
LE_GetSocketFd(const TaskHandle taskHandle)306 int LE_GetSocketFd(const TaskHandle taskHandle)
307 {
308 LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
309 return GetSocketFd(taskHandle);
310 }
311