1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "safe_block_queue.h"
17
18 #include <array>
19 #include <future>
20 #include <gtest/gtest.h>
21 #include <iostream>
22 #include <thread>
23
24 #include <iostream>
25
26 #include <chrono> // std::chrono::seconds
27 #include <iostream> // std::cout
28 #include <thread> // std::thread, std::this_thread::sleep_for
29 using namespace testing::ext;
30 using namespace std;
31
32 namespace OHOS {
33 namespace {
34 class UtilsSafeBlockQueue : public testing::Test {
35 };
36
37 const unsigned int QUEUE_SLOTS = 10;
38 const unsigned int THREAD_NUM = QUEUE_SLOTS + 1;
39
40 class DemoThreadData {
41 public:
DemoThreadData()42 DemoThreadData()
43 {
44 putStatus = false;
45 getStatus = false;
46 }
47 static SafeBlockQueue<int> shareQueue;
48 bool putStatus;
49 bool getStatus;
50
Put(int i)51 void Put(int i)
52 {
53 shareQueue.Push(i);
54 putStatus = true;
55 }
56
Get()57 void Get()
58 {
59 shareQueue.Pop();
60 getStatus = true;
61 }
62 };
63
64 SafeBlockQueue<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
65
PutHandleThreadData(DemoThreadData & q,int i)66 void PutHandleThreadData(DemoThreadData& q, int i)
67 {
68 q.Put(i);
69 }
70
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)71 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
72 unsigned int& pushedIn, unsigned int& unpushedIn)
73 {
74 pushedIn = 0;
75 unpushedIn = 0;
76 for (auto& t : demoDatas) {
77 if (t.putStatus) {
78 pushedIn++;
79 }
80 else {
81 unpushedIn++;
82 }
83 }
84 }
85
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)86 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
87 unsigned int& getedOut, unsigned int& ungetedOut)
88 {
89 getedOut = 0;
90 ungetedOut = 0;
91 for (auto& t : demoDatas) {
92 if (t.getStatus) {
93 getedOut++;
94 }
95 else {
96 ungetedOut++;
97 }
98 }
99 }
100
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)101 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
102 {
103 cout << "thread-" << std::this_thread::get_id() << " run time: "
104 << std::chrono::system_clock::to_time_t(absTime) << endl;
105 std::this_thread::sleep_until(absTime);
106
107 q.Put(i);
108 }
109
GetHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void GetHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112 cout << "thread-" << std::this_thread::get_id() << " run time: "
113 << std::chrono::system_clock::to_time_t(absTime) << endl;
114 std::this_thread::sleep_until(absTime);
115
116 q.Get();
117 }
118
119 /*
120 * @tc.name: testPut001
121 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
122 */
123 HWTEST_F(UtilsSafeBlockQueue, testPut001, TestSize.Level0)
124 {
125 SafeBlockQueue<int> qi(10);
126 int i = 1;
127 qi.Push(i);
128 EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
129 }
130
131 /*
132 * @tc.name: testGet001
133 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
134 */
135 HWTEST_F(UtilsSafeBlockQueue, testGet001, TestSize.Level0)
136 {
137 SafeBlockQueue<int> qi(10);
138 for (int i = 0; i < 3; i++) {
139 qi.Push(i);
140 }
141 EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
142 int t = qi.Pop();
143 ASSERT_EQ(t, 0);
144 }
145
146 /*
147 * @tc.name: testMutilthreadPutAndBlock001
148 * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
149 */
150 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadPutAndBlock001, TestSize.Level0)
151 {
152 std::thread threads[THREAD_NUM];
153
154 std::array<DemoThreadData, THREAD_NUM> demoDatas;
155 demoDatas.fill(DemoThreadData());
156
157 for (unsigned int i = 0; i < THREAD_NUM; i++) {
158 threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
159 }
160
161 // 1. queue is full and some threads is blocked
162 std::this_thread::sleep_for(std::chrono::seconds(2));
163 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
164
165 unsigned int pushedIn = 0;
166 unsigned int unpushedIn = 0;
167 unsigned int getedOut = 0;
168 unsigned int ungetedOut = 0;
169 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
170
171 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
172 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
173
174 // 2. get one out and wait some put in
175 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
176 demoDatas[0].Get();
177 }
178
179 std::this_thread::sleep_for(std::chrono::seconds(2));
180 // queue is full and some threads is blocked and is not joined
181 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
182 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
183 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
184 ASSERT_EQ(pushedIn, THREAD_NUM);
185 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
186
187 for (auto& t : threads) {
188 t.join();
189 }
190
191 while (!DemoThreadData::shareQueue.IsEmpty()) {
192 demoDatas[0].Get();
193 }
194
195 // here means all thread end ok or if some operation blocked and the testcase blocked
196 }
197
198 /*
199 * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
200 * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
201 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
202 */
203 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
204 {
205 // 1. prepare
206 std::thread threads[THREAD_NUM];
207 std::array<DemoThreadData, THREAD_NUM> demoDatas;
208 demoDatas.fill(DemoThreadData());
209
210 using std::chrono::system_clock;
211
212 std::time_t timeT = system_clock::to_time_t(system_clock::now());
213 cout << "start time: " << timeT << endl;
214 timeT += 2;
215
216 // 2. start thread
217 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
218 for (unsigned int i = 0; i < THREAD_NUM; i++) {
219 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
220 }
221
222 // 1. queue is full and some threads is blocked
223 std::this_thread::sleep_for(std::chrono::seconds(4));
224 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
225
226 unsigned int pushedIn = 0;
227 unsigned int unpushedIn = 0;
228 unsigned int getedOut = 0;
229 unsigned int ungetedOut = 0;
230 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
231
232 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
233 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
234
235 // 2. get one out and wait some put in
236 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
237 demoDatas[0].Get();
238 }
239
240 std::this_thread::sleep_for(std::chrono::seconds(2));
241 // queue is full and some threads is blocked and is not joined
242 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
243 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
244 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
245 ASSERT_EQ(pushedIn, THREAD_NUM);
246 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
247
248 for (auto& t : threads) {
249 t.join();
250 }
251
252 while (!DemoThreadData::shareQueue.IsEmpty()) {
253 demoDatas[0].Get();
254 }
255 // here means all thread end ok or if some operation blocked and the testcase blocked
256 }
257
258 /*
259 * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
260 * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
261 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
262 */
263 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
264 {
265 // 1. prepare
266 std::thread threads[THREAD_NUM];
267 std::array<DemoThreadData, THREAD_NUM> demoDatas;
268 demoDatas.fill(DemoThreadData());
269
270 using std::chrono::system_clock;
271
272 std::time_t timeT = system_clock::to_time_t(system_clock::now());
273 cout << "start time: " << timeT << endl;
274 timeT += 2;
275 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
276 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
277 int t = i;
278 DemoThreadData::shareQueue.Push(t);
279 }
280 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
281
282 // 2. start thread put in full queue
283 for (unsigned int i = 0; i < THREAD_NUM; i++) {
284 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
285 }
286
287 std::this_thread::sleep_for(std::chrono::seconds(3));
288 // 3. now thread is running and all is blocked
289 unsigned int pushedIn = 0;
290 unsigned int unpushedIn = 0;
291 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
292 ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
293 ASSERT_EQ(unpushedIn, THREAD_NUM);
294 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
295 for (unsigned int i = 0; i < THREAD_NUM; i++) {
296 cout << "get out one and then the queue is full" << endl;
297 DemoThreadData::shareQueue.Pop();
298 std::this_thread::sleep_for(std::chrono::seconds(1));
299 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
300 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
301 ASSERT_EQ(pushedIn, i + 1);
302 ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
303 }
304
305 for (auto& t : threads) {
306 t.join();
307 }
308
309 while (!DemoThreadData::shareQueue.IsEmpty()) {
310 demoDatas[0].Get();
311 }
312 }
313
314 /*
315 * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
316 * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
317 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
318 */
319 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
320 {
321 // 1. prepare
322 std::thread threads[THREAD_NUM];
323 std::array<DemoThreadData, THREAD_NUM> demoDatas;
324 demoDatas.fill(DemoThreadData());
325
326 using std::chrono::system_clock;
327
328 std::time_t timeT = system_clock::to_time_t(system_clock::now());
329 cout << "start time: " << timeT << endl;
330 timeT += 2;
331 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
332
333 // 2. start thread put in empty queue
334 for (unsigned int i = 0; i < THREAD_NUM; i++) {
335 threads[i] = std::thread(GetHandleThreadDataTime,
336 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
337 }
338 std::this_thread::sleep_for(std::chrono::seconds(3));
339
340 // 3. now thread is running and all is blocked
341 unsigned int getedOut = 0;
342 unsigned int ungetedOut = 0;
343 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
344 ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
345 ASSERT_EQ(ungetedOut, THREAD_NUM);
346 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
347
348 int value = 1;
349
350 for (unsigned int i = 0; i < THREAD_NUM; i++) {
351 cout << "put in one and then the queue is empty" << endl;
352 DemoThreadData::shareQueue.Push(value);
353 std::this_thread::sleep_for(std::chrono::seconds(1));
354 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
355 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
356 ASSERT_EQ(getedOut, i + 1);
357 ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
358 }
359
360 for (auto& t : threads) {
361 t.join();
362 }
363
364 while (!DemoThreadData::shareQueue.IsEmpty()) {
365 demoDatas[0].Get();
366 }
367 }
368 /*
369 * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
370 * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
371 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
372 */
373 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
374 {
375 // 1. prepare
376 std::thread threads[THREAD_NUM];
377 std::array<DemoThreadData, THREAD_NUM> demoDatas;
378 demoDatas.fill(DemoThreadData());
379
380 using std::chrono::system_clock;
381
382 std::time_t timeT = system_clock::to_time_t(system_clock::now());
383 cout << "start time: " << timeT << endl;
384 timeT += 2;
385 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
386 int t = 1;
387 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
388 DemoThreadData::shareQueue.Push(t);
389 }
390 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
391
392 // 2. start thread put in full queue
393 for (unsigned int i = 0; i < THREAD_NUM; i++) {
394 threads[i] = std::thread(GetHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
395 }
396
397 std::this_thread::sleep_for(std::chrono::seconds(4));
398 // 3. start judge
399 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
400
401 unsigned int getedOut = 0;
402 unsigned int ungetedOut = 0;
403 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
404
405 ASSERT_EQ(getedOut, QUEUE_SLOTS);
406 ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
407
408 // 2. put one in and wait some get out
409 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
410 demoDatas[0].Put(t);
411 }
412
413 std::this_thread::sleep_for(std::chrono::seconds(2));
414 // queue is full and some threads is blocked and is not joined
415 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
416 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
417 ASSERT_EQ(getedOut, THREAD_NUM);
418 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
419
420 for (auto& t : threads) {
421 t.join();
422 }
423
424 while (!DemoThreadData::shareQueue.IsEmpty()) {
425 demoDatas[0].Get();
426 }
427 }
428
429 /*
430 * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
431 * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
432 * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
433 */
434 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
435 {
436 // 1. prepare
437 std::thread threads[THREAD_NUM];
438 std::array<DemoThreadData, THREAD_NUM> demoDatas;
439 demoDatas.fill(DemoThreadData());
440
441 using std::chrono::system_clock;
442
443 const unsigned int REMAIN_SLOTS = 5;
444 std::time_t timeT = system_clock::to_time_t(system_clock::now());
445 cout << "start time: " << timeT << endl;
446 timeT += 2;
447 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
448 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
449 int t = i;
450 DemoThreadData::shareQueue.Push(t);
451 }
452
453 // 2. start thread put in not full queue
454 for (unsigned int i = 0; i < THREAD_NUM; i++) {
455 threads[i] = std::thread(GetHandleThreadDataTime,
456 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
457 }
458
459 std::this_thread::sleep_for(std::chrono::seconds(3));
460
461 unsigned int getedOut = 0;
462 unsigned int ungetedOut = 0;
463 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
464 ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
465 ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
466
467 // 3. put ungetedOut
468 for (unsigned int i = 0; i < ungetedOut; i++) {
469 int t = i;
470 DemoThreadData::shareQueue.Push(t);
471 }
472
473 std::this_thread::sleep_for(std::chrono::seconds(1));
474 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
475 ASSERT_EQ(getedOut, THREAD_NUM);
476 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
477
478 for (auto& t : threads) {
479 t.join();
480 }
481
482 while (!DemoThreadData::shareQueue.IsEmpty()) {
483 demoDatas[0].Get();
484 }
485 }
486
487 /*
488 * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
489 * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
490 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
491 */
492 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
493 {
494 // 1. prepare
495 std::thread threads[THREAD_NUM];
496 std::array<DemoThreadData, THREAD_NUM> demoDatas;
497 demoDatas.fill(DemoThreadData());
498
499 using std::chrono::system_clock;
500
501 const unsigned int REMAIN_SLOTS = 5;
502 std::time_t timeT = system_clock::to_time_t(system_clock::now());
503 cout << "start time: " << timeT << endl;
504 timeT += 2;
505 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
506 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
507 int t = i;
508 DemoThreadData::shareQueue.Push(t);
509 }
510
511 // 2. start thread put in not full queue
512 for (unsigned int i = 0; i < THREAD_NUM; i++) {
513 threads[i] = std::thread(PutHandleThreadDataTime,
514 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
515 }
516
517 std::this_thread::sleep_for(std::chrono::seconds(3));
518
519 unsigned int putedin = 0;
520 unsigned int unputedin = 0;
521 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
522 ASSERT_EQ(putedin, REMAIN_SLOTS);
523 ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
524
525 // 3. put ungetedOut
526 for (unsigned int i = 0; i < unputedin; i++) {
527 DemoThreadData::shareQueue.Pop();
528 }
529
530 std::this_thread::sleep_for(std::chrono::seconds(1));
531 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
532 ASSERT_EQ(putedin, THREAD_NUM);
533 ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
534
535 for (auto& t : threads) {
536 t.join();
537 }
538
539 while (!DemoThreadData::shareQueue.IsEmpty()) {
540 demoDatas[0].Get();
541 }
542 }
543
544 /*
545 * @tc.name: testMutilthreadConcurrentGetAndPopInblankqueue001
546 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the empty queue. When all threads are waiting to reach
547 * a certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
548 */
549 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInblankqueue001, TestSize.Level0)
550 {
551 // 1. prepare
552 std::thread threadsout[THREAD_NUM];
553 std::array<DemoThreadData, THREAD_NUM> demoDatas;
554 demoDatas.fill(DemoThreadData());
555
556 std::thread threadsin[THREAD_NUM];
557
558 using std::chrono::system_clock;
559
560 std::time_t timeT = system_clock::to_time_t(system_clock::now());
561 cout << "start time: " << timeT << endl;
562 timeT += 2;
563 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
564
565 // 2. start thread put and get in empty queue
566
567 for (unsigned int i = 0; i < THREAD_NUM; i++) {
568 threadsout[i] = std::thread(GetHandleThreadDataTime,
569 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
570 }
571
572 for (unsigned int i = 0; i < THREAD_NUM; i++) {
573 threadsin[i] = std::thread(PutHandleThreadDataTime,
574 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
575 }
576
577 std::this_thread::sleep_for(std::chrono::seconds(3));
578
579 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
580 unsigned int getedOut = 0;
581 unsigned int ungetedOut = 0;
582 unsigned int pushedIn = 0;
583 unsigned int unpushedIn = 0;
584 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
585 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
586
587 ASSERT_EQ(pushedIn, THREAD_NUM);
588 ASSERT_EQ(getedOut, THREAD_NUM);
589
590 for (auto& t : threadsout) {
591 t.join();
592 }
593
594 for (auto& t : threadsin) {
595 t.join();
596 }
597
598 while (!DemoThreadData::shareQueue.IsEmpty()) {
599 demoDatas[0].Get();
600 }
601 }
602
603 /*
604 * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
605 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue.
606 * When all threads are waiting to reach a certain
607 * time-point, everyone run concurrently to see the status of the queue and the state of the thread.
608 */
609 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
610 {
611 // 1. prepare
612 std::thread threadsout[THREAD_NUM];
613 std::array<DemoThreadData, THREAD_NUM> demoDatas;
614 demoDatas.fill(DemoThreadData());
615
616 std::thread threadsin[THREAD_NUM];
617
618 using std::chrono::system_clock;
619
620 std::time_t timeT = system_clock::to_time_t(system_clock::now());
621 cout << "start time: " << timeT << endl;
622 timeT += 2;
623 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
624 int t = 1;
625 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
626 DemoThreadData::shareQueue.Push(t);
627 }
628 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
629 // 2. start thread put in not full queue
630 for (unsigned int i = 0; i < THREAD_NUM; i++) {
631 threadsin[i] = std::thread(PutHandleThreadDataTime,
632 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
633 }
634
635 for (unsigned int i = 0; i < THREAD_NUM; i++) {
636 threadsout[i] = std::thread(GetHandleThreadDataTime,
637 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
638 }
639
640 std::this_thread::sleep_for(std::chrono::seconds(3));
641
642 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
643 unsigned int getedOut = 0;
644 unsigned int ungetedOut = 0;
645 unsigned int pushedIn = 0;
646 unsigned int unpushedIn = 0;
647 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
648 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
649
650 ASSERT_EQ(pushedIn, THREAD_NUM);
651 ASSERT_EQ(getedOut, THREAD_NUM);
652
653 for (auto& t : threadsout) {
654 t.join();
655 }
656
657 for (auto& t : threadsin) {
658 t.join();
659 }
660
661 while (!DemoThreadData::shareQueue.IsEmpty()) {
662 demoDatas[0].Get();
663 }
664 }
665 } // namespace
666 } // namespace OHOS