1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "le_task.h"
17
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <sys/socket.h>
21
22 #include "le_loop.h"
23 #include "le_socket.h"
24
CheckTaskFlags(const BaseTask * task,uint32_t flags)25 int CheckTaskFlags(const BaseTask *task, uint32_t flags)
26 {
27 if (task == NULL) {
28 return 0;
29 }
30 return ((task->flags & flags) == flags);
31 }
32
GetSocketFd(const TaskHandle task)33 int GetSocketFd(const TaskHandle task)
34 {
35 BaseTask *stream = (BaseTask *)task;
36 return stream->taskId.fd;
37 }
38
CreateTask(const LoopHandle loopHandle,int fd,const LE_BaseInfo * info,uint32_t size)39 BaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size)
40 {
41 if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) {
42 return NULL;
43 }
44 BaseTask *task = (BaseTask *)malloc(size + info->userDataSize);
45 LE_CHECK(task != NULL, return NULL, "Failed to alloc for task");
46 HASHMAPInitNode(&task->hashNode);
47 // key id
48 task->flags = info->flags;
49 task->taskId.fd = fd;
50 LE_STATUS ret = AddTask((EventLoop *)loopHandle, task);
51 LE_CHECK(ret == LE_SUCCESS, free(task);
52 return NULL, "Failed to alloc for task");
53 task->userDataSize = info->userDataSize;
54 task->userDataOffset = size;
55 task->close = info->close;
56 return task;
57 }
58
CloseTask(const LoopHandle loopHandle,BaseTask * task)59 void CloseTask(const LoopHandle loopHandle, BaseTask *task)
60 {
61 LE_LOGV("CloseTask %p", task);
62 LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters");
63 if (CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) ||
64 CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT)) {
65 StreamTask *stream = (StreamTask *)task;
66 LE_Buffer *buffer = GetFirstBuffer(stream);
67 while (buffer) {
68 FreeBuffer(loopHandle, stream, (BufferHandle)buffer);
69 buffer = GetFirstBuffer(stream);
70 }
71 }
72 if (task->close != NULL) {
73 task->close((TaskHandle)task);
74 }
75 DelTask((EventLoop *)loopHandle, task);
76 }
77
CreateBuffer(uint32_t bufferSize)78 LE_Buffer *CreateBuffer(uint32_t bufferSize)
79 {
80 if (bufferSize >= LOOP_MAX_BUFFER) {
81 return NULL;
82 }
83 LE_Buffer *buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize);
84 LE_CHECK(buffer != NULL, return NULL, "Failed to alloc memory for buffer");
85 ListInit(&buffer->node);
86 buffer->buffSize = bufferSize;
87 buffer->dataSize = 0;
88 return buffer;
89 }
90
IsBufferEmpty(StreamTask * task)91 int IsBufferEmpty(StreamTask *task)
92 {
93 LoopMutexLock(&task->mutex);
94 int ret = ListEmpty(task->buffHead);
95 LoopMutexUnlock(&task->mutex);
96 return ret;
97 }
98
GetFirstBuffer(StreamTask * task)99 LE_Buffer *GetFirstBuffer(StreamTask *task)
100 {
101 LoopMutexLock(&task->mutex);
102 ListNode *node = task->buffHead.next;
103 LE_Buffer *buffer = NULL;
104 if (node != &task->buffHead) {
105 buffer = ListEntry(node, LE_Buffer, node);
106 }
107 LoopMutexUnlock(&task->mutex);
108 return buffer;
109 }
110
AddBuffer(StreamTask * task,LE_Buffer * buffer)111 void AddBuffer(StreamTask *task, LE_Buffer *buffer)
112 {
113 LoopMutexLock(&task->mutex);
114 ListAddTail(&task->buffHead, &buffer->node);
115 LoopMutexUnlock(&task->mutex);
116 }
117
GetNextBuffer(StreamTask * task,const LE_Buffer * next)118 LE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next)
119 {
120 LoopMutexLock(&task->mutex);
121 LE_Buffer *buffer = NULL;
122 ListNode *node = NULL;
123 if (next == NULL) {
124 node = task->buffHead.next;
125 } else {
126 node = next->node.next;
127 }
128 if (node != &task->buffHead) {
129 buffer = ListEntry(node, LE_Buffer, node);
130 }
131 LoopMutexUnlock(&task->mutex);
132 return buffer;
133 }
134
FreeBuffer(const LoopHandle loop,StreamTask * task,LE_Buffer * buffer)135 void FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer)
136 {
137 LE_CHECK(buffer != NULL, return, "Invalid buffer");
138 if (task == NULL) {
139 free(buffer);
140 return;
141 }
142 if (CheckTaskFlags((BaseTask *)task, TASK_STREAM | TASK_CONNECT) ||
143 CheckTaskFlags((BaseTask *)task, TASK_EVENT | TASK_ASYNC_EVENT)) {
144 LoopMutexLock(&task->mutex);
145 ListRemove(&buffer->node);
146 LoopMutexUnlock(&task->mutex);
147 }
148 free(buffer);
149 }
150
LE_CreateBuffer(const LoopHandle loop,uint32_t bufferSize)151 BufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize)
152 {
153 return (BufferHandle)CreateBuffer(bufferSize);
154 }
155
LE_FreeBuffer(const LoopHandle loop,const TaskHandle taskHandle,const BufferHandle handle)156 void LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle)
157 {
158 FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle);
159 }
160
LE_GetBufferInfo(const BufferHandle handle,uint32_t * dataSize,uint32_t * buffSize)161 uint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize)
162 {
163 LE_Buffer *buffer = (LE_Buffer *)handle;
164 LE_CHECK(buffer != NULL, return NULL, "Invalid buffer");
165 if (dataSize) {
166 *dataSize = (uint32_t)buffer->dataSize;
167 }
168 if (buffSize) {
169 *buffSize = (uint32_t)buffer->buffSize;
170 }
171 return buffer->data;
172 }
173
LE_Send(const LoopHandle loopHandle,const TaskHandle taskHandle,const BufferHandle buffHandle,uint32_t buffLen)174 LE_STATUS LE_Send(const LoopHandle loopHandle,
175 const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen)
176 {
177 EventLoop *loop = (EventLoop *)loopHandle;
178 LE_Buffer *buffer = (LE_Buffer *)buffHandle;
179 buffer->dataSize = buffLen;
180 if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) {
181 AddBuffer((StreamTask *)taskHandle, buffer);
182 } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) {
183 AddBuffer((StreamTask *)taskHandle, buffer);
184 }
185 loop->modEvent(loop, (BaseTask *)taskHandle, Event_Write);
186 return LE_SUCCESS;
187 }
188
LE_CloseTask(const LoopHandle loopHandle,const TaskHandle taskHandle)189 void LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
190 {
191 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
192 LE_LOGV("LE_CloseTask %d %p", GetSocketFd(taskHandle), taskHandle);
193 BaseTask *task = (BaseTask *)taskHandle;
194 if (task->innerClose != NULL) {
195 task->innerClose(loopHandle, taskHandle);
196 }
197 free(task);
198 }
199
LE_GetUserData(TaskHandle handle)200 void *LE_GetUserData(TaskHandle handle)
201 {
202 LE_CHECK(handle != NULL, return NULL, "Invalid handle");
203 BaseTask *stream = (BaseTask *)handle;
204 return (void *)(((char *)stream) + stream->userDataOffset);
205 }