1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Block-based thread-safe queue.
22 *//*--------------------------------------------------------------------*/
23
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28 #include "deMemory.h"
29
30 #include <vector>
31
32 namespace de
33 {
34
35 using std::vector;
36
37 namespace BlockBufferBasicTest
38 {
39
40 struct Message
41 {
42 deUint32 data;
43
Messagede::BlockBufferBasicTest::Message44 Message (deUint16 threadId, deUint16 payload)
45 : data((threadId << 16) | payload)
46 {
47 }
48
Messagede::BlockBufferBasicTest::Message49 Message (void)
50 : data(0)
51 {
52 }
53
getThreadIdde::BlockBufferBasicTest::Message54 deUint16 getThreadId (void) const { return (deUint16)(data >> 16); }
getPayloadde::BlockBufferBasicTest::Message55 deUint16 getPayload (void) const { return (deUint16)(data & 0xffff); }
56 };
57
58 typedef BlockBuffer<Message> MessageBuffer;
59
60 class Consumer : public Thread
61 {
62 public:
Consumer(MessageBuffer & buffer,int numProducers)63 Consumer (MessageBuffer& buffer, int numProducers)
64 : m_buffer (buffer)
65 {
66 m_lastPayload.resize(numProducers, 0);
67 m_payloadSum.resize(numProducers, 0);
68 }
69
run(void)70 void run (void)
71 {
72 Random rnd ((deUint32)m_lastPayload.size());
73 Message tmpBuf [64];
74 bool consume = true;
75
76 while (consume)
77 {
78 int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
79 int numRead = m_buffer.tryRead(numToRead, &tmpBuf[0]);
80
81 for (int ndx = 0; ndx < numRead; ndx++)
82 {
83 const Message& msg = tmpBuf[ndx];
84
85 deUint16 threadId = msg.getThreadId();
86
87 if (threadId == 0xffff)
88 {
89 /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
90 if (ndx+1 < numRead)
91 {
92 m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
93 m_buffer.flush();
94 }
95
96 consume = false;
97 break;
98 }
99 else
100 {
101 /* Verify message. */
102 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
103 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
104
105 m_lastPayload[threadId] = msg.getPayload();
106 m_payloadSum[threadId] += (deUint32)msg.getPayload();
107 }
108 }
109 }
110 }
111
getPayloadSum(deUint16 threadId) const112 deUint32 getPayloadSum (deUint16 threadId) const
113 {
114 return m_payloadSum[threadId];
115 }
116
117 private:
118 MessageBuffer& m_buffer;
119 vector<deUint16> m_lastPayload;
120 vector<deUint32> m_payloadSum;
121 };
122
123 class Producer : public Thread
124 {
125 public:
Producer(MessageBuffer & buffer,deUint16 threadId,int numMessages)126 Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
127 : m_buffer (buffer)
128 , m_threadId (threadId)
129 , m_numMessages (numMessages)
130 {
131 }
132
run(void)133 void run (void)
134 {
135 // Yield to give main thread chance to start other producers.
136 deSleep(1);
137
138 Random rnd (m_threadId);
139 int msgNdx = 0;
140 Message tmpBuf[64];
141
142 while (msgNdx < m_numMessages)
143 {
144 int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
145 for (int ndx = 0; ndx < writeSize; ndx++)
146 tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
147
148 m_buffer.write(writeSize, &tmpBuf[0]);
149 if (rnd.getBool())
150 m_buffer.flush();
151 }
152 }
153
154 private:
155 MessageBuffer& m_buffer;
156 deUint16 m_threadId;
157 int m_numMessages;
158 };
159
runTest(void)160 void runTest (void)
161 {
162 const int numIterations = 8;
163 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
164 {
165 Random rnd (iterNdx);
166 int numBlocks = rnd.getInt(2, 128);
167 int blockSize = rnd.getInt(1, 16);
168 int numProducers = rnd.getInt(1, 16);
169 int numConsumers = rnd.getInt(1, 16);
170 int dataSize = rnd.getInt(50, 200);
171 MessageBuffer buffer (blockSize, numBlocks);
172 vector<Producer*> producers;
173 vector<Consumer*> consumers;
174
175 for (int i = 0; i < numProducers; i++)
176 producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
177
178 for (int i = 0; i < numConsumers; i++)
179 consumers.push_back(new Consumer(buffer, numProducers));
180
181 // Start consumers.
182 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
183 (*i)->start();
184
185 // Start producers.
186 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
187 (*i)->start();
188
189 // Wait for producers.
190 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
191 (*i)->join();
192
193 // Write end messages for consumers.
194 const Message endMsg(0xffff, 0);
195 for (int i = 0; i < numConsumers; i++)
196 buffer.write(1, &endMsg);
197 buffer.flush();
198
199 // Wait for consumers.
200 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
201 (*i)->join();
202
203 // Verify payload sums.
204 deUint32 refSum = 0;
205 for (int i = 0; i < dataSize; i++)
206 refSum += (deUint32)(deUint16)i;
207
208 for (int i = 0; i < numProducers; i++)
209 {
210 deUint32 cmpSum = 0;
211 for (int j = 0; j < numConsumers; j++)
212 cmpSum += consumers[j]->getPayloadSum((deUint16)i);
213 DE_TEST_ASSERT(refSum == cmpSum);
214 }
215
216 // Free resources.
217 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
218 delete *i;
219 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
220 delete *i;
221 }
222 }
223
224 } // BlockBufferBasicTest
225
226 namespace BlockBufferCancelTest
227 {
228
229 class Producer : public Thread
230 {
231 public:
Producer(BlockBuffer<deUint8> * buffer,deUint32 seed)232 Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
233 : m_buffer (buffer)
234 , m_seed (seed)
235 {
236 }
237
run(void)238 void run (void)
239 {
240 deUint8 tmp[1024];
241 Random rnd(m_seed);
242
243 deMemset(tmp, 0, DE_LENGTH_OF_ARRAY(tmp));
244
245 for (;;)
246 {
247 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
248
249 try
250 {
251 m_buffer->write(blockSize, &tmp[0]);
252
253 if (rnd.getBool())
254 m_buffer->flush();
255 }
256 catch (const BlockBuffer<deUint8>::CanceledException&)
257 {
258 break;
259 }
260 }
261 }
262
263 private:
264 BlockBuffer<deUint8>* m_buffer;
265 deUint32 m_seed;
266 };
267
268 class Consumer : public Thread
269 {
270 public:
Consumer(BlockBuffer<deUint8> * buffer,deUint32 seed)271 Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
272 : m_buffer (buffer)
273 , m_seed (seed)
274 {
275 }
276
run(void)277 void run (void)
278 {
279 deUint8 tmp[1024];
280 Random rnd(m_seed);
281
282 for (;;)
283 {
284 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
285
286 try
287 {
288 m_buffer->read(blockSize, &tmp[0]);
289 }
290 catch (const BlockBuffer<deUint8>::CanceledException&)
291 {
292 break;
293 }
294 }
295 }
296
297 private:
298 BlockBuffer<deUint8>* m_buffer;
299 deUint32 m_seed;
300 };
301
runTest(void)302 void runTest (void)
303 {
304 BlockBuffer<deUint8> buffer (64, 16);
305 const int numIterations = 8;
306
307 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
308 {
309 Random rnd (deInt32Hash(iterNdx));
310 int numThreads = rnd.getInt(1, 16);
311 int sleepMs = rnd.getInt(1, 200);
312 vector<Thread*> threads;
313
314 for (int i = 0; i < numThreads; i++)
315 {
316 if (rnd.getBool())
317 threads.push_back(new Consumer(&buffer, rnd.getUint32()));
318 else
319 threads.push_back(new Producer(&buffer, rnd.getUint32()));
320 }
321
322 // Start threads.
323 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
324 (*i)->start();
325
326 // Sleep for a while.
327 deSleep(sleepMs);
328
329 // Cancel buffer.
330 buffer.cancel();
331
332 // Wait for threads to finish.
333 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
334 (*i)->join();
335
336 // Reset buffer.
337 buffer.clear();
338
339 // Delete threads
340 for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
341 delete *thread;
342 }
343 }
344
345 } // BlockBufferCancelTest
346
BlockBuffer_selfTest(void)347 void BlockBuffer_selfTest (void)
348 {
349 BlockBufferBasicTest::runTest();
350 BlockBufferCancelTest::runTest();
351 }
352
353 } // de
354