• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
#include "le_task.h"

#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>

#include "le_loop.h"
#include "le_socket.h"
24 
CheckTaskFlags(const BaseTask * task,uint32_t flags)25 int CheckTaskFlags(const BaseTask *task, uint32_t flags)
26 {
27     if (task == NULL) {
28         return 0;
29     }
30     return ((task->flags & flags) == flags);
31 }
32 
GetSocketFd(const TaskHandle task)33 int GetSocketFd(const TaskHandle task)
34 {
35     BaseTask *stream = (BaseTask *)task;
36     return stream->taskId.fd;
37 }
38 
CreateTask(const LoopHandle loopHandle,int fd,const LE_BaseInfo * info,uint32_t size)39 BaseTask *CreateTask(const LoopHandle loopHandle, int fd, const LE_BaseInfo *info, uint32_t size)
40 {
41     if ((size >= LOOP_MAX_BUFFER) || ((size + info->userDataSize) >= LOOP_MAX_BUFFER)) {
42         return NULL;
43     }
44     BaseTask *task = (BaseTask *)malloc(size + info->userDataSize);
45     LE_CHECK(task != NULL, return NULL, "Failed to alloc for task");
46     HASHMAPInitNode(&task->hashNode);
47     // key id
48     task->flags = info->flags;
49     task->taskId.fd = fd;
50     LE_STATUS ret = AddTask((EventLoop *)loopHandle, task);
51     LE_CHECK(ret == LE_SUCCESS, free(task);
52         return NULL, "Failed to alloc for task");
53     task->userDataSize = info->userDataSize;
54     task->userDataOffset = size;
55     task->close = info->close;
56     return task;
57 }
58 
/*
 * Tear down a task: release its queued buffers, run its close callback
 * and pass it to DelTask() for the given loop.
 *
 * Buffers are only drained for tasks whose flags contain
 * TASK_STREAM | TASK_CONNECT or TASK_EVENT | TASK_ASYNC_EVENT; such tasks
 * are treated as StreamTask. The task memory itself is not freed here.
 * Nothing is done (beyond logging) when either argument is NULL.
 */
void CloseTask(const LoopHandle loopHandle, BaseTask *task)
{
    LE_LOGV("CloseTask %p", task);
    LE_CHECK(loopHandle != NULL && task != NULL, return, "Invalid parameters");

    int ownsBuffers = CheckTaskFlags(task, TASK_STREAM | TASK_CONNECT) ||
        CheckTaskFlags(task, TASK_EVENT | TASK_ASYNC_EVENT);
    if (ownsBuffers) {
        StreamTask *streamTask = (StreamTask *)task;
        // FreeBuffer unlinks the head each time, so keep taking the first entry until none is left.
        for (LE_Buffer *pending = GetFirstBuffer(streamTask); pending;
            pending = GetFirstBuffer(streamTask)) {
            FreeBuffer(loopHandle, streamTask, (BufferHandle)pending);
        }
    }

    if (task->close != NULL) {
        task->close((TaskHandle)task);
    }
    DelTask((EventLoop *)loopHandle, task);
}
77 
CreateBuffer(uint32_t bufferSize)78 LE_Buffer *CreateBuffer(uint32_t bufferSize)
79 {
80     if (bufferSize >= LOOP_MAX_BUFFER) {
81         return NULL;
82     }
83     LE_Buffer *buffer = (LE_Buffer *)malloc(sizeof(LE_Buffer) + bufferSize);
84     LE_CHECK(buffer != NULL, return NULL, "Failed to alloc memory for buffer");
85     ListInit(&buffer->node);
86     buffer->buffSize = bufferSize;
87     buffer->dataSize = 0;
88     return buffer;
89 }
90 
/*
 * Report whether the task's buffer list is empty.
 *
 * Evaluates ListEmpty() on task->buffHead while holding task->mutex and
 * returns that result. `task` is dereferenced without a NULL check.
 */
int IsBufferEmpty(StreamTask *task)
{
    int empty;

    LoopMutexLock(&task->mutex);
    empty = ListEmpty(task->buffHead);
    LoopMutexUnlock(&task->mutex);

    return empty;
}
98 
GetFirstBuffer(StreamTask * task)99 LE_Buffer *GetFirstBuffer(StreamTask *task)
100 {
101     LoopMutexLock(&task->mutex);
102     ListNode *node = task->buffHead.next;
103     LE_Buffer *buffer = NULL;
104     if (node != &task->buffHead) {
105         buffer = ListEntry(node, LE_Buffer, node);
106     }
107     LoopMutexUnlock(&task->mutex);
108     return buffer;
109 }
110 
/*
 * Link `buffer` into the task's buffer list via ListAddTail(), holding
 * task->mutex for the duration of the list update.
 *
 * Neither argument is checked for NULL.
 */
void AddBuffer(StreamTask *task, LE_Buffer *buffer)
{
    ListNode *listHead = &task->buffHead;
    ListNode *entry = &buffer->node;

    LoopMutexLock(&task->mutex);
    ListAddTail(listHead, entry);
    LoopMutexUnlock(&task->mutex);
}
117 
/*
 * Step through the task's buffer list.
 *
 * When `next` is NULL the walk starts at the first list entry; otherwise
 * it continues with the entry linked after `next`. Returns NULL once the
 * candidate node is the list head (end of list). The lookup is done under
 * task->mutex; the returned buffer stays on the list.
 */
LE_Buffer *GetNextBuffer(StreamTask *task, const LE_Buffer *next)
{
    LoopMutexLock(&task->mutex);
    ListNode *candidate = (next != NULL) ? next->node.next : task->buffHead.next;
    LE_Buffer *found = NULL;
    if (candidate != &task->buffHead) {
        found = ListEntry(candidate, LE_Buffer, node);
    }
    LoopMutexUnlock(&task->mutex);
    return found;
}
134 
/*
 * Release a buffer, unlinking it from its task's list first when needed.
 *
 * loop    not used by this function.
 * task    owner of the buffer, or NULL for a buffer that is simply freed.
 * buffer  buffer to release; a NULL buffer is logged and ignored.
 *
 * The buffer is removed from the list (under task->mutex) only when the
 * task's flags contain TASK_STREAM | TASK_CONNECT or
 * TASK_EVENT | TASK_ASYNC_EVENT. In every non-NULL case the buffer memory
 * is freed.
 */
void FreeBuffer(const LoopHandle loop, StreamTask *task, LE_Buffer *buffer)
{
    LE_CHECK(buffer != NULL, return, "Invalid buffer");

    if (task != NULL) {
        const BaseTask *base = (const BaseTask *)task;
        int linked = CheckTaskFlags(base, TASK_STREAM | TASK_CONNECT) ||
            CheckTaskFlags(base, TASK_EVENT | TASK_ASYNC_EVENT);
        if (linked) {
            LoopMutexLock(&task->mutex);
            ListRemove(&buffer->node);
            LoopMutexUnlock(&task->mutex);
        }
    }
    free(buffer);
}
150 
LE_CreateBuffer(const LoopHandle loop,uint32_t bufferSize)151 BufferHandle LE_CreateBuffer(const LoopHandle loop, uint32_t bufferSize)
152 {
153     return (BufferHandle)CreateBuffer(bufferSize);
154 }
155 
LE_FreeBuffer(const LoopHandle loop,const TaskHandle taskHandle,const BufferHandle handle)156 void LE_FreeBuffer(const LoopHandle loop, const TaskHandle taskHandle, const BufferHandle handle)
157 {
158     FreeBuffer(loop, (StreamTask *)taskHandle, (LE_Buffer *)handle);
159 }
160 
LE_GetBufferInfo(const BufferHandle handle,uint32_t * dataSize,uint32_t * buffSize)161 uint8_t *LE_GetBufferInfo(const BufferHandle handle, uint32_t *dataSize, uint32_t *buffSize)
162 {
163     LE_Buffer *buffer = (LE_Buffer *)handle;
164     LE_CHECK(buffer != NULL, return NULL, "Invalid buffer");
165     if (dataSize) {
166         *dataSize = (uint32_t)buffer->dataSize;
167     }
168     if (buffSize) {
169         *buffSize = (uint32_t)buffer->buffSize;
170     }
171     return buffer->data;
172 }
173 
LE_Send(const LoopHandle loopHandle,const TaskHandle taskHandle,const BufferHandle buffHandle,uint32_t buffLen)174 LE_STATUS LE_Send(const LoopHandle loopHandle,
175     const TaskHandle taskHandle, const BufferHandle buffHandle, uint32_t buffLen)
176 {
177     EventLoop *loop = (EventLoop *)loopHandle;
178     LE_Buffer *buffer = (LE_Buffer *)buffHandle;
179     buffer->dataSize = buffLen;
180     if (CheckTaskFlags((BaseTask *)taskHandle, TASK_STREAM | TASK_CONNECT)) {
181         AddBuffer((StreamTask *)taskHandle, buffer);
182     } else if (CheckTaskFlags((BaseTask *)taskHandle, TASK_EVENT | TASK_ASYNC_EVENT)) {
183         AddBuffer((StreamTask *)taskHandle, buffer);
184     }
185     loop->modEvent(loop, (BaseTask *)taskHandle, Event_Write);
186     return LE_SUCCESS;
187 }
188 
LE_CloseTask(const LoopHandle loopHandle,const TaskHandle taskHandle)189 void LE_CloseTask(const LoopHandle loopHandle, const TaskHandle taskHandle)
190 {
191     LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters");
192     LE_LOGV("LE_CloseTask %d %p", GetSocketFd(taskHandle), taskHandle);
193     BaseTask *task = (BaseTask *)taskHandle;
194     if (task->innerClose != NULL) {
195         task->innerClose(loopHandle, taskHandle);
196     }
197     free(task);
198 }
199 
/*
 * Return the address of the task's user-data area, located
 * task->userDataOffset bytes from the start of the task object.
 *
 * Returns NULL (after logging) when `handle` is NULL.
 */
void *LE_GetUserData(TaskHandle handle)
{
    LE_CHECK(handle != NULL, return NULL, "Invalid handle");

    BaseTask *base = (BaseTask *)handle;
    char *start = (char *)base;
    return (void *)(start + base->userDataOffset);
}