1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Thread safe ringbuffer
22 *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
24
25 #include "deInt32.h"
26 #include "deMemory.h"
27 #include "deSemaphore.h"
28
29 #include <stdlib.h>
30 #include <stdio.h>
31
32 struct deRingbuffer_s
33 {
34 deInt32 blockSize;
35 deInt32 blockCount;
36 deInt32* blockUsage;
37 deUint8* buffer;
38
39 deSemaphore emptyCount;
40 deSemaphore fullCount;
41
42 deInt32 outBlock;
43 deInt32 outPos;
44
45 deInt32 inBlock;
46 deInt32 inPos;
47
48 deBool stopNotified;
49 deBool consumerStopping;
50 };
51
deRingbuffer_create(deInt32 blockSize,deInt32 blockCount)52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
53 {
54 deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
55
56 DE_ASSERT(ringbuffer);
57 DE_ASSERT(blockCount > 0);
58 DE_ASSERT(blockSize > 0);
59
60 ringbuffer->blockSize = blockSize;
61 ringbuffer->blockCount = blockCount;
62 ringbuffer->buffer = (deUint8*)deMalloc(sizeof(deUint8) * blockSize * blockCount);
63 ringbuffer->blockUsage = (deInt32*)deMalloc(sizeof(deUint32) * blockCount);
64 ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65 ringbuffer->fullCount = deSemaphore_create(0, DE_NULL);
66
67 if (!ringbuffer->buffer ||
68 !ringbuffer->blockUsage ||
69 !ringbuffer->emptyCount ||
70 !ringbuffer->fullCount)
71 {
72 if (ringbuffer->emptyCount)
73 deSemaphore_destroy(ringbuffer->emptyCount);
74 if (ringbuffer->fullCount)
75 deSemaphore_destroy(ringbuffer->fullCount);
76 deFree(ringbuffer->buffer);
77 deFree(ringbuffer->blockUsage);
78 deFree(ringbuffer);
79 return DE_NULL;
80 }
81
82 memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * blockCount);
83
84 ringbuffer->outBlock = 0;
85 ringbuffer->outPos = 0;
86
87 ringbuffer->inBlock = 0;
88 ringbuffer->inPos = 0;
89
90 ringbuffer->stopNotified = DE_FALSE;
91 ringbuffer->consumerStopping = DE_FALSE;
92
93 return ringbuffer;
94 }
95
/*--------------------------------------------------------------------*//*!
 * \brief Notify the ringbuffer that it is being stopped
 *
 * Sets the stop flag and posts fullCount once so that a consumer waiting
 * for a full block is released and can observe the flag.
 *
 * \param ringbuffer	Ringbuffer to stop
 *//*--------------------------------------------------------------------*/
void deRingbuffer_stop (deRingbuffer* ringbuffer)
{
	/* The flag must be raised before the semaphore is posted: the consumer
	 * reads it right after its decrement of fullCount returns.
	 */
	ringbuffer->stopNotified = DE_TRUE;

	deSemaphore_increment(ringbuffer->fullCount);
}
102
/*--------------------------------------------------------------------*//*!
 * \brief Destroy a ringbuffer and release all of its resources
 *
 * Destroys both semaphores and frees the block storage, the usage table
 * and the ringbuffer object itself.
 *
 * \param ringbuffer	Ringbuffer created with deRingbuffer_create()
 *//*--------------------------------------------------------------------*/
void deRingbuffer_destroy (deRingbuffer* ringbuffer)
{
	deSemaphore_destroy(ringbuffer->emptyCount);
	deSemaphore_destroy(ringbuffer->fullCount);

	/* Memory came from deMalloc()/deCalloc(), so release it with deFree(),
	 * the same call the failure path of deRingbuffer_create() uses.
	 */
	deFree(ringbuffer->buffer);
	deFree(ringbuffer->blockUsage);
	deFree(ringbuffer);
}
112
producerStream_write(deStreamData * stream,const void * buf,deInt32 bufSize,deInt32 * written)113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
114 {
115 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
116
117 DE_ASSERT(stream);
118 /* If ringbuffer is stopping return error on write */
119 if (ringbuffer->stopNotified)
120 {
121 DE_ASSERT(DE_FALSE);
122 return DE_STREAMRESULT_ERROR;
123 }
124
125 *written = 0;
126
127 /* Write while more data available */
128 while (*written < bufSize)
129 {
130 deInt32 writeSize = 0;
131 deUint8* src = DE_NULL;
132 deUint8* dst = DE_NULL;
133
134 /* If between blocks accuire new block */
135 if (ringbuffer->inPos == 0)
136 {
137 deSemaphore_decrement(ringbuffer->emptyCount);
138 }
139
140 writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
141 dst = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
142 src = (deUint8*)buf + *written;
143
144 deMemcpy(dst, src, writeSize);
145
146 ringbuffer->inPos += writeSize;
147 *written += writeSize;
148 ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
149
150 /* Block is full move to next one (or "between" this and next block) */
151 if (ringbuffer->inPos == ringbuffer->blockSize)
152 {
153 ringbuffer->inPos = 0;
154 ringbuffer->inBlock++;
155
156 if (ringbuffer->inBlock == ringbuffer->blockCount)
157 ringbuffer->inBlock = 0;
158 deSemaphore_increment(ringbuffer->fullCount);
159 }
160 }
161
162 return DE_STREAMRESULT_SUCCESS;
163 }
164
producerStream_flush(deStreamData * stream)165 static deStreamResult producerStream_flush (deStreamData* stream)
166 {
167 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
168
169 DE_ASSERT(stream);
170
171 /* No blocks reserved by producer */
172 if (ringbuffer->inPos == 0)
173 return DE_STREAMRESULT_SUCCESS;
174
175 ringbuffer->inPos = 0;
176 ringbuffer->inBlock++;
177
178 if (ringbuffer->inBlock == ringbuffer->blockCount)
179 ringbuffer->inBlock = 0;
180
181 deSemaphore_increment(ringbuffer->fullCount);
182 return DE_STREAMRESULT_SUCCESS;
183 }
184
producerStream_deinit(deStreamData * stream)185 static deStreamResult producerStream_deinit (deStreamData* stream)
186 {
187 DE_ASSERT(stream);
188
189 producerStream_flush(stream);
190
191 /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
192 return DE_STREAMRESULT_SUCCESS;
193 }
194
consumerStream_read(deStreamData * stream,void * buf,deInt32 bufSize,deInt32 * read)195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
196 {
197 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
198
199 DE_ASSERT(stream);
200
201 *read = 0;
202 DE_ASSERT(ringbuffer);
203
204 while (*read < bufSize)
205 {
206 deInt32 writeSize = 0;
207 deUint8* src = DE_NULL;
208 deUint8* dst = DE_NULL;
209
210 /* If between blocks accuire new block */
211 if (ringbuffer->outPos == 0)
212 {
213 /* If consumer is set to stop after everything is consumed,
214 * do not block if there is no more input left
215 */
216 if (ringbuffer->consumerStopping)
217 {
218 /* Try to accuire new block, if can't there is no more input */
219 if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
220 {
221 return DE_STREAMRESULT_END_OF_STREAM;
222 }
223 }
224 else
225 {
226 /* If not stopping block until there is more input */
227 deSemaphore_decrement(ringbuffer->fullCount);
228 /* Ringbuffer was set to stop */
229 if (ringbuffer->stopNotified)
230 {
231 ringbuffer->consumerStopping = DE_TRUE;
232 }
233 }
234
235 }
236
237 writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
238 src = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
239 dst = (deUint8*)buf + *read;
240
241 deMemcpy(dst, src, writeSize);
242
243 ringbuffer->outPos += writeSize;
244 *read += writeSize;
245
246 /* Block is consumed move to next one (or "between" this and next block) */
247 if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
248 {
249 ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
250 ringbuffer->outPos = 0;
251 ringbuffer->outBlock++;
252
253 if (ringbuffer->outBlock == ringbuffer->blockCount)
254 ringbuffer->outBlock = 0;
255
256 deSemaphore_increment(ringbuffer->emptyCount);
257 }
258 }
259
260 return DE_STREAMRESULT_SUCCESS;
261 }
262
263
consumerStream_deinit(deStreamData * stream)264 static deStreamResult consumerStream_deinit (deStreamData* stream)
265 {
266 DE_ASSERT(stream);
267 DE_UNREF(stream);
268
269 return DE_STREAMRESULT_SUCCESS;
270 }
271
272 /* There are no sensible errors so status is always good */
dummy_getStatus(deStreamData * stream)273 deStreamStatus dummy_getStatus (deStreamData* stream)
274 {
275 DE_UNREF(stream);
276
277 return DE_STREAMSTATUS_GOOD;
278 }
279
280 /* There are no sensible errors in ringbuffer */
dummy_getError(deStreamData * stream)281 static const char* dummy_getError (deStreamData* stream)
282 {
283 DE_ASSERT(stream);
284 DE_UNREF(stream);
285 return DE_NULL;
286 }
287
288 static const deIOStreamVFTable producerStreamVFTable = {
289 DE_NULL,
290 producerStream_write,
291 dummy_getError,
292 producerStream_flush,
293 producerStream_deinit,
294 dummy_getStatus
295 };
296
297 static const deIOStreamVFTable consumerStreamVFTable = {
298 consumerStream_read,
299 DE_NULL,
300 dummy_getError,
301 DE_NULL,
302 consumerStream_deinit,
303 dummy_getStatus
304 };
305
/*--------------------------------------------------------------------*//*!
 * \brief Bind an output stream to the producer side of a ringbuffer
 *
 * The stream only stores a pointer to the ringbuffer; it does not take
 * ownership of it.
 *
 * \param stream	Output stream to initialize
 * \param buffer	Ringbuffer the stream writes into
 *//*--------------------------------------------------------------------*/
void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
{
	stream->ioStream.vfTable	= &producerStreamVFTable;
	stream->ioStream.streamData	= (deStreamData*)buffer;
}
311
/*--------------------------------------------------------------------*//*!
 * \brief Bind an input stream to the consumer side of a ringbuffer
 *
 * The stream only stores a pointer to the ringbuffer.
 *
 * \param stream	Input stream to initialize
 * \param buffer	Ringbuffer the stream reads from
 *//*--------------------------------------------------------------------*/
void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
{
	stream->ioStream.vfTable	= &consumerStreamVFTable;
	stream->ioStream.streamData	= (deStreamData*)buffer;
}
317