/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "safe_block_queue.h"

#include <array>
#include <atomic>
#include <chrono> // std::chrono::seconds
#include <future>
#include <iostream>
#include <thread>

#include <gtest/gtest.h>

using namespace testing::ext;
using namespace std;

namespace OHOS {
namespace {
28 class UtilsSafeBlockQueueTracking : public testing::Test {
29 };
30 
// Capacity passed to the shared queue that the multi-threaded cases operate on.
constexpr unsigned int QUEUE_SLOTS = 10;
// Number of worker threads per case: one more than the queue has slots.
constexpr unsigned int THREAD_NUM = QUEUE_SLOTS + 1;

34 class DemoThreadData {
35 public:
DemoThreadData()36     DemoThreadData()
37     {
38         putStatus = false;
39         getStatus = false;
40         joinStatus = false;
41     }
42 
43     static SafeBlockQueueTracking<int> shareQueue;
44     static bool joinStatus;
45     bool putStatus;
46     bool getStatus;
47 
Put(int i)48     void Put(int i)
49     {
50         shareQueue.Push(i);
51         putStatus = true;
52     }
53 
Get()54     void Get()
55     {
56         shareQueue.Pop();
57         getStatus = true;
58     }
59 
GetAndOneTaskDone()60     void GetAndOneTaskDone()
61     {
62         shareQueue.Pop();
63         getStatus = true;
64         shareQueue.OneTaskDone();
65     }
66 
Join()67     void Join()
68     {
69         shareQueue.Join();
70         joinStatus = true;
71     }
72 };
73 
74 SafeBlockQueueTracking<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
75 bool DemoThreadData::joinStatus = false;
76 
PutHandleThreadData(DemoThreadData & q,int i)77 void PutHandleThreadData(DemoThreadData& q, int i)
78 {
79     q.Put(i);
80 }
81 
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)82 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
83                                              unsigned int& pushedIn, unsigned int& unpushedIn)
84 {
85     pushedIn = 0;
86     unpushedIn = 0;
87     for (auto& t : demoDatas) {
88         if (t.putStatus) {
89             pushedIn++;
90         } else {
91             unpushedIn++;
92         }
93     }
94 }
95 
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)96 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
97                                            unsigned int& getedOut, unsigned int& ungetedOut)
98 {
99     getedOut = 0;
100     ungetedOut = 0;
101     for (auto& t : demoDatas) {
102         if (t.getStatus) {
103             getedOut++;
104         } else {
105             ungetedOut++;
106         }
107     }
108 }
109 
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112     std::this_thread::sleep_until(absTime);
113 
114     q.Put(i);
115 }
116 
GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)117 void GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
118 {
119     std::this_thread::sleep_until(absTime);
120 
121     q.GetAndOneTaskDone();
122 }
123 
124 /*
125  * @tc.name: testPut001
126  * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
127  */
128 HWTEST_F(UtilsSafeBlockQueueTracking, testPut001, TestSize.Level0)
129 {
130     SafeBlockQueueTracking<int> qi(10);
131     int i = 1;
132     qi.Push(i);
133     EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
134     ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
135 }
136 
137 /*
138  * @tc.name: testGet001
139  * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
140  */
141 HWTEST_F(UtilsSafeBlockQueueTracking, testGet001, TestSize.Level0)
142 {
143     SafeBlockQueueTracking<int> qi(10);
144     for (int i = 0; i < 3; i++) {
145         qi.Push(i);
146     }
147     EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
148     int t = qi.Pop();
149     ASSERT_EQ(t, 0);
150     ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
151 }
152 
153 /*
154  * @tc.name: testMutilthreadPutAndBlock001
155  * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
156  */
157 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadPutAndBlock001, TestSize.Level0)
158 {
159     std::thread threads[THREAD_NUM];
160 
161     std::array<DemoThreadData, THREAD_NUM> demoDatas;
162     demoDatas.fill(DemoThreadData());
163     demoDatas[0].Put(1);
164     ASSERT_FALSE(demoDatas[0].joinStatus);
165 
166     // start thread to join
167     DemoThreadData tmp = DemoThreadData();
168     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
169     ASSERT_FALSE(demoDatas[0].joinStatus);
170 
171     for (unsigned int i = 0; i < THREAD_NUM; i++) {
172         threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
173     }
174     ASSERT_FALSE(demoDatas[0].joinStatus);
175     // 1. queue is full and some threads is blocked
176     std::this_thread::sleep_for(std::chrono::seconds(2));
177     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
178 
179     ASSERT_FALSE(demoDatas[0].joinStatus);
180     // 2.  get one out  and wait some put in
181     for (unsigned int i = 0; i < THREAD_NUM; i++) {
182         demoDatas[0].GetAndOneTaskDone();
183         ASSERT_FALSE(demoDatas[0].joinStatus);
184     }
185     ASSERT_FALSE(demoDatas[0].joinStatus);
186     demoDatas[0].GetAndOneTaskDone();
187     std::this_thread::sleep_for(std::chrono::seconds(1));
188 
189     ASSERT_TRUE(demoDatas[0].joinStatus);
190 
191     // recover state
192     for (auto& t : threads) {
193         t.join();
194     }
195     joinThread.join();
196 
197     while (!DemoThreadData::shareQueue.IsEmpty()) {
198         demoDatas[0].GetAndOneTaskDone();
199     }
200     demoDatas[0].joinStatus = false;
201 }
202 
203 /*
204  * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
205  * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
206  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
207  */
208 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
209 {
210     // 1. prepare
211     std::thread threads[THREAD_NUM];
212     std::array<DemoThreadData, THREAD_NUM> demoDatas;
213     demoDatas.fill(DemoThreadData());
214 
215     using std::chrono::system_clock;
216 
217     std::time_t timeT = system_clock::to_time_t(system_clock::now());
218     timeT += 2;
219 
220     // 2. start thread
221     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
222     for (unsigned int i = 0; i < THREAD_NUM; i++) {
223         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
224     }
225 
226     // 3. queue is full and some threads is blocked
227     std::this_thread::sleep_for(std::chrono::seconds(4));
228 
229     // start thread to join
230     DemoThreadData tmp = DemoThreadData();
231     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
232     ASSERT_FALSE(demoDatas[0].joinStatus);
233 
234     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
235     unsigned int pushedIn = 0;
236     unsigned int unpushedIn = 0;
237     unsigned int getedOut = 0;
238     unsigned int ungetedOut = 0;
239     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
240 
241     ASSERT_EQ(pushedIn, QUEUE_SLOTS);
242     ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
243 
244     // get one out and wait some put in
245     for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
246         demoDatas[0].GetAndOneTaskDone();
247         ASSERT_FALSE(demoDatas[0].joinStatus);
248     }
249 
250     std::this_thread::sleep_for(std::chrono::seconds(2));
251     // queue is full and some threads is blocked and is not joined
252     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
253     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
254     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
255     ASSERT_EQ(pushedIn, THREAD_NUM);
256     ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
257 
258     for (auto& t : threads) {
259         t.join();
260     }
261 
262     while (!DemoThreadData::shareQueue.IsEmpty()) {
263         demoDatas[0].GetAndOneTaskDone();
264     }
265 
266     std::this_thread::sleep_for(std::chrono::seconds(1));
267     ASSERT_TRUE(demoDatas[0].joinStatus);
268     demoDatas[0].joinStatus = false;
269     joinThread.join();
270 }
271 
272 /*
273  * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
274  * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
275  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
276  */
277 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
278 {
279     // 1. prepare
280     std::thread threads[THREAD_NUM];
281     std::array<DemoThreadData, THREAD_NUM> demoDatas;
282     demoDatas.fill(DemoThreadData());
283 
284     using std::chrono::system_clock;
285 
286     std::time_t timeT = system_clock::to_time_t(system_clock::now());
287     timeT += 2;
288     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
289     for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
290         int t = i;
291         DemoThreadData::shareQueue.Push(t);
292     }
293     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
294 
295     // 2. start thread put in full queue
296     for (unsigned int i = 0; i < THREAD_NUM; i++) {
297         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
298     }
299 
300     std::this_thread::sleep_for(std::chrono::seconds(3));
301 
302     // 3. now thread is running and all is blocked
303     DemoThreadData tmp = DemoThreadData();
304     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
305     ASSERT_FALSE(demoDatas[0].joinStatus);
306 
307     unsigned int pushedIn = 0;
308     unsigned int unpushedIn = 0;
309     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
310     ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
311     ASSERT_EQ(unpushedIn, THREAD_NUM);
312     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
313     for (unsigned int i = 0; i < THREAD_NUM; i++) {
314         DemoThreadData::shareQueue.Pop();
315         DemoThreadData::shareQueue.OneTaskDone();
316 
317         std::this_thread::sleep_for(std::chrono::seconds(1));
318         ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
319         GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
320         ASSERT_EQ(pushedIn, i + 1);
321         ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
322     }
323 
324     for (auto& t : threads) {
325         t.join();
326     }
327 
328     while (!DemoThreadData::shareQueue.IsEmpty()) {
329         demoDatas[0].GetAndOneTaskDone();
330     }
331     demoDatas[0].joinStatus = false;
332     joinThread.join();
333 }
334 
335 /*
336  * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
337  * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
338  * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
339  */
340 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
341 {
342     // 1. prepare
343     std::thread threads[THREAD_NUM];
344     std::array<DemoThreadData, THREAD_NUM> demoDatas;
345     demoDatas.fill(DemoThreadData());
346 
347     using std::chrono::system_clock;
348 
349     std::time_t timeT = system_clock::to_time_t(system_clock::now());
350     timeT += 2;
351     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
352 
353     // 2. start thread put in empty queue
354     for (unsigned int i = 0; i < THREAD_NUM; i++) {
355         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
356             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
357     }
358     std::this_thread::sleep_for(std::chrono::seconds(3));
359 
360     // 3. now thread is running and all is blocked
361     unsigned int getedOut = 0;
362     unsigned int ungetedOut = 0;
363     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
364     ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
365     ASSERT_EQ(ungetedOut, THREAD_NUM);
366     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
367 
368     // start thread to join
369     DemoThreadData tmp = DemoThreadData();
370     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
371     ASSERT_FALSE(demoDatas[0].joinStatus);
372 
373     int value = 1;
374     for (unsigned int i = 0; i < THREAD_NUM; i++) {
375         DemoThreadData::shareQueue.Push(value);
376         std::this_thread::sleep_for(std::chrono::seconds(1));
377         ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
378         GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
379         ASSERT_EQ(getedOut, i + 1);
380         ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
381     }
382 
383     for (auto& t : threads) {
384         t.join();
385     }
386 
387     while (!DemoThreadData::shareQueue.IsEmpty()) {
388         demoDatas[0].GetAndOneTaskDone();
389     }
390 
391     ASSERT_TRUE(demoDatas[0].joinStatus);
392     demoDatas[0].joinStatus = false;
393     joinThread.join();
394 }
395 
396 /*
397  * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
398  * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
399  * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
400  */
401 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
402 {
403     // 1. prepare
404     std::thread threads[THREAD_NUM];
405     std::array<DemoThreadData, THREAD_NUM> demoDatas;
406     demoDatas.fill(DemoThreadData());
407 
408     using std::chrono::system_clock;
409 
410     std::time_t timeT = system_clock::to_time_t(system_clock::now());
411     timeT += 2;
412     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
413     int t = 1;
414     for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
415         DemoThreadData::shareQueue.Push(t);
416     }
417     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
418 
419     // start thread to join
420     demoDatas[0].joinStatus = false;
421     DemoThreadData tmp = DemoThreadData();
422     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
423     ASSERT_FALSE(demoDatas[0].joinStatus);
424 
425     // 2. start thread get in full queue
426     for (unsigned int i = 0; i < THREAD_NUM; i++) {
427         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
428             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
429     }
430 
431     std::this_thread::sleep_for(std::chrono::seconds(4));
432 
433     // 3. start judge
434     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
435     ASSERT_TRUE(demoDatas[0].joinStatus);
436 
437     unsigned int getedOut = 0;
438     unsigned int ungetedOut = 0;
439     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
440 
441     ASSERT_EQ(getedOut, QUEUE_SLOTS);
442     ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
443 
444     // put one in and wait some get out
445     for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
446         demoDatas[0].Put(t);
447     }
448 
449     demoDatas[0].joinStatus = false;
450     DemoThreadData tmp2 = DemoThreadData();
451     std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
452 
453     std::this_thread::sleep_for(std::chrono::seconds(2));
454 
455     // queue is full and some threads is blocked and is not joined
456     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
457     ASSERT_TRUE(demoDatas[0].joinStatus);
458 
459     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
460     ASSERT_EQ(getedOut, THREAD_NUM);
461     ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
462 
463     for (auto& t : threads) {
464         t.join();
465     }
466 
467     while (!DemoThreadData::shareQueue.IsEmpty()) {
468         demoDatas[0].GetAndOneTaskDone();
469     }
470 
471     demoDatas[0].joinStatus = false;
472     joinThread.join();
473     joinThread2.join();
474 }
475 
476 /*
477  * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
478  * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
479  * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
480  */
481 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
482 {
483     // 1. prepare
484     std::thread threads[THREAD_NUM];
485     std::array<DemoThreadData, THREAD_NUM> demoDatas;
486     demoDatas.fill(DemoThreadData());
487 
488     using std::chrono::system_clock;
489 
490     const unsigned int REMAIN_SLOTS = 5;
491     std::time_t timeT = system_clock::to_time_t(system_clock::now());
492     timeT += 2;
493     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
494     for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
495         int t = i;
496         DemoThreadData::shareQueue.Push(t);
497     }
498 
499     // start thread to join
500     demoDatas[0].joinStatus = false;
501     DemoThreadData tmp = DemoThreadData();
502     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
503     ASSERT_FALSE(demoDatas[0].joinStatus);
504 
505     // 2. start thread put in not full queue
506     for (unsigned int i = 0; i < THREAD_NUM; i++) {
507         threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
508             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
509     }
510 
511     std::this_thread::sleep_for(std::chrono::seconds(3));
512 
513     unsigned int getedOut = 0;
514     unsigned int ungetedOut = 0;
515     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
516     ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
517     ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
518     ASSERT_TRUE(demoDatas[0].joinStatus);
519     // 3. put ungetedOut
520     for (unsigned int i = 0; i < ungetedOut; i++) {
521         int t = i;
522         DemoThreadData::shareQueue.Push(t);
523     }
524     demoDatas[0].joinStatus = false;
525     DemoThreadData tmp2 = DemoThreadData();
526     std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
527 
528     std::this_thread::sleep_for(std::chrono::seconds(1));
529     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
530     ASSERT_EQ(getedOut, THREAD_NUM);
531     ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
532 
533     for (auto& t : threads) {
534         t.join();
535     }
536 
537     ASSERT_TRUE(demoDatas[0].joinStatus);
538     demoDatas[0].joinStatus = false;
539     joinThread.join();
540     joinThread2.join();
541 }
542 
543 /*
544  * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
545  * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
546  * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
547  */
548 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
549 {
550     // 1. prepare
551     std::thread threads[THREAD_NUM];
552     std::array<DemoThreadData, THREAD_NUM> demoDatas;
553     demoDatas.fill(DemoThreadData());
554 
555     using std::chrono::system_clock;
556 
557     const unsigned int REMAIN_SLOTS = 5;
558     std::time_t timeT = system_clock::to_time_t(system_clock::now());
559     timeT += 2;
560     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
561     for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
562         int t = i;
563         DemoThreadData::shareQueue.Push(t);
564     }
565 
566     // start thread to join
567     demoDatas[0].joinStatus = false;
568     DemoThreadData tmp = DemoThreadData();
569     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
570     ASSERT_FALSE(demoDatas[0].joinStatus);
571 
572     // 2. start thread put in not full queue
573     for (unsigned int i = 0; i < THREAD_NUM; i++) {
574         threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
575     }
576     ASSERT_FALSE(demoDatas[0].joinStatus);
577     std::this_thread::sleep_for(std::chrono::seconds(3));
578     ASSERT_FALSE(demoDatas[0].joinStatus);
579     unsigned int putedin = 0;
580     unsigned int unputedin = 0;
581     GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
582     ASSERT_EQ(putedin, REMAIN_SLOTS);
583     ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
584 
585     // 3. put ungetedOut
586     for (unsigned int i = 0; i < unputedin; i++) {
587         DemoThreadData::shareQueue.Pop();
588         DemoThreadData::shareQueue.OneTaskDone();
589     }
590 
591     std::this_thread::sleep_for(std::chrono::seconds(1));
592     GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
593     ASSERT_EQ(putedin, THREAD_NUM);
594     ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
595 
596     ASSERT_FALSE(demoDatas[0].joinStatus);
597 
598     for (auto& t : threads) {
599         t.join();
600     }
601 
602     while (!DemoThreadData::shareQueue.IsEmpty()) {
603         demoDatas[0].GetAndOneTaskDone();
604     }
605 
606     std::this_thread::sleep_for(std::chrono::seconds(1));
607     ASSERT_TRUE(demoDatas[0].joinStatus);
608     joinThread.join();
609     demoDatas[0].joinStatus = false;
610 }
611 
612 /*
613  * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
614  * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue. When all threads are waiting to reach a
615  * certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
616  */
617 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
618 {
619     // 1. prepare
620     std::thread threadsout[THREAD_NUM];
621     std::array<DemoThreadData, THREAD_NUM> demoDatas;
622     demoDatas.fill(DemoThreadData());
623 
624     std::thread threadsin[THREAD_NUM];
625 
626     using std::chrono::system_clock;
627 
628     std::time_t timeT = system_clock::to_time_t(system_clock::now());
629     timeT += 2;
630     ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
631     int t = 1;
632     for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
633         DemoThreadData::shareQueue.Push(t);
634     }
635     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
636 
637     // start thread to join
638     demoDatas[0].joinStatus = false;
639     DemoThreadData tmp = DemoThreadData();
640     std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
641     ASSERT_FALSE(demoDatas[0].joinStatus);
642 
643     // 2. start thread put in not full queue
644     for (unsigned int i = 0; i < THREAD_NUM; i++) {
645         threadsin[i] = std::thread(PutHandleThreadDataTime,
646             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
647     }
648 
649     for (unsigned int i = 0; i < THREAD_NUM; i++) {
650         threadsout[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
651             std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
652     }
653 
654     std::this_thread::sleep_for(std::chrono::seconds(3));
655 
656     ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
657     unsigned int getedOut = 0;
658     unsigned int ungetedOut = 0;
659     unsigned int pushedIn = 0;
660     unsigned int unpushedIn = 0;
661     GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
662     GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
663 
664     ASSERT_EQ(pushedIn, THREAD_NUM);
665     ASSERT_EQ(getedOut, THREAD_NUM);
666 
667     ASSERT_FALSE(demoDatas[0].joinStatus);
668     demoDatas[0].joinStatus = false;
669 
670     for (auto& t : threadsout) {
671         t.join();
672     }
673 
674     for (auto& t : threadsin) {
675         t.join();
676     }
677 
678     while (!DemoThreadData::shareQueue.IsEmpty()) {
679         demoDatas[0].GetAndOneTaskDone();
680     }
681     joinThread.join();
682     ASSERT_TRUE(demoDatas[0].joinStatus);
683     demoDatas[0].joinStatus = false;
684 }
}  // namespace
}  // namespace OHOS