• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "le_loop.h"
17 
18 #include <errno.h>
19 #include <sys/socket.h>
20 #include "securec.h"
21 
22 #include "le_socket.h"
23 #include "le_task.h"
24 
HandleSendMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_SendMessageComplete complete)25 static LE_STATUS HandleSendMsg_(const LoopHandle loopHandle,
26     const TaskHandle taskHandle, const LE_SendMessageComplete complete)
27 {
28     EventLoop *loop = (EventLoop *)loopHandle;
29     StreamTask *stream = (StreamTask *)taskHandle;
30     LE_Buffer *buffer = GetFirstBuffer(stream);
31     while (buffer) {
32         int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
33         if (ret < buffer->dataSize) {
34             LE_LOGE("HandleSendMsg_ fd:%d send data size %d %d, err:%d", GetSocketFd(taskHandle),
35                     buffer->dataSize, ret, errno);
36         }
37         LE_LOGV("HandleSendMsg_ fd:%d send data size %d %d", GetSocketFd(taskHandle), buffer->dataSize, ret);
38         buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
39         if (complete != NULL) {
40             complete(taskHandle, buffer);
41         }
42         FreeBuffer(loopHandle, stream, buffer);
43         buffer = GetFirstBuffer(stream);
44     }
45     if (IsBufferEmpty(stream)) {
46         LE_LOGV("HandleSendMsg_ fd:%d empty wait read", GetSocketFd(taskHandle));
47         loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
48         return LE_SUCCESS;
49     }
50     return LE_SUCCESS;
51 }
52 
HandleRecvMsg_(const LoopHandle loopHandle,const TaskHandle taskHandle,const LE_RecvMessage recvMessage)53 static LE_STATUS HandleRecvMsg_(const LoopHandle loopHandle,
54     const TaskHandle taskHandle, const LE_RecvMessage recvMessage)
55 {
56     LE_STATUS status = LE_SUCCESS;
57     LE_Buffer *buffer = CreateBuffer(LOOP_DEFAULT_BUFFER);
58     int readLen = 0;
59     while (1) {
60         readLen = recv(GetSocketFd(taskHandle), buffer->data, LOOP_DEFAULT_BUFFER, 0);
61         LE_LOGV("HandleRecvMsg fd:%d read msg len %d", GetSocketFd(taskHandle), readLen);
62         if (readLen < 0) {
63             if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
64                 continue;
65             }
66             status = LE_DIS_CONNECTED;
67             break;
68         } else if (readLen == 0) {
69             // 若另一端已关闭连接则返回0,这种关闭是对方主动且正常的关闭
70             status = LE_DIS_CONNECTED;
71             break;
72         } else {
73             break;
74         }
75     }
76     if (status != LE_SUCCESS) {
77         FreeBuffer(loopHandle, NULL, buffer);
78         return status;
79     }
80     if (recvMessage) {
81         recvMessage(taskHandle, buffer->data, readLen);
82     }
83     FreeBuffer(loopHandle, NULL, buffer);
84     return status;
85 }
86 
HandleStreamEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)87 static LE_STATUS HandleStreamEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
88 {
89     EventLoop *loop = (EventLoop *)loopHandle;
90     StreamConnectTask *stream = (StreamConnectTask *)handle;
91     LE_LOGV("HandleStreamEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
92 
93     LE_STATUS status = LE_SUCCESS;
94     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
95         status = HandleSendMsg_(loopHandle, handle, stream->sendMessageComplete);
96     }
97     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
98         status = HandleRecvMsg_(loopHandle, handle, stream->recvMessage);
99     }
100     if (status == LE_DIS_CONNECTED || LE_TEST_FLAGS(oper, EVENT_ERROR)) {
101         loop->delEvent(loop, GetSocketFd(handle), EVENT_READ | EVENT_WRITE);
102         if (stream->disConnectComplete) {
103             stream->disConnectComplete(handle);
104         }
105         LE_CloseStreamTask(loopHandle, handle);
106     }
107     return status;
108 }
109 
HandleClientEvent_(const LoopHandle loopHandle,const TaskHandle handle,uint32_t oper)110 static LE_STATUS HandleClientEvent_(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
111 {
112     StreamClientTask *client = (StreamClientTask *)handle;
113     LE_LOGV("HandleClientEvent_ fd:%d oper 0x%x", GetSocketFd(handle), oper);
114 
115     LE_STATUS status = LE_SUCCESS;
116     if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
117         LE_ONLY_CHECK(!(client->connected == 0 && client->connectComplete), client->connectComplete(handle));
118         client->connected = 1;
119         status = HandleSendMsg_(loopHandle, handle, client->sendMessageComplete);
120     }
121     if (LE_TEST_FLAGS(oper, EVENT_READ)) {
122         status = HandleRecvMsg_(loopHandle, handle, client->recvMessage);
123     }
124     if (status == LE_DIS_CONNECTED) {
125         if (client->disConnectComplete) {
126             client->disConnectComplete(handle);
127         }
128         client->connected = 0;
129         LE_CloseStreamTask(loopHandle, handle);
130     }
131     return status;
132 }
133 
HandleStreamTaskClose_(const LoopHandle loopHandle,const TaskHandle taskHandle)134 static void HandleStreamTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle)
135 {
136     BaseTask *task = (BaseTask *)taskHandle;
137     CloseTask(loopHandle, task);
138     DelTask((EventLoop *)loopHandle, task);
139     if (task->taskId.fd > 0) {
140         close(task->taskId.fd);
141     }
142 }
143 
DumpStreamServerTaskInfo_(const TaskHandle task)144 static void DumpStreamServerTaskInfo_(const TaskHandle task)
145 {
146     INIT_CHECK(task != NULL, return);
147     BaseTask *baseTask = (BaseTask *)task;
148     StreamServerTask *serverTask = (StreamServerTask *)baseTask;
149     printf("\tfd: %d \n", serverTask->base.taskId.fd);
150     printf("\t  TaskType: %s \n", "ServerTask");
151     if (strlen(serverTask->server) > 0) {
152         printf("\t  Server socket:%s \n", serverTask->server);
153     } else {
154         printf("\t  Server socket:%s \n", "NULL");
155     }
156 }
157 
DumpStreamConnectTaskInfo_(const TaskHandle task)158 static void DumpStreamConnectTaskInfo_(const TaskHandle task)
159 {
160     INIT_CHECK(task != NULL, return);
161     BaseTask *baseTask = (BaseTask *)task;
162     StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
163     TaskHandle taskHandle = (TaskHandle)connectTask;
164     printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
165     printf("\t  TaskType: %s \n", "ConnectTask");
166     printf("\t  ServiceInfo: \n");
167     struct ucred cred = {-1, -1, -1};
168     socklen_t credSize  = sizeof(struct ucred);
169     if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
170         printf("\t    Service Pid: %d \n", cred.pid);
171         printf("\t    Service Uid: %d \n", cred.uid);
172         printf("\t    Service Gid: %d \n", cred.gid);
173     } else {
174         printf("\t    Service Pid: %s \n", "NULL");
175         printf("\t    Service Uid: %s \n", "NULL");
176         printf("\t    Service Gid: %s \n", "NULL");
177     }
178 }
179 
HandleServerEvent_(const LoopHandle loopHandle,const TaskHandle serverTask,uint32_t oper)180 static LE_STATUS HandleServerEvent_(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
181 {
182     LE_LOGV("HandleServerEvent_ fd %d oper 0x%x", GetSocketFd(serverTask), oper);
183     if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
184         return LE_FAILURE;
185     }
186     StreamServerTask *server = (StreamServerTask *)serverTask;
187     LE_ONLY_CHECK(server->incommingConnect != NULL, return LE_SUCCESS);
188 
189     int ret = server->incommingConnect(loopHandle, serverTask);
190     if (ret != LE_SUCCESS) {
191         LE_LOGE("HandleServerEvent_ fd %d do not accept socket", GetSocketFd(serverTask));
192     }
193     EventLoop *loop = (EventLoop *)loopHandle;
194     loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
195     return LE_SUCCESS;
196 }
197 
LE_CreateStreamServer(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamServerInfo * info)198 LE_STATUS LE_CreateStreamServer(const LoopHandle loopHandle,
199     TaskHandle *taskHandle, const LE_StreamServerInfo *info)
200 {
201     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
202     LE_CHECK(info->server != NULL, return LE_INVALID_PARAM, "Invalid parameters server");
203     LE_CHECK(info->incommingConnect != NULL, return LE_INVALID_PARAM,
204         "Invalid parameters incommingConnect %s", info->server);
205 
206     int fd = info->socketId;
207     int ret = 0;
208     if (info->socketId <= 0) {
209         fd = CreateSocket(info->baseInfo.flags, info->server);
210         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
211     } else {
212         ret = listenSocket(fd, info->baseInfo.flags, info->server);
213         LE_CHECK(ret == 0, return LE_FAILURE, "Failed to listen socket %s", info->server);
214     }
215 
216     EventLoop *loop = (EventLoop *)loopHandle;
217     StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &info->baseInfo,
218         sizeof(StreamServerTask) + strlen(info->server) + 1);
219     LE_CHECK(task != NULL, close(fd);
220         return LE_NO_MEMORY, "Failed to create task");
221     task->base.handleEvent = HandleServerEvent_;
222     task->base.innerClose = HandleStreamTaskClose_;
223     task->base.dumpTaskInfo = DumpStreamServerTaskInfo_;
224     task->incommingConnect = info->incommingConnect;
225     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
226     ret = memcpy_s(task->server, strlen(info->server) + 1, info->server, strlen(info->server) + 1);
227     LE_CHECK(ret == 0, return LE_FAILURE, "Failed to copy server name %s", info->server);
228     *taskHandle = (TaskHandle)task;
229     return LE_SUCCESS;
230 }
231 
LE_CreateStreamClient(const LoopHandle loopHandle,TaskHandle * taskHandle,const LE_StreamInfo * info)232 LE_STATUS LE_CreateStreamClient(const LoopHandle loopHandle,
233     TaskHandle *taskHandle, const LE_StreamInfo *info)
234 {
235     LE_CHECK(loopHandle != NULL && taskHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
236     LE_CHECK(info->recvMessage != NULL, return LE_FAILURE, "Invalid parameters recvMessage %s", info->server);
237 
238     int fd = CreateSocket(info->baseInfo.flags, info->server);
239     LE_CHECK(fd > 0, return LE_FAILURE, "Failed to create socket %s", info->server);
240 
241     StreamClientTask *task = (StreamClientTask *)CreateTask(loopHandle, fd, &info->baseInfo, sizeof(StreamClientTask));
242     LE_CHECK(task != NULL, close(fd);
243         return LE_NO_MEMORY, "Failed to create task");
244     task->stream.base.handleEvent = HandleClientEvent_;
245     task->stream.base.innerClose = HandleStreamTaskClose_;
246     OH_ListInit(&task->stream.buffHead);
247     LoopMutexInit(&task->stream.mutex);
248 
249     task->connectComplete = info->connectComplete;
250     task->sendMessageComplete = info->sendMessageComplete;
251     task->recvMessage = info->recvMessage;
252     task->disConnectComplete = info->disConnectComplete;
253     EventLoop *loop = (EventLoop *)loopHandle;
254     loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
255     *taskHandle = (TaskHandle)task;
256     return LE_SUCCESS;
257 }
258 
LE_AcceptStreamClient(const LoopHandle loopHandle,const TaskHandle server,TaskHandle * taskHandle,const LE_StreamInfo * info)259 LE_STATUS LE_AcceptStreamClient(const LoopHandle loopHandle, const TaskHandle server,
260     TaskHandle *taskHandle, const LE_StreamInfo *info)
261 {
262     LE_CHECK(loopHandle != NULL && info != NULL, return LE_INVALID_PARAM, "Invalid parameters");
263     LE_CHECK(server != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters");
264     LE_CHECK(info->recvMessage != NULL, return LE_INVALID_PARAM, "Invalid parameters recvMessage");
265     int fd = -1;
266     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
267         fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
268         LE_CHECK(fd > 0, return LE_FAILURE, "Failed to accept socket %d", GetSocketFd(server));
269     }
270     StreamConnectTask *task = (StreamConnectTask *)CreateTask(
271         loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
272     LE_CHECK(task != NULL, close(fd);
273         return LE_NO_MEMORY, "Failed to create task");
274     task->stream.base.handleEvent = HandleStreamEvent_;
275     task->stream.base.innerClose = HandleStreamTaskClose_;
276     task->stream.base.dumpTaskInfo = DumpStreamConnectTaskInfo_;
277     task->disConnectComplete = info->disConnectComplete;
278     task->sendMessageComplete = info->sendMessageComplete;
279     task->recvMessage = info->recvMessage;
280     task->serverTask = (StreamServerTask *)server;
281     OH_ListInit(&task->stream.buffHead);
282     LoopMutexInit(&task->stream.mutex);
283     if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
284         EventLoop *loop = (EventLoop *)loopHandle;
285         loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
286     }
287     *taskHandle = (TaskHandle)task;
288     return 0;
289 }
290 
LE_CloseStreamTask(const LoopHandle loopHandle,const TaskHandle taskHandle)291 void LE_CloseStreamTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
292 {
293     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
294     LE_CloseTask(loopHandle, taskHandle);
295 }
296 
LE_GetSocketFd(const TaskHandle taskHandle)297 int LE_GetSocketFd(const TaskHandle taskHandle)
298 {
299     LE_CHECK(taskHandle != NULL, return -1, "Invalid parameters");
300     return GetSocketFd(taskHandle);
301 }