1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "le_task.h"
17
18
19 #include "le_loop.h"
20 #include "le_utils.h"
21
CheckTaskFlags(const BaseTask * task,uint32_t flags)22 int CheckTaskFlags(const BaseTask *task, uint32_t flags)
23 {
24 if (task == NULL) {
25 return 0;
26 }
27 return ((task->flags & flags) == flags);
28 }
29
GetSocketFd(const TaskHandle task)30 int GetSocketFd(const TaskHandle task)
31 {
32 BaseTask *stream = (BaseTask *)task;
33 return stream->taskId.fd;
34 }
35
CreateTask(const LoopHandle loopHandle,int fd,const LE_BaseInfo * info,uint32_t size)36 BaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size)
37 {
38 if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) {
39 return NULL;
40 }
41 BaseTask *task = (BaseTask *)calloc(1, size + info->userDataSize);
42 LE_CHECK(task != NULL, return NULL, "Failed to alloc for task");
43 HASHMAPInitNode(&task->hashNode);
44 // key id
45 task->flags = info->flags;
46 task->taskId.fd = fd;
47 LE_STATUS ret = AddTask((EventLoop *)loopHandle, task);
48 LE_CHECK(ret == LE_SUCCESS, free(task);
49 return NULL, "Failed to alloc for task");
50 task->userDataSize = info->userDataSize;
51 task->userDataOffset = size;
52 task->close = info->close;
53 return task;
54 }
55
/*
 * Releases the buffers still queued on a task and then invokes the task's
 * close callback, if one is set.
 *
 * Buffers are only drained for tasks whose flags contain
 * TASK_STREAM | TASK_CONNECT or TASK_EVENT | TASK_ASYNC_EVENT; those tasks
 * are treated as StreamTask. The task memory itself is not freed here.
 */
void CloseTask(const LoopHandle loopHandle, BaseTask *task)
{
    LE_LOGV("CloseTask");
    LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters");

    const int hasBufferList = CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) ||
        CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT);
    if (hasBufferList) {
        StreamTask *streamTask = (StreamTask *)task;
        // FreeBuffer unlinks the buffer from the list, so keep taking the
        // head until the list is empty.
        for (LE_Buffer *pending = GetFirstBuffer(streamTask); pending != NULL;
            pending = GetFirstBuffer(streamTask)) {
            FreeBuffer(loopHandle, streamTask, (BufferHandle)pending);
        }
    }

    if (task->close != NULL) {
        task->close((TaskHandle)task);
    }
}
73
CreateBuffer(uint32_t bufferSize)74 LE_Buffer *CreateBuffer(uint32_t bufferSize)
75 {
76 LE_ONLY_CHECK(bufferSize < LOOP_MAX_BUFFER, return NULL);
77 LE_Buffer *buffer = NULL;
78 LE_CHECK((buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize)) != NULL,
79 return NULL, "Failed to alloc memory for buffer");
80 OH_ListInit(&buffer->node);
81 buffer->buffSize = bufferSize;
82 buffer->dataSize = 0;
83 return buffer;
84 }
85
/*
 * Reports whether the task's buffer list is empty.
 *
 * The list is inspected while holding task->mutex. Returns the result of
 * ListEmpty() on task->buffHead.
 */
int IsBufferEmpty(StreamTask *task)
{
    LoopMutexLock(&task->mutex);
    const int isEmpty = ListEmpty(task->buffHead);
    LoopMutexUnlock(&task->mutex);
    return isEmpty;
}
93
GetFirstBuffer(StreamTask * task)94 LE_Buffer *GetFirstBuffer(StreamTask *task)
95 {
96 LoopMutexLock(&task->mutex);
97 ListNode *node = task->buffHead.next;
98 LE_Buffer *buffer = NULL;
99 if (node != &task->buffHead) {
100 buffer = ListEntry(node, LE_Buffer, node);
101 }
102 LoopMutexUnlock(&task->mutex);
103 return buffer;
104 }
105
/*
 * Appends `buffer` to the tail of the task's buffer list.
 *
 * The list is modified while holding task->mutex. Neither argument is
 * checked for NULL.
 */
void AddBuffer(StreamTask *task, LE_Buffer *buffer)
{
    LoopMutexLock(&task->mutex);
    OH_ListAddTail(&task->buffHead, &buffer->node);
    LoopMutexUnlock(&task->mutex);
}
112
/*
 * Returns the buffer that follows `next` in the task's buffer list.
 *
 * When `next` is NULL the first buffer of the list is returned. Returns
 * NULL once the list head is reached, i.e. there is no further buffer.
 * The list is walked while holding task->mutex.
 */
LE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next)
{
    LE_Buffer *found = NULL;

    LoopMutexLock(&task->mutex);
    ListNode *candidate = (next != NULL) ? next->node.next : task->buffHead.next;
    if (candidate != &task->buffHead) {
        found = ListEntry(candidate, LE_Buffer, node);
    }
    LoopMutexUnlock(&task->mutex);

    return found;
}
129
/*
 * Frees `buffer`, unlinking it from the task's buffer list first when the
 * task's flags contain TASK_STREAM | TASK_CONNECT or
 * TASK_EVENT | TASK_ASYNC_EVENT.
 *
 * A NULL `buffer` is rejected with a log message. A NULL `task` means the
 * buffer is simply freed without touching any list. `loop` is not used.
 */
void FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer)
{
    LE_CHECK(buffer != NULL, return, "Invalid buffer");

    if (task != NULL) {
        BaseTask *base = (BaseTask *)task;
        if (CheckTaskFlags(base, TASK_STREAM | TASK_CONNECT) ||
            CheckTaskFlags(base, TASK_EVENT | TASK_ASYNC_EVENT)) {
            // Unlink under the task mutex before the memory goes away.
            LoopMutexLock(&task->mutex);
            OH_ListRemove(&buffer->node);
            LoopMutexUnlock(&task->mutex);
        }
    }

    free(buffer);
}
145
LE_CreateBuffer(const LoopHandle loop,uint32_t bufferSize)146 BufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize)
147 {
148 return (BufferHandle)CreateBuffer(bufferSize);
149 }
150
LE_FreeBuffer(const LoopHandle loop,const TaskHandle taskHandle,const BufferHandle handle)151 void LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle)
152 {
153 FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle);
154 }
155
LE_GetBufferInfo(const BufferHandle handle,uint32_t * dataSize,uint32_t * buffSize)156 uint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize)
157 {
158 LE_Buffer *buffer = (LE_Buffer *)handle;
159 LE_CHECK(buffer != NULL, return NULL, "Invalid buffer");
160 if (dataSize) {
161 *dataSize = (uint32_t)buffer->dataSize;
162 }
163 if (buffSize) {
164 *buffSize = (uint32_t)buffer->buffSize;
165 }
166 return buffer->data;
167 }
168
LE_Send(const LoopHandle loopHandle,const TaskHandle taskHandle,const BufferHandle buffHandle,uint32_t buffLen)169 LE_STATUS LE_Send(const LoopHandle loopHandle,
170 const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen)
171 {
172 EventLoop *loop = (EventLoop *)loopHandle;
173 if (((BaseTask *)taskHandle)->flags & TASK_FLAGS_INVALID) {
174 LE_FreeBuffer(loopHandle, taskHandle, buffHandle);
175 return LE_INVALID_TASK;
176 }
177 LE_Buffer *buffer = (LE_Buffer *)buffHandle;
178 buffer->dataSize = buffLen;
179 if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) {
180 AddBuffer((StreamTask *)taskHandle, buffer);
181 } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) {
182 AddBuffer((StreamTask *)taskHandle, buffer);
183 }
184 loop->modEvent(loop, (BaseTask *)taskHandle, Event_Write);
185 return LE_SUCCESS;
186 }
187
LE_CloseTask(const LoopHandle loopHandle,const TaskHandle taskHandle)188 void LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
189 {
190 LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
191 LE_LOGV("LE_CloseTask %d", GetSocketFd(taskHandle));
192 BaseTask *task = (BaseTask *)taskHandle;
193 if (task->innerClose != NULL) {
194 task->innerClose(loopHandle, taskHandle);
195 }
196 free(task);
197 }
198
/*
 * Returns a pointer to the task's user data area, which lies
 * userDataOffset bytes past the start of the task.
 *
 * Returns NULL when `handle` is NULL.
 */
void *LE_GetUserData(TaskHandle handle)
{
    LE_CHECK(handle != NULL, return NULL, "Invalid handle");

    char *taskStart = (char *)handle;
    return (void *)(taskStart + ((BaseTask *)handle)->userDataOffset);
}
205
LE_GetSendResult(const BufferHandle handle)206 int32_t LE_GetSendResult(const BufferHandle handle)
207 {
208 LE_CHECK(handle != NULL, return 0, "Invalid handle");
209 return ((LE_Buffer *)handle)->result;
210 }