• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*-------------------------------------------------------------------------
 * drawElements C++ Base Library
 * -----------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Block-based thread-safe queue.
 *//*--------------------------------------------------------------------*/
23 
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28 #include "deMemory.h"
29 
30 #include <vector>
31 
32 namespace de
33 {
34 
35 using std::vector;
36 
37 namespace BlockBufferBasicTest
38 {
39 
40 struct Message
41 {
42 	deUint32 data;
43 
Messagede::BlockBufferBasicTest::Message44 	Message (deUint16 threadId, deUint16 payload)
45 		: data((threadId << 16) | payload)
46 	{
47 	}
48 
Messagede::BlockBufferBasicTest::Message49 	Message (void)
50 		: data(0)
51 	{
52 	}
53 
getThreadIdde::BlockBufferBasicTest::Message54 	deUint16 getThreadId	(void) const { return (deUint16)(data >> 16);		}
getPayloadde::BlockBufferBasicTest::Message55 	deUint16 getPayload		(void) const { return (deUint16)(data & 0xffff);	}
56 };
57 
58 typedef BlockBuffer<Message> MessageBuffer;
59 
60 class Consumer : public Thread
61 {
62 public:
Consumer(MessageBuffer & buffer,int numProducers)63 	Consumer (MessageBuffer& buffer, int numProducers)
64 		: m_buffer		(buffer)
65 	{
66 		m_lastPayload.resize(numProducers, 0);
67 		m_payloadSum.resize(numProducers, 0);
68 	}
69 
run(void)70 	void run (void)
71 	{
72 		Random	rnd		((deUint32)m_lastPayload.size());
73 		Message	tmpBuf	[64];
74 		bool	consume	= true;
75 
76 		while (consume)
77 		{
78 			int numToRead	= rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
79 			int numRead		= m_buffer.tryRead(numToRead, &tmpBuf[0]);
80 
81 			for (int ndx = 0; ndx < numRead; ndx++)
82 			{
83 				const Message& msg = tmpBuf[ndx];
84 
85 				deUint16 threadId = msg.getThreadId();
86 
87 				if (threadId == 0xffff)
88 				{
89 					/* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
90 					if (ndx+1 < numRead)
91 					{
92 						m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
93 						m_buffer.flush();
94 					}
95 
96 					consume = false;
97 					break;
98 				}
99 				else
100 				{
101 					/* Verify message. */
102 					DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
103 					DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
104 
105 					m_lastPayload[threadId]	 = msg.getPayload();
106 					m_payloadSum[threadId]	+= (deUint32)msg.getPayload();
107 				}
108 			}
109 		}
110 	}
111 
getPayloadSum(deUint16 threadId) const112 	deUint32 getPayloadSum (deUint16 threadId) const
113 	{
114 		return m_payloadSum[threadId];
115 	}
116 
117 private:
118 	MessageBuffer&			m_buffer;
119 	vector<deUint16>		m_lastPayload;
120 	vector<deUint32>		m_payloadSum;
121 };
122 
123 class Producer : public Thread
124 {
125 public:
Producer(MessageBuffer & buffer,deUint16 threadId,int numMessages)126 	Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
127 		: m_buffer		(buffer)
128 		, m_threadId	(threadId)
129 		, m_numMessages	(numMessages)
130 	{
131 	}
132 
run(void)133 	void run (void)
134 	{
135 		// Yield to give main thread chance to start other producers.
136 		deSleep(1);
137 
138 		Random	rnd		(m_threadId);
139 		int		msgNdx	= 0;
140 		Message	tmpBuf[64];
141 
142 		while (msgNdx < m_numMessages)
143 		{
144 			int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
145 			for (int ndx = 0; ndx < writeSize; ndx++)
146 				tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
147 
148 			m_buffer.write(writeSize, &tmpBuf[0]);
149 			if (rnd.getBool())
150 				m_buffer.flush();
151 		}
152 	}
153 
154 private:
155 	MessageBuffer&	m_buffer;
156 	deUint16		m_threadId;
157 	int				m_numMessages;
158 };
159 
runTest(void)160 void runTest (void)
161 {
162 	const int numIterations = 8;
163 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
164 	{
165 		Random							rnd				(iterNdx);
166 		int								numBlocks		= rnd.getInt(2, 128);
167 		int								blockSize		= rnd.getInt(1, 16);
168 		int								numProducers	= rnd.getInt(1, 16);
169 		int								numConsumers	= rnd.getInt(1, 16);
170 		int								dataSize		= rnd.getInt(50, 200);
171 		MessageBuffer					buffer			(blockSize, numBlocks);
172 		vector<Producer*>				producers;
173 		vector<Consumer*>				consumers;
174 
175 		for (int i = 0; i < numProducers; i++)
176 			producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
177 
178 		for (int i = 0; i < numConsumers; i++)
179 			consumers.push_back(new Consumer(buffer, numProducers));
180 
181 		// Start consumers.
182 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
183 			(*i)->start();
184 
185 		// Start producers.
186 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
187 			(*i)->start();
188 
189 		// Wait for producers.
190 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
191 			(*i)->join();
192 
193 		// Write end messages for consumers.
194 		const Message endMsg(0xffff, 0);
195 		for (int i = 0; i < numConsumers; i++)
196 			buffer.write(1, &endMsg);
197 		buffer.flush();
198 
199 		// Wait for consumers.
200 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
201 			(*i)->join();
202 
203 		// Verify payload sums.
204 		deUint32 refSum = 0;
205 		for (int i = 0; i < dataSize; i++)
206 			refSum += (deUint32)(deUint16)i;
207 
208 		for (int i = 0; i < numProducers; i++)
209 		{
210 			deUint32 cmpSum = 0;
211 			for (int j = 0; j < numConsumers; j++)
212 				cmpSum += consumers[j]->getPayloadSum((deUint16)i);
213 			DE_TEST_ASSERT(refSum == cmpSum);
214 		}
215 
216 		// Free resources.
217 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
218 			delete *i;
219 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
220 			delete *i;
221 	}
222 }
223 
224 } // BlockBufferBasicTest
225 
226 namespace BlockBufferCancelTest
227 {
228 
229 class Producer : public Thread
230 {
231 public:
Producer(BlockBuffer<deUint8> * buffer,deUint32 seed)232 	Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
233 		: m_buffer	(buffer)
234 		, m_seed	(seed)
235 	{
236 	}
237 
run(void)238 	void run (void)
239 	{
240 		deUint8	tmp[1024];
241 		Random	rnd(m_seed);
242 
243 		deMemset(tmp, 0, DE_LENGTH_OF_ARRAY(tmp));
244 
245 		for (;;)
246 		{
247 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
248 
249 			try
250 			{
251 				m_buffer->write(blockSize, &tmp[0]);
252 
253 				if (rnd.getBool())
254 					m_buffer->flush();
255 			}
256 			catch (const BlockBuffer<deUint8>::CanceledException&)
257 			{
258 				break;
259 			}
260 		}
261 	}
262 
263 private:
264 	BlockBuffer<deUint8>*	m_buffer;
265 	deUint32				m_seed;
266 };
267 
268 class Consumer : public Thread
269 {
270 public:
Consumer(BlockBuffer<deUint8> * buffer,deUint32 seed)271 	Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
272 		: m_buffer	(buffer)
273 		, m_seed	(seed)
274 	{
275 	}
276 
run(void)277 	void run (void)
278 	{
279 		deUint8	tmp[1024];
280 		Random	rnd(m_seed);
281 
282 		for (;;)
283 		{
284 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
285 
286 			try
287 			{
288 				m_buffer->read(blockSize, &tmp[0]);
289 			}
290 			catch (const BlockBuffer<deUint8>::CanceledException&)
291 			{
292 				break;
293 			}
294 		}
295 	}
296 
297 private:
298 	BlockBuffer<deUint8>*	m_buffer;
299 	deUint32				m_seed;
300 };
301 
runTest(void)302 void runTest (void)
303 {
304 	BlockBuffer<deUint8>	buffer			(64, 16);
305 	const int				numIterations	= 8;
306 
307 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
308 	{
309 		Random				rnd				(deInt32Hash(iterNdx));
310 		int					numThreads		= rnd.getInt(1, 16);
311 		int					sleepMs			= rnd.getInt(1, 200);
312 		vector<Thread*>		threads;
313 
314 		for (int i = 0; i < numThreads; i++)
315 		{
316 			if (rnd.getBool())
317 				threads.push_back(new Consumer(&buffer, rnd.getUint32()));
318 			else
319 				threads.push_back(new Producer(&buffer, rnd.getUint32()));
320 		}
321 
322 		// Start threads.
323 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
324 			(*i)->start();
325 
326 		// Sleep for a while.
327 		deSleep(sleepMs);
328 
329 		// Cancel buffer.
330 		buffer.cancel();
331 
332 		// Wait for threads to finish.
333 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
334 			(*i)->join();
335 
336 		// Reset buffer.
337 		buffer.clear();
338 
339 		// Delete threads
340 		for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
341 			delete *thread;
342 	}
343 }
344 
345 } // BlockBufferCancelTest
346 
BlockBuffer_selfTest(void)347 void BlockBuffer_selfTest (void)
348 {
349 	BlockBufferBasicTest::runTest();
350 	BlockBufferCancelTest::runTest();
351 }
352 
353 } // de
354