1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Buffered and threaded input and output streams
22 *//*--------------------------------------------------------------------*/
23
24 #include "deThreadStream.h"
25 #include "deStreamCpyThread.h"
26 #include "deRingbuffer.h"
27 #include "stdlib.h"
28
29 typedef struct deThreadInStream_s
30 {
31 deRingbuffer* ringbuffer;
32 deInStream* input;
33 deInStream consumerStream;
34 deOutStream producerStream;
35 deThread thread;
36 int bufferSize;
37 } deThreadInStream;
38
39 typedef struct deThreadOutStream_s
40 {
41 deRingbuffer* ringbuffer;
42 deInStream consumerStream;
43 deOutStream producerStream;
44 deStreamCpyThread* thread;
45 } deThreadOutStream;
46
inStreamCopy(void * arg)47 static void inStreamCopy (void* arg)
48 {
49 deThreadInStream* threadStream = (deThreadInStream*)arg;
50
51 deUint8* buffer = malloc(sizeof(deUint8) * threadStream->bufferSize);
52
53 for(;;)
54 {
55 deInt32 read = 0;
56 deInt32 written = 0;
57 deStreamResult readResult = DE_STREAMRESULT_ERROR;
58
59 readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60 DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61 while (written < read)
62 {
63 deInt32 wrote = 0;
64
65 /* \todo [mika] Handle errors */
66 deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
67
68 written += wrote;
69 }
70
71 if (readResult == DE_STREAMRESULT_END_OF_STREAM)
72 {
73 break;
74 }
75 }
76
77 deOutStream_flush(&(threadStream->producerStream));
78 deRingbuffer_stop(threadStream->ringbuffer);
79 free(buffer);
80
81 }
82
threadInStream_read(deStreamData * stream,void * buf,deInt32 bufSize,deInt32 * numRead)83 static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
84 {
85 deThreadInStream* threadStream = (deThreadInStream*)stream;
86 return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
87 }
88
/*--------------------------------------------------------------------*//*!
 * \brief Error-string callback of the threaded input stream.
 *
 * \param stream	Stream state; interpreted as a deThreadInStream.
 * \return Whatever deInStream_getError() reports for the consumer stream.
 *//*--------------------------------------------------------------------*/
static const char* threadInStream_getError (deStreamData* stream)
{
	deThreadInStream*	self		= (deThreadInStream*)stream;
	deInStream*			consumer	= &self->consumerStream;

	/* \todo [mika] Add handling for errors on thread stream */
	return deInStream_getError(consumer);
}
96
threadInStream_getStatus(deStreamData * stream)97 static deStreamStatus threadInStream_getStatus (deStreamData* stream)
98 {
99 deThreadInStream* threadStream = (deThreadInStream*)stream;
100
101 /* \todo [mika] Add handling for status on thread stream */
102 return deInStream_getStatus(&(threadStream->consumerStream));
103 }
104
105 /* \note [mika] Used by both in and out stream */
threadStream_deinit(deStreamData * stream)106 static deStreamResult threadStream_deinit (deStreamData* stream)
107 {
108 deThreadInStream* threadStream = (deThreadInStream*)stream;
109
110 deRingbuffer_stop(threadStream->ringbuffer);
111
112 deThread_join(threadStream->thread);
113 deThread_destroy(threadStream->thread);
114
115 deOutStream_deinit(&(threadStream->producerStream));
116 deInStream_deinit(&(threadStream->consumerStream));
117
118 deRingbuffer_destroy(threadStream->ringbuffer);
119
120 return DE_STREAMRESULT_SUCCESS;
121 }
122
/* Callback table installed by deThreadInStream_init(). Entries are
 * positional; the two DE_NULL entries sit in the positions where
 * threadOutStreamVFTable installs its write and flush callbacks, which an
 * input stream does not provide. */
static const deIOStreamVFTable threadInStreamVFTable = {
	threadInStream_read,		/* read callback								*/
	DE_NULL,					/* position of the write callback; unused here	*/
	threadInStream_getError,	/* error string callback						*/
	DE_NULL,					/* position of the flush callback; unused here	*/
	threadStream_deinit,		/* deinit callback								*/
	threadInStream_getStatus	/* status callback								*/
};
131
/*--------------------------------------------------------------------*//*!
 * \brief Initialize a threaded input stream on top of another input
 *        stream.
 *
 * Allocates the stream state, creates a ring buffer with the requested
 * block size and block count, attaches a producer and a consumer stream
 * to it, and starts a thread running inStreamCopy() on the new state.
 * The stream's callback table and data pointer are then set.
 *
 * \note Allocation and ring buffer creation failures are only caught by
 *       DE_ASSERT; the function has no way to report them to the caller.
 *
 * \param stream				Stream object to initialize.
 * \param input					Stream the worker thread reads from.
 * \param ringbufferBlockSize	Ring buffer block size; also used as the
 *								worker's copy buffer size.
 * \param ringbufferBlockCount	Number of ring buffer blocks.
 *//*--------------------------------------------------------------------*/
void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadInStream* state = malloc(sizeof(*state));
	DE_ASSERT(state);

	state->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(state->ringbuffer);

	state->input		= input;
	state->bufferSize	= ringbufferBlockSize;

	/* Both ends of the ring buffer must be ready before the worker starts. */
	deProducerStream_init(&state->producerStream, state->ringbuffer);
	deConsumerStream_init(&state->consumerStream, state->ringbuffer);

	state->thread = deThread_create(inStreamCopy, state, DE_NULL);

	stream->ioStream.streamData	= state;
	stream->ioStream.vfTable	= &threadInStreamVFTable;
}
151
threadOutStream_write(deStreamData * stream,const void * buf,deInt32 bufSize,deInt32 * numWritten)152 static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
153 {
154 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
155 return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
156 }
157
/*--------------------------------------------------------------------*//*!
 * \brief Error-string callback of the threaded output stream.
 *
 * \param stream	Stream state; interpreted as a deThreadOutStream.
 * \return Whatever deOutStream_getError() reports for the producer stream.
 *//*--------------------------------------------------------------------*/
static const char* threadOutStream_getError (deStreamData* stream)
{
	deThreadOutStream*	self		= (deThreadOutStream*)stream;
	deOutStream*		producer	= &self->producerStream;

	/* \todo [mika] Add handling for errors on thread stream */
	return deOutStream_getError(producer);
}
165
threadOutStream_getStatus(deStreamData * stream)166 static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
167 {
168 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
169
170 /* \todo [mika] Add handling for errors on thread stream */
171 return deOutStream_getStatus(&(threadStream->producerStream));
172 }
173
threadOutStream_flush(deStreamData * stream)174 static deStreamResult threadOutStream_flush (deStreamData* stream)
175 {
176 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
177
178 return deOutStream_flush(&(threadStream->producerStream));
179 }
180
181 static const deIOStreamVFTable threadOutStreamVFTable = {
182 DE_NULL,
183 threadOutStream_write,
184 threadOutStream_getError,
185 threadOutStream_flush,
186 threadStream_deinit,
187 threadOutStream_getStatus
188 };
189
/*--------------------------------------------------------------------*//*!
 * \brief Initialize a threaded output stream on top of another output
 *        stream.
 *
 * Allocates the stream state, creates a ring buffer with the requested
 * block size and block count, attaches a producer and a consumer stream
 * to it, and creates a deStreamCpyThread that is handed the consumer
 * stream, the wrapped output stream and the block size. The stream's
 * callback table and data pointer are then set.
 *
 * \note Allocation and ring buffer creation failures are only caught by
 *       DE_ASSERT; the function has no way to report them to the caller.
 *
 * \param stream				Stream object to initialize.
 * \param output				Stream passed to the copy thread as its output.
 * \param ringbufferBlockSize	Ring buffer block size; also passed to the
 *								copy thread.
 * \param ringbufferBlockCount	Number of ring buffer blocks.
 *//*--------------------------------------------------------------------*/
void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
{
	deThreadOutStream* state = malloc(sizeof(*state));
	DE_ASSERT(state);

	state->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
	DE_ASSERT(state->ringbuffer);

	/* Both ends of the ring buffer must be ready before the worker starts. */
	deProducerStream_init(&state->producerStream, state->ringbuffer);
	deConsumerStream_init(&state->consumerStream, state->ringbuffer);

	state->thread = deStreamCpyThread_create(&state->consumerStream, output, ringbufferBlockSize);

	stream->ioStream.streamData	= state;
	stream->ioStream.vfTable	= &threadOutStreamVFTable;
}
207
208