1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "safe_block_queue.h"

#include <array>
#include <atomic>
#include <chrono> // std::chrono::seconds
#include <future>
#include <gtest/gtest.h>
#include <iostream> // std::cout
#include <sstream>
#include <thread> // std::thread, std::this_thread::sleep_for
29
30 using namespace testing::ext;
31 using namespace OHOS;
32 using namespace std;
33
34 class UtilsSafeBlockQueue : public testing::Test {
35 };
36
task_put(SafeBlockQueue<int> & q,int i)37 void task_put(SafeBlockQueue<int>& q, int i)
38 {
39 q.Push(i);
40 }
41
// Capacity given to the queue shared by the multi-threaded test cases.
constexpr unsigned int QUEUE_SLOTS{10};
// One worker more than there are slots.
constexpr unsigned int THREAD_NUM{QUEUE_SLOTS + 1};
44
45 class DemoThreadData {
46 public:
DemoThreadData()47 DemoThreadData()
48 {
49 putStatus = false;
50 getStatus = false;
51 }
52 static SafeBlockQueue<int> shareQueue;
53 bool putStatus;
54 bool getStatus;
55
Put(int i)56 void Put(int i)
57 {
58 shareQueue.Push(i);
59 putStatus = true;
60 }
61
Get()62 void Get()
63 {
64 shareQueue.Pop();
65 getStatus = true;
66 }
67 };
68 SafeBlockQueue<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
69
PutHandleThreadData(DemoThreadData & q,int i)70 void PutHandleThreadData(DemoThreadData& q, int i)
71 {
72 q.Put(i);
73 }
74
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)75 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
76 unsigned int& pushedIn, unsigned int& unpushedIn)
77 {
78 pushedIn = 0;
79 unpushedIn = 0;
80 for (auto& t : demoDatas) {
81 if (t.putStatus) {
82 pushedIn++;
83 }
84 else {
85 unpushedIn++;
86 }
87 }
88 }
89
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)90 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
91 unsigned int& getedOut, unsigned int& ungetedOut)
92 {
93 getedOut = 0;
94 ungetedOut = 0;
95 for (auto& t : demoDatas) {
96 if (t.getStatus) {
97 getedOut++;
98 }
99 else {
100 ungetedOut++;
101 }
102 }
103 }
104
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)105 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
106 {
107 cout << "thread-" << std::this_thread::get_id() << " run time: "
108 << std::chrono::system_clock::to_time_t(absTime) << endl;
109 std::this_thread::sleep_until(absTime);
110
111 q.Put(i);
112 }
113
GetHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)114 void GetHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
115 {
116 cout << "thread-" << std::this_thread::get_id() << " run time: "
117 << std::chrono::system_clock::to_time_t(absTime) << endl;
118 std::this_thread::sleep_until(absTime);
119
120 q.Get();
121 }
122
123 /*
124 * @tc.name: testPut001
125 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
126 */
127 HWTEST_F(UtilsSafeBlockQueue, testPut001, TestSize.Level1)
128 {
129 SafeBlockQueue<int> qi(10);
130 int i = 1;
131 qi.Push(i);
132 EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
133 }
134
135 /*
136 * @tc.name: testGet001
137 * @tc.desc: Single-threaded call put and get to determine that the normal scenario
138 */
139 HWTEST_F(UtilsSafeBlockQueue, testGet001, TestSize.Level1)
140 {
141 SafeBlockQueue<int> qi(10);
142 for (int i = 0; i < 3; i++) {
143 qi.Push(i);
144 }
145 EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
146 int t = qi.Pop();
147 ASSERT_EQ(t, 0);
148 }
149
150 /*
151 * @tc.name: testMutilthreadPutAndBlock001
152 * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
153 */
154 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadPutAndBlock001, TestSize.Level1)
155 {
156 std::thread threads[THREAD_NUM];
157
158 std::array<DemoThreadData, THREAD_NUM> demoDatas;
159 demoDatas.fill(DemoThreadData());
160
161 for (unsigned int i = 0; i < THREAD_NUM; i++) {
162 threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
163 }
164
165 // 1. queue is full and some threads is blocked
166 std::this_thread::sleep_for(std::chrono::seconds(2));
167 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
168
169 unsigned int pushedIn = 0;
170 unsigned int unpushedIn = 0;
171 unsigned int getedOut = 0;
172 unsigned int ungetedOut = 0;
173 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
174
175 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
176 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
177
178 // 2. get one out and wait some put in
179 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
180 demoDatas[0].Get();
181 }
182
183 std::this_thread::sleep_for(std::chrono::seconds(2));
184 // queue is full and some threads is blocked and is not joined
185 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
186 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
187 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
188 ASSERT_EQ(pushedIn, THREAD_NUM);
189 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
190
191 for (auto& t : threads) {
192 t.join();
193 }
194
195 while (!DemoThreadData::shareQueue.IsEmpty()) {
196 demoDatas[0].Get();
197 }
198
199 // here means all thread end ok or if some operation blocked and the testcase blocked
200 }
201
202 /*
203 * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
204 * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
205 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
206 */
207 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level1)
208 {
209 // 1. prepare
210 std::thread threads[THREAD_NUM];
211 std::array<DemoThreadData, THREAD_NUM> demoDatas;
212 demoDatas.fill(DemoThreadData());
213
214 using std::chrono::system_clock;
215
216 std::time_t timeT = system_clock::to_time_t(system_clock::now());
217 cout << "start time: " << timeT << endl;
218 timeT += 2;
219
220 // 2. start thread
221 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
222 for (unsigned int i = 0; i < THREAD_NUM; i++) {
223 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
224 }
225
226 // 1. queue is full and some threads is blocked
227 std::this_thread::sleep_for(std::chrono::seconds(4));
228 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
229
230 unsigned int pushedIn = 0;
231 unsigned int unpushedIn = 0;
232 unsigned int getedOut = 0;
233 unsigned int ungetedOut = 0;
234 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
235
236 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
237 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
238
239 // 2. get one out and wait some put in
240 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
241 demoDatas[0].Get();
242 }
243
244 std::this_thread::sleep_for(std::chrono::seconds(2));
245 // queue is full and some threads is blocked and is not joined
246 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
247 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
248 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
249 ASSERT_EQ(pushedIn, THREAD_NUM);
250 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
251
252 for (auto& t : threads) {
253 t.join();
254 }
255
256 while (!DemoThreadData::shareQueue.IsEmpty()) {
257 demoDatas[0].Get();
258 }
259 // here means all thread end ok or if some operation blocked and the testcase blocked
260 }
261
262 /*
263 * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
264 * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
265 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
266 */
267 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level1)
268 {
269 // 1. prepare
270 std::thread threads[THREAD_NUM];
271 std::array<DemoThreadData, THREAD_NUM> demoDatas;
272 demoDatas.fill(DemoThreadData());
273
274 using std::chrono::system_clock;
275
276 std::time_t timeT = system_clock::to_time_t(system_clock::now());
277 cout << "start time: " << timeT << endl;
278 timeT += 2;
279 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
280 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
281 int t = i;
282 DemoThreadData::shareQueue.Push(t);
283 }
284 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
285
286 // 2. start thread put in full queue
287 for (unsigned int i = 0; i < THREAD_NUM; i++) {
288 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
289 }
290
291 std::this_thread::sleep_for(std::chrono::seconds(3));
292 // 3. now thread is running and all is blocked
293 unsigned int pushedIn = 0;
294 unsigned int unpushedIn = 0;
295 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
296 ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
297 ASSERT_EQ(unpushedIn, THREAD_NUM);
298 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
299 for (unsigned int i = 0; i < THREAD_NUM; i++) {
300 cout << "get out one and then the queue is full" << endl;
301 DemoThreadData::shareQueue.Pop();
302 std::this_thread::sleep_for(std::chrono::seconds(1));
303 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
304 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
305 ASSERT_EQ(pushedIn, i + 1);
306 ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
307 }
308
309 for (auto& t : threads) {
310 t.join();
311 }
312
313 while (!DemoThreadData::shareQueue.IsEmpty()) {
314 demoDatas[0].Get();
315 }
316 }
317
318 /*
319 * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
320 * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
321 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
322 */
323 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level1)
324 {
325 // 1. prepare
326 std::thread threads[THREAD_NUM];
327 std::array<DemoThreadData, THREAD_NUM> demoDatas;
328 demoDatas.fill(DemoThreadData());
329
330 using std::chrono::system_clock;
331
332 std::time_t timeT = system_clock::to_time_t(system_clock::now());
333 cout << "start time: " << timeT << endl;
334 timeT += 2;
335 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
336
337 // 2. start thread put in empty queue
338 for (unsigned int i = 0; i < THREAD_NUM; i++) {
339 threads[i] = std::thread(GetHandleThreadDataTime,
340 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
341 }
342 std::this_thread::sleep_for(std::chrono::seconds(3));
343
344 // 3. now thread is running and all is blocked
345 unsigned int getedOut = 0;
346 unsigned int ungetedOut = 0;
347 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
348 ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
349 ASSERT_EQ(ungetedOut, THREAD_NUM);
350 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
351
352 int value = 1;
353
354 for (unsigned int i = 0; i < THREAD_NUM; i++) {
355 cout << "put in one and then the queue is empty" << endl;
356 DemoThreadData::shareQueue.Push(value);
357 std::this_thread::sleep_for(std::chrono::seconds(1));
358 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
359 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
360 ASSERT_EQ(getedOut, i + 1);
361 ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
362 }
363
364 for (auto& t : threads) {
365 t.join();
366 }
367
368 while (!DemoThreadData::shareQueue.IsEmpty()) {
369 demoDatas[0].Get();
370 }
371 }
372 /*
373 * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
374 * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
375 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
376 */
377 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level1)
378 {
379 // 1. prepare
380 std::thread threads[THREAD_NUM];
381 std::array<DemoThreadData, THREAD_NUM> demoDatas;
382 demoDatas.fill(DemoThreadData());
383
384 using std::chrono::system_clock;
385
386 std::time_t timeT = system_clock::to_time_t(system_clock::now());
387 cout << "start time: " << timeT << endl;
388 timeT += 2;
389 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
390 int t = 1;
391 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
392 DemoThreadData::shareQueue.Push(t);
393 }
394 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
395
396 // 2. start thread put in full queue
397 for (unsigned int i = 0; i < THREAD_NUM; i++) {
398 threads[i] = std::thread(GetHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
399 }
400
401 std::this_thread::sleep_for(std::chrono::seconds(4));
402 // 3. start judge
403 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
404
405 unsigned int getedOut = 0;
406 unsigned int ungetedOut = 0;
407 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
408
409 ASSERT_EQ(getedOut, QUEUE_SLOTS);
410 ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
411
412 // 2. put one in and wait some get out
413 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
414 demoDatas[0].Put(t);
415 }
416
417 std::this_thread::sleep_for(std::chrono::seconds(2));
418 // queue is full and some threads is blocked and is not joined
419 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
420 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
421 ASSERT_EQ(getedOut, THREAD_NUM);
422 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
423
424 for (auto& t : threads) {
425 t.join();
426 }
427
428 while (!DemoThreadData::shareQueue.IsEmpty()) {
429 demoDatas[0].Get();
430 }
431 }
432
433 /*
434 * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
435 * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
436 * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
437 */
438 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level1)
439 {
440 // 1. prepare
441 std::thread threads[THREAD_NUM];
442 std::array<DemoThreadData, THREAD_NUM> demoDatas;
443 demoDatas.fill(DemoThreadData());
444
445 using std::chrono::system_clock;
446
447 const unsigned int REMAIN_SLOTS = 5;
448 std::time_t timeT = system_clock::to_time_t(system_clock::now());
449 cout << "start time: " << timeT << endl;
450 timeT += 2;
451 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
452 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
453 int t = i;
454 DemoThreadData::shareQueue.Push(t);
455 }
456
457 // 2. start thread put in not full queue
458 for (unsigned int i = 0; i < THREAD_NUM; i++) {
459 threads[i] = std::thread(GetHandleThreadDataTime,
460 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
461 }
462
463 std::this_thread::sleep_for(std::chrono::seconds(3));
464
465 unsigned int getedOut = 0;
466 unsigned int ungetedOut = 0;
467 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
468 ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
469 ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
470
471 // 3. put ungetedOut
472 for (unsigned int i = 0; i < ungetedOut; i++) {
473 int t = i;
474 DemoThreadData::shareQueue.Push(t);
475 }
476
477 std::this_thread::sleep_for(std::chrono::seconds(1));
478 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
479 ASSERT_EQ(getedOut, THREAD_NUM);
480 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
481
482 for (auto& t : threads) {
483 t.join();
484 }
485
486 while (!DemoThreadData::shareQueue.IsEmpty()) {
487 demoDatas[0].Get();
488 }
489 }
490
491 /*
492 * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
493 * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
494 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
495 */
496 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level1)
497 {
498 // 1. prepare
499 std::thread threads[THREAD_NUM];
500 std::array<DemoThreadData, THREAD_NUM> demoDatas;
501 demoDatas.fill(DemoThreadData());
502
503 using std::chrono::system_clock;
504
505 const unsigned int REMAIN_SLOTS = 5;
506 std::time_t timeT = system_clock::to_time_t(system_clock::now());
507 cout << "start time: " << timeT << endl;
508 timeT += 2;
509 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
510 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
511 int t = i;
512 DemoThreadData::shareQueue.Push(t);
513 }
514
515 // 2. start thread put in not full queue
516 for (unsigned int i = 0; i < THREAD_NUM; i++) {
517 threads[i] = std::thread(PutHandleThreadDataTime,
518 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
519 }
520
521 std::this_thread::sleep_for(std::chrono::seconds(3));
522
523 unsigned int putedin = 0;
524 unsigned int unputedin = 0;
525 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
526 ASSERT_EQ(putedin, REMAIN_SLOTS);
527 ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
528
529 // 3. put ungetedOut
530 for (unsigned int i = 0; i < unputedin; i++) {
531 DemoThreadData::shareQueue.Pop();
532 }
533
534 std::this_thread::sleep_for(std::chrono::seconds(1));
535 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
536 ASSERT_EQ(putedin, THREAD_NUM);
537 ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
538
539 for (auto& t : threads) {
540 t.join();
541 }
542
543 while (!DemoThreadData::shareQueue.IsEmpty()) {
544 demoDatas[0].Get();
545 }
546 }
547
548 /*
549 * @tc.name: testMutilthreadConcurrentGetAndPopInblankqueue001
550 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the empty queue. When all threads are waiting to reach
551 * a certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
552 */
553 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInblankqueue001, TestSize.Level1)
554 {
555 // 1. prepare
556 std::thread threadsout[THREAD_NUM];
557 std::array<DemoThreadData, THREAD_NUM> demoDatas;
558 demoDatas.fill(DemoThreadData());
559
560 std::thread threadsin[THREAD_NUM];
561
562 using std::chrono::system_clock;
563
564 std::time_t timeT = system_clock::to_time_t(system_clock::now());
565 cout << "start time: " << timeT << endl;
566 timeT += 2;
567 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
568
569 // 2. start thread put and get in empty queue
570
571 for (unsigned int i = 0; i < THREAD_NUM; i++) {
572 threadsout[i] = std::thread(GetHandleThreadDataTime,
573 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
574 }
575
576 for (unsigned int i = 0; i < THREAD_NUM; i++) {
577 threadsin[i] = std::thread(PutHandleThreadDataTime,
578 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
579 }
580
581 std::this_thread::sleep_for(std::chrono::seconds(3));
582
583 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
584 unsigned int getedOut = 0;
585 unsigned int ungetedOut = 0;
586 unsigned int pushedIn = 0;
587 unsigned int unpushedIn = 0;
588 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
589 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
590
591 ASSERT_EQ(pushedIn, THREAD_NUM);
592 ASSERT_EQ(getedOut, THREAD_NUM);
593
594 for (auto& t : threadsout) {
595 t.join();
596 }
597
598 for (auto& t : threadsin) {
599 t.join();
600 }
601
602 while (!DemoThreadData::shareQueue.IsEmpty()) {
603 demoDatas[0].Get();
604 }
605 }
606
607 /*
608 * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
609 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue.
610 * When all threads are waiting to reach a certain
611 * time-point, everyone run concurrently to see the status of the queue and the state of the thread.
612 */
613 HWTEST_F(UtilsSafeBlockQueue, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level1)
614 {
615 // 1. prepare
616 std::thread threadsout[THREAD_NUM];
617 std::array<DemoThreadData, THREAD_NUM> demoDatas;
618 demoDatas.fill(DemoThreadData());
619
620 std::thread threadsin[THREAD_NUM];
621
622 using std::chrono::system_clock;
623
624 std::time_t timeT = system_clock::to_time_t(system_clock::now());
625 cout << "start time: " << timeT << endl;
626 timeT += 2;
627 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
628 int t = 1;
629 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
630 DemoThreadData::shareQueue.Push(t);
631 }
632 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
633 // 2. start thread put in not full queue
634 for (unsigned int i = 0; i < THREAD_NUM; i++) {
635 threadsin[i] = std::thread(PutHandleThreadDataTime,
636 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
637 }
638
639 for (unsigned int i = 0; i < THREAD_NUM; i++) {
640 threadsout[i] = std::thread(GetHandleThreadDataTime,
641 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
642 }
643
644 std::this_thread::sleep_for(std::chrono::seconds(3));
645
646 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
647 unsigned int getedOut = 0;
648 unsigned int ungetedOut = 0;
649 unsigned int pushedIn = 0;
650 unsigned int unpushedIn = 0;
651 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
652 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
653
654 ASSERT_EQ(pushedIn, THREAD_NUM);
655 ASSERT_EQ(getedOut, THREAD_NUM);
656
657 for (auto& t : threadsout) {
658 t.join();
659 }
660
661 for (auto& t : threadsin) {
662 t.join();
663 }
664
665 while (!DemoThreadData::shareQueue.IsEmpty()) {
666 demoDatas[0].Get();
667 }
668 }
669