1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "safe_queue.h"
16
17 #include <array>
18 #include <future>
19 #include <gtest/gtest.h>
20 #include <iostream>
21 #include <thread>
22 #include <chrono> // std::chrono::seconds
23 using namespace testing::ext;
24 using namespace std;
25
26 namespace OHOS {
27 namespace {
28 class UtilsSafeQueue : public testing::Test
29 {
30 };
31
32 const unsigned int QUEUE_SLOTS = 10;
33 const unsigned int THREAD_NUM = QUEUE_SLOTS + 1;
34
35 class DemoThreadData
36 {
37 public:
DemoThreadData()38 DemoThreadData()
39 {
40 putStatus = false;
41 getStatus = false;
42 }
43 static SafeQueue<int> shareQueue;
44 bool putStatus;
45 bool getStatus;
46
Put(int i)47 void Put(int i)
48 {
49 shareQueue.Push(i);
50 putStatus = true;
51 }
52
Get(int & i)53 void Get(int &i)
54 {
55 shareQueue.Pop(i);
56 getStatus = true;
57 }
58 };
59
60 SafeQueue<int> DemoThreadData::shareQueue;
61
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)62 void PutHandleThreadDataTime(DemoThreadData &q, int i, std::chrono::system_clock::time_point absTime)
63 {
64 cout << "thread-" << std::this_thread::get_id() << " run time: "
65 << std::chrono::system_clock::to_time_t(absTime) << endl;
66 std::this_thread::sleep_until(absTime);
67
68 q.Put(i);
69 }
70
GetHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)71 void GetHandleThreadDataTime(DemoThreadData &q, int i, std::chrono::system_clock::time_point absTime)
72 {
73 cout << "thread-" << std::this_thread::get_id() << " run time: "
74 << std::chrono::system_clock::to_time_t(absTime) << endl;
75 std::this_thread::sleep_until(absTime);
76 int t = 0;
77 q.Get(t);
78 }
79
80 class TestThreading
81 {
82
83 public:
TestThreading()84 TestThreading()
85 {
86 demoDatas.fill(DemoThreadData());
87 }
88
AllThreadPut(std::time_t & timeT)89 void AllThreadPut(std::time_t &timeT)
90 {
91 using std::chrono::system_clock;
92 for (unsigned int i = 0; i < THREAD_NUM; i++)
93 {
94 threads[i] = std::thread(PutHandleThreadDataTime,
95 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
96 }
97 }
AllThreadGet(std::time_t & timeT)98 void AllThreadGet(std::time_t &timeT)
99 {
100 using std::chrono::system_clock;
101 for (unsigned int i = 0; i < THREAD_NUM; i++)
102 {
103 threads[i] = std::thread(GetHandleThreadDataTime,
104 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
105 }
106 }
107
GetThreadDatePushedStatus(unsigned int & pushedIn,unsigned int & unpushedIn)108 void GetThreadDatePushedStatus(unsigned int &pushedIn, unsigned int &unpushedIn)
109 {
110 pushedIn = 0;
111 unpushedIn = 0;
112 for (auto &t : demoDatas)
113 {
114 if (t.putStatus)
115 pushedIn++;
116 else
117 {
118 unpushedIn++;
119 }
120 }
121 }
122
GetThreadDateGetedStatus(unsigned int & getedOut,unsigned int & ungetedOut)123 void GetThreadDateGetedStatus(unsigned int &getedOut, unsigned int &ungetedOut)
124 {
125 getedOut = 0;
126 ungetedOut = 0;
127 for (auto &t : demoDatas)
128 {
129 if (t.getStatus)
130 getedOut++;
131 else
132 {
133 ungetedOut++;
134 }
135 }
136 }
137
ResetStatus()138 void ResetStatus()
139 {
140 for (auto &t : threads)
141 {
142 t.join();
143 }
144
145 DemoThreadData::shareQueue.Clear();
146 }
147
148 std::thread threads[THREAD_NUM];
149 std::array<DemoThreadData, THREAD_NUM> demoDatas;
150 };
151
152 /*
153 * Feature: SafeBlockQueue
154 * Function:put
155 * SubFunction: NA
156 * FunctionPoints:
157 * EnvConditions: NA
158 * CaseDescription: Multiple threads put , one thread gets, all threads finish running normally
159 */
160
161 HWTEST_F(UtilsSafeQueue, testMutilthreadPutAndOneThreadGetOnemptyQueue, TestSize.Level0)
162 {
163
164 TestThreading testThread;
165 using std::chrono::system_clock;
166
167 std::time_t timeT = system_clock::to_time_t(system_clock::now());
168 timeT += 2;
169 testThread.AllThreadPut(timeT);
170
171 // 1. queue is full and some threads is blocked
172 std::this_thread::sleep_for(std::chrono::seconds(3));
173 ASSERT_TRUE(DemoThreadData::shareQueue.Size() > 0);
174
175 unsigned int pushedIn = 0;
176 unsigned int unpushedIn = 0;
177
178 testThread.GetThreadDatePushedStatus(pushedIn, unpushedIn);
179
180 ASSERT_EQ(pushedIn, THREAD_NUM);
181
182 //2. get one out and wait some put in
183 for (unsigned int i = 0; i < THREAD_NUM; i++)
184 {
185 int t = 0;
186 testThread.demoDatas[0].Get(t);
187 }
188
189 std::this_thread::sleep_for(std::chrono::seconds(2));
190 // queue is full and some threads is blocked and is not joined
191 ASSERT_TRUE(DemoThreadData::shareQueue.Size() == 0);
192
193 // here means all thread end ok or if some operation blocked and the testcase blocked
194 testThread.ResetStatus();
195 }
196
197 /*
198 * Feature: SafeBlockQueue
199 * Function:put
200 * SubFunction: NA
201 * FunctionPoints:
202 * EnvConditions: NA
203 * CaseDescription: Multi-threaded put() and Multi-threaded get() on the empty queue. When all threads are waiting to reach a certain
204 * time-point, everyone run concurrently to see the status of the queue and the state of the thread.
205 */
206
207 HWTEST_F(UtilsSafeQueue, testMutilthreadPutAndGetConcurrently, TestSize.Level0)
208 {
209
210 using std::chrono::system_clock;
211
212 std::time_t timeT = system_clock::to_time_t(system_clock::now());
213 timeT += 2;
214
215 TestThreading putInTestThread;
216 putInTestThread.AllThreadPut(timeT);
217
218 TestThreading getOutTestThread;
219 getOutTestThread.AllThreadGet(timeT);
220
221 // 1. queue is full and some threads is blocked
222 std::this_thread::sleep_for(std::chrono::seconds(4));
223
224 unsigned int pushedIn = 0;
225 unsigned int unpushedIn = 0;
226 unsigned int getedOut = 0;
227 unsigned int ungetedOut = 0;
228 putInTestThread.GetThreadDatePushedStatus(pushedIn, unpushedIn);
229 getOutTestThread.GetThreadDateGetedStatus(getedOut, ungetedOut);
230 cout << "DemoThreadData::shareQueue.Size() = " << DemoThreadData::shareQueue.Size() << endl;
231 ASSERT_EQ(pushedIn, THREAD_NUM);
232 ASSERT_EQ(getedOut, THREAD_NUM);
233
234 putInTestThread.ResetStatus();
235 getOutTestThread.ResetStatus();
236 }
237
238 /*
239 * Feature: SafeBlockQueue
240 * Function:put
241 * SubFunction: NA
242 * FunctionPoints:
243 * EnvConditions: NA
244 * CaseDescription: Multi-threaded put() and Multi-threaded get() on the not empty queue. When all threads are waiting to reach a certain
245 * time-point, everyone run concurrently to see the status of the queue and the state of the thread.
246 */
247 HWTEST_F(UtilsSafeQueue, testMutilthreadConcurrentGetAndPopInNotEmptyQueue, TestSize.Level0)
248 {
249 //1. prepare
250 using std::chrono::system_clock;
251
252 std::time_t timeT = system_clock::to_time_t(system_clock::now());
253 timeT += 2;
254
255 ASSERT_TRUE(DemoThreadData::shareQueue.Size() == 0);
256 int t = 1;
257 for (unsigned int i = 0; i < THREAD_NUM; i++)
258 {
259 DemoThreadData::shareQueue.Push(t);
260 }
261
262 ASSERT_TRUE(DemoThreadData::shareQueue.Size() == THREAD_NUM);
263
264 //2. start thread put in not full queue
265
266 TestThreading putInTestThread;
267 putInTestThread.AllThreadPut(timeT);
268
269 TestThreading getOutTestThread;
270 getOutTestThread.AllThreadGet(timeT);
271
272 std::this_thread::sleep_for(std::chrono::seconds(3));
273 ASSERT_TRUE(DemoThreadData::shareQueue.Size() == THREAD_NUM);
274
275 unsigned int getedOut = 0;
276 unsigned int ungetedOut = 0;
277 unsigned int pushedIn = 0;
278 unsigned int unpushedIn = 0;
279 getOutTestThread.GetThreadDateGetedStatus(getedOut, ungetedOut);
280 putInTestThread.GetThreadDatePushedStatus(pushedIn, unpushedIn);
281
282 ASSERT_EQ(pushedIn, THREAD_NUM);
283 ASSERT_EQ(getedOut, THREAD_NUM);
284
285 // 3. reset status
286 putInTestThread.ResetStatus();
287 getOutTestThread.ResetStatus();
288 }
289 } // namespace
290 } // namespace OHOS