1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Thread-safe ring buffer template.
22 *//*--------------------------------------------------------------------*/
23
24 #include "deThreadSafeRingBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27
28 #include <vector>
29
30 using std::vector;
31
32 namespace de
33 {
34
35 namespace
36 {
37
38 struct Message
39 {
40 deUint32 data;
41
Messagede::__anon8324d81e0111::Message42 Message (deUint16 threadId, deUint16 payload)
43 : data((threadId << 16) | payload)
44 {
45 }
46
Messagede::__anon8324d81e0111::Message47 Message (void)
48 : data(0)
49 {
50 }
51
getThreadIdde::__anon8324d81e0111::Message52 deUint16 getThreadId (void) const { return (deUint16)(data >> 16); }
getPayloadde::__anon8324d81e0111::Message53 deUint16 getPayload (void) const { return (deUint16)(data & 0xffff); }
54 };
55
56 class Consumer : public Thread
57 {
58 public:
Consumer(ThreadSafeRingBuffer<Message> & buffer,int numProducers)59 Consumer (ThreadSafeRingBuffer<Message>& buffer, int numProducers)
60 : m_buffer (buffer)
61 {
62 m_lastPayload.resize(numProducers, 0);
63 m_payloadSum.resize(numProducers, 0);
64 }
65
run(void)66 void run (void)
67 {
68 for (;;)
69 {
70 Message msg = m_buffer.popBack();
71
72 deUint16 threadId = msg.getThreadId();
73
74 if (threadId == 0xffff)
75 break;
76
77 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
78 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
79
80 m_lastPayload[threadId] = msg.getPayload();
81 m_payloadSum[threadId] += (deUint32)msg.getPayload();
82 }
83 }
84
getPayloadSum(deUint16 threadId) const85 deUint32 getPayloadSum (deUint16 threadId) const
86 {
87 return m_payloadSum[threadId];
88 }
89
90 private:
91 ThreadSafeRingBuffer<Message>& m_buffer;
92 vector<deUint16> m_lastPayload;
93 vector<deUint32> m_payloadSum;
94 };
95
96 class Producer : public Thread
97 {
98 public:
Producer(ThreadSafeRingBuffer<Message> & buffer,deUint16 threadId,int dataSize)99 Producer (ThreadSafeRingBuffer<Message>& buffer, deUint16 threadId, int dataSize)
100 : m_buffer (buffer)
101 , m_threadId (threadId)
102 , m_dataSize (dataSize)
103 {
104 }
105
run(void)106 void run (void)
107 {
108 // Yield to give main thread chance to start other producers.
109 deSleep(1);
110
111 for (int ndx = 0; ndx < m_dataSize; ndx++)
112 m_buffer.pushFront(Message(m_threadId, (deUint16)ndx));
113 }
114
115 private:
116 ThreadSafeRingBuffer<Message>& m_buffer;
117 deUint16 m_threadId;
118 int m_dataSize;
119 };
120
121 } // anonymous
122
ThreadSafeRingBuffer_selfTest(void)123 void ThreadSafeRingBuffer_selfTest (void)
124 {
125 const int numIterations = 16;
126 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
127 {
128 Random rnd (iterNdx);
129 int bufSize = rnd.getInt(1, 2048);
130 int numProducers = rnd.getInt(1, 16);
131 int numConsumers = rnd.getInt(1, 16);
132 int dataSize = rnd.getInt(1000, 10000);
133 ThreadSafeRingBuffer<Message> buffer (bufSize);
134 vector<Producer*> producers;
135 vector<Consumer*> consumers;
136
137 for (int i = 0; i < numProducers; i++)
138 producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
139
140 for (int i = 0; i < numConsumers; i++)
141 consumers.push_back(new Consumer(buffer, numProducers));
142
143 // Start consumers.
144 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
145 (*i)->start();
146
147 // Start producers.
148 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
149 (*i)->start();
150
151 // Wait for producers.
152 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
153 (*i)->join();
154
155 // Write end messages for consumers.
156 for (int i = 0; i < numConsumers; i++)
157 buffer.pushFront(Message(0xffff, 0));
158
159 // Wait for consumers.
160 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
161 (*i)->join();
162
163 // Verify payload sums.
164 deUint32 refSum = 0;
165 for (int i = 0; i < dataSize; i++)
166 refSum += (deUint32)(deUint16)i;
167
168 for (int i = 0; i < numProducers; i++)
169 {
170 deUint32 cmpSum = 0;
171 for (int j = 0; j < numConsumers; j++)
172 cmpSum += consumers[j]->getPayloadSum((deUint16)i);
173 DE_TEST_ASSERT(refSum == cmpSum);
174 }
175
176 // Free resources.
177 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
178 delete *i;
179 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
180 delete *i;
181 }
182 }
183
184 } // de
185