1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include "safe_block_queue.h"

#include <array>
#include <atomic>
#include <chrono> // std::chrono::seconds
#include <future>
#include <iostream>
#include <thread>

#include <gtest/gtest.h>
23 using namespace testing::ext;
24 using namespace std;
25
26 namespace OHOS {
27 namespace {
28 class UtilsSafeBlockQueueTracking : public testing::Test {
29 };
30
// Capacity of the queue shared by the multi-threaded tests.
constexpr unsigned int QUEUE_SLOTS = 10;
// One more worker than there are slots, so at least one producer always has to block.
constexpr unsigned int THREAD_NUM = QUEUE_SLOTS + 1;
33
34 class DemoThreadData {
35 public:
DemoThreadData()36 DemoThreadData()
37 {
38 putStatus = false;
39 getStatus = false;
40 joinStatus = false;
41 }
42
43 static SafeBlockQueueTracking<int> shareQueue;
44 static bool joinStatus;
45 bool putStatus;
46 bool getStatus;
47
Put(int i)48 void Put(int i)
49 {
50 shareQueue.Push(i);
51 putStatus = true;
52 }
53
Get()54 void Get()
55 {
56 shareQueue.Pop();
57 getStatus = true;
58 }
59
GetAndOneTaskDone()60 void GetAndOneTaskDone()
61 {
62 shareQueue.Pop();
63 getStatus = true;
64 shareQueue.OneTaskDone();
65 }
66
Join()67 void Join()
68 {
69 shareQueue.Join();
70 joinStatus = true;
71 }
72 };
73
74 SafeBlockQueueTracking<int> DemoThreadData::shareQueue(QUEUE_SLOTS);
75 bool DemoThreadData::joinStatus = false;
76
PutHandleThreadData(DemoThreadData & q,int i)77 void PutHandleThreadData(DemoThreadData& q, int i)
78 {
79 q.Put(i);
80 }
81
GetThreadDatePushedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & pushedIn,unsigned int & unpushedIn)82 void GetThreadDatePushedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
83 unsigned int& pushedIn, unsigned int& unpushedIn)
84 {
85 pushedIn = 0;
86 unpushedIn = 0;
87 for (auto& t : demoDatas) {
88 if (t.putStatus) {
89 pushedIn++;
90 } else {
91 unpushedIn++;
92 }
93 }
94 }
95
GetThreadDateGetedStatus(std::array<DemoThreadData,THREAD_NUM> & demoDatas,unsigned int & getedOut,unsigned int & ungetedOut)96 void GetThreadDateGetedStatus(std::array<DemoThreadData, THREAD_NUM>& demoDatas,
97 unsigned int& getedOut, unsigned int& ungetedOut)
98 {
99 getedOut = 0;
100 ungetedOut = 0;
101 for (auto& t : demoDatas) {
102 if (t.getStatus) {
103 getedOut++;
104 } else {
105 ungetedOut++;
106 }
107 }
108 }
109
PutHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)110 void PutHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
111 {
112 std::this_thread::sleep_until(absTime);
113
114 q.Put(i);
115 }
116
GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData & q,int i,std::chrono::system_clock::time_point absTime)117 void GetAndOneTaskDoneHandleThreadDataTime(DemoThreadData& q, int i, std::chrono::system_clock::time_point absTime)
118 {
119 std::this_thread::sleep_until(absTime);
120
121 q.GetAndOneTaskDone();
122 }
123
124 /*
125 * @tc.name: testPut001
126 * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
127 */
128 HWTEST_F(UtilsSafeBlockQueueTracking, testPut001, TestSize.Level0)
129 {
130 SafeBlockQueueTracking<int> qi(10);
131 int i = 1;
132 qi.Push(i);
133 EXPECT_EQ(static_cast<unsigned>(1), qi.Size());
134 ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
135 }
136
137 /*
138 * @tc.name: testGet001
139 * @tc.desc: Single-threaded call put and get to determine that the normal scenario is working properly
140 */
141 HWTEST_F(UtilsSafeBlockQueueTracking, testGet001, TestSize.Level0)
142 {
143 SafeBlockQueueTracking<int> qi(10);
144 for (int i = 0; i < 3; i++) {
145 qi.Push(i);
146 }
147 EXPECT_EQ(static_cast<unsigned>(3), qi.Size());
148 int t = qi.Pop();
149 ASSERT_EQ(t, 0);
150 ASSERT_GT(qi.GetUnfinishTaskNum(), 0);
151 }
152
153 /*
154 * @tc.name: testMutilthreadPutAndBlock001
155 * @tc.desc: Multiple threads put until blocking runs, one thread gets, all threads finish running normally
156 */
157 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadPutAndBlock001, TestSize.Level0)
158 {
159 std::thread threads[THREAD_NUM];
160
161 std::array<DemoThreadData, THREAD_NUM> demoDatas;
162 demoDatas.fill(DemoThreadData());
163 demoDatas[0].Put(1);
164 ASSERT_FALSE(demoDatas[0].joinStatus);
165
166 // start thread to join
167 DemoThreadData tmp = DemoThreadData();
168 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
169 ASSERT_FALSE(demoDatas[0].joinStatus);
170
171 for (unsigned int i = 0; i < THREAD_NUM; i++) {
172 threads[i] = std::thread(PutHandleThreadData, std::ref(demoDatas[i]), i);
173 }
174 ASSERT_FALSE(demoDatas[0].joinStatus);
175 // 1. queue is full and some threads is blocked
176 std::this_thread::sleep_for(std::chrono::seconds(2));
177 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
178
179 ASSERT_FALSE(demoDatas[0].joinStatus);
180 // 2. get one out and wait some put in
181 for (unsigned int i = 0; i < THREAD_NUM; i++) {
182 demoDatas[0].GetAndOneTaskDone();
183 ASSERT_FALSE(demoDatas[0].joinStatus);
184 }
185 ASSERT_FALSE(demoDatas[0].joinStatus);
186 demoDatas[0].GetAndOneTaskDone();
187 std::this_thread::sleep_for(std::chrono::seconds(1));
188
189 ASSERT_TRUE(demoDatas[0].joinStatus);
190
191 // recover state
192 for (auto& t : threads) {
193 t.join();
194 }
195 joinThread.join();
196
197 while (!DemoThreadData::shareQueue.IsEmpty()) {
198 demoDatas[0].GetAndOneTaskDone();
199 }
200 demoDatas[0].joinStatus = false;
201 }
202
203 /*
204 * @tc.name: testMutilthreadConcurrentPutAndBlockInblankqueue001
205 * @tc.desc: Multi-threaded put() on the empty queue. When n threads are waiting to reach a certain
206 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
207 */
208 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInblankqueue001, TestSize.Level0)
209 {
210 // 1. prepare
211 std::thread threads[THREAD_NUM];
212 std::array<DemoThreadData, THREAD_NUM> demoDatas;
213 demoDatas.fill(DemoThreadData());
214
215 using std::chrono::system_clock;
216
217 std::time_t timeT = system_clock::to_time_t(system_clock::now());
218 timeT += 2;
219
220 // 2. start thread
221 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
222 for (unsigned int i = 0; i < THREAD_NUM; i++) {
223 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
224 }
225
226 // 3. queue is full and some threads is blocked
227 std::this_thread::sleep_for(std::chrono::seconds(4));
228
229 // start thread to join
230 DemoThreadData tmp = DemoThreadData();
231 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
232 ASSERT_FALSE(demoDatas[0].joinStatus);
233
234 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
235 unsigned int pushedIn = 0;
236 unsigned int unpushedIn = 0;
237 unsigned int getedOut = 0;
238 unsigned int ungetedOut = 0;
239 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
240
241 ASSERT_EQ(pushedIn, QUEUE_SLOTS);
242 ASSERT_EQ(unpushedIn, THREAD_NUM - QUEUE_SLOTS);
243
244 // get one out and wait some put in
245 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
246 demoDatas[0].GetAndOneTaskDone();
247 ASSERT_FALSE(demoDatas[0].joinStatus);
248 }
249
250 std::this_thread::sleep_for(std::chrono::seconds(2));
251 // queue is full and some threads is blocked and is not joined
252 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
253 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
254 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
255 ASSERT_EQ(pushedIn, THREAD_NUM);
256 ASSERT_EQ(getedOut, THREAD_NUM - QUEUE_SLOTS);
257
258 for (auto& t : threads) {
259 t.join();
260 }
261
262 while (!DemoThreadData::shareQueue.IsEmpty()) {
263 demoDatas[0].GetAndOneTaskDone();
264 }
265
266 std::this_thread::sleep_for(std::chrono::seconds(1));
267 ASSERT_TRUE(demoDatas[0].joinStatus);
268 demoDatas[0].joinStatus = false;
269 joinThread.join();
270 }
271
272 /*
273 * @tc.name: testMutilthreadConcurrentPutAndBlockInfullqueue001
274 * @tc.desc: Multi-threaded put() on the full queue. When n threads are waiting to reach a certain
275 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
276 */
277 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInfullqueue001, TestSize.Level0)
278 {
279 // 1. prepare
280 std::thread threads[THREAD_NUM];
281 std::array<DemoThreadData, THREAD_NUM> demoDatas;
282 demoDatas.fill(DemoThreadData());
283
284 using std::chrono::system_clock;
285
286 std::time_t timeT = system_clock::to_time_t(system_clock::now());
287 timeT += 2;
288 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
289 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
290 int t = i;
291 DemoThreadData::shareQueue.Push(t);
292 }
293 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
294
295 // 2. start thread put in full queue
296 for (unsigned int i = 0; i < THREAD_NUM; i++) {
297 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
298 }
299
300 std::this_thread::sleep_for(std::chrono::seconds(3));
301
302 // 3. now thread is running and all is blocked
303 DemoThreadData tmp = DemoThreadData();
304 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
305 ASSERT_FALSE(demoDatas[0].joinStatus);
306
307 unsigned int pushedIn = 0;
308 unsigned int unpushedIn = 0;
309 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
310 ASSERT_EQ(pushedIn, static_cast<unsigned int>(0));
311 ASSERT_EQ(unpushedIn, THREAD_NUM);
312 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
313 for (unsigned int i = 0; i < THREAD_NUM; i++) {
314 DemoThreadData::shareQueue.Pop();
315 DemoThreadData::shareQueue.OneTaskDone();
316
317 std::this_thread::sleep_for(std::chrono::seconds(1));
318 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
319 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
320 ASSERT_EQ(pushedIn, i + 1);
321 ASSERT_EQ(unpushedIn, THREAD_NUM - (i + 1));
322 }
323
324 for (auto& t : threads) {
325 t.join();
326 }
327
328 while (!DemoThreadData::shareQueue.IsEmpty()) {
329 demoDatas[0].GetAndOneTaskDone();
330 }
331 demoDatas[0].joinStatus = false;
332 joinThread.join();
333 }
334
335 /*
336 * @tc.name: testMutilthreadConcurrentGetAndBlockInblankqueue001
337 * @tc.desc: Multi-threaded get() on the empty queue. When n threads are waiting to reach a certain
338 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
339 */
340 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInblankqueue001, TestSize.Level0)
341 {
342 // 1. prepare
343 std::thread threads[THREAD_NUM];
344 std::array<DemoThreadData, THREAD_NUM> demoDatas;
345 demoDatas.fill(DemoThreadData());
346
347 using std::chrono::system_clock;
348
349 std::time_t timeT = system_clock::to_time_t(system_clock::now());
350 timeT += 2;
351 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
352
353 // 2. start thread put in empty queue
354 for (unsigned int i = 0; i < THREAD_NUM; i++) {
355 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
356 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
357 }
358 std::this_thread::sleep_for(std::chrono::seconds(3));
359
360 // 3. now thread is running and all is blocked
361 unsigned int getedOut = 0;
362 unsigned int ungetedOut = 0;
363 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
364 ASSERT_EQ(getedOut, static_cast<unsigned int>(0));
365 ASSERT_EQ(ungetedOut, THREAD_NUM);
366 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
367
368 // start thread to join
369 DemoThreadData tmp = DemoThreadData();
370 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
371 ASSERT_FALSE(demoDatas[0].joinStatus);
372
373 int value = 1;
374 for (unsigned int i = 0; i < THREAD_NUM; i++) {
375 DemoThreadData::shareQueue.Push(value);
376 std::this_thread::sleep_for(std::chrono::seconds(1));
377 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
378 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
379 ASSERT_EQ(getedOut, i + 1);
380 ASSERT_EQ(ungetedOut, THREAD_NUM - (i + 1));
381 }
382
383 for (auto& t : threads) {
384 t.join();
385 }
386
387 while (!DemoThreadData::shareQueue.IsEmpty()) {
388 demoDatas[0].GetAndOneTaskDone();
389 }
390
391 ASSERT_TRUE(demoDatas[0].joinStatus);
392 demoDatas[0].joinStatus = false;
393 joinThread.join();
394 }
395
396 /*
397 * @tc.name: testMutilthreadConcurrentGetAndBlockInfullqueue001
398 * @tc.desc: Multi-threaded get() on the full queue. When n threads are waiting to reach a certain
399 * time-point, everyone gets concurrent to see the status of the queue and the state of the thread.
400 */
401 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInfullqueue001, TestSize.Level0)
402 {
403 // 1. prepare
404 std::thread threads[THREAD_NUM];
405 std::array<DemoThreadData, THREAD_NUM> demoDatas;
406 demoDatas.fill(DemoThreadData());
407
408 using std::chrono::system_clock;
409
410 std::time_t timeT = system_clock::to_time_t(system_clock::now());
411 timeT += 2;
412 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
413 int t = 1;
414 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
415 DemoThreadData::shareQueue.Push(t);
416 }
417 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
418
419 // start thread to join
420 demoDatas[0].joinStatus = false;
421 DemoThreadData tmp = DemoThreadData();
422 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
423 ASSERT_FALSE(demoDatas[0].joinStatus);
424
425 // 2. start thread get in full queue
426 for (unsigned int i = 0; i < THREAD_NUM; i++) {
427 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
428 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
429 }
430
431 std::this_thread::sleep_for(std::chrono::seconds(4));
432
433 // 3. start judge
434 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
435 ASSERT_TRUE(demoDatas[0].joinStatus);
436
437 unsigned int getedOut = 0;
438 unsigned int ungetedOut = 0;
439 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
440
441 ASSERT_EQ(getedOut, QUEUE_SLOTS);
442 ASSERT_EQ(ungetedOut, THREAD_NUM - QUEUE_SLOTS);
443
444 // put one in and wait some get out
445 for (unsigned int i = 0; i < THREAD_NUM - QUEUE_SLOTS; i++) {
446 demoDatas[0].Put(t);
447 }
448
449 demoDatas[0].joinStatus = false;
450 DemoThreadData tmp2 = DemoThreadData();
451 std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
452
453 std::this_thread::sleep_for(std::chrono::seconds(2));
454
455 // queue is full and some threads is blocked and is not joined
456 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
457 ASSERT_TRUE(demoDatas[0].joinStatus);
458
459 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
460 ASSERT_EQ(getedOut, THREAD_NUM);
461 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
462
463 for (auto& t : threads) {
464 t.join();
465 }
466
467 while (!DemoThreadData::shareQueue.IsEmpty()) {
468 demoDatas[0].GetAndOneTaskDone();
469 }
470
471 demoDatas[0].joinStatus = false;
472 joinThread.join();
473 joinThread2.join();
474 }
475
476 /*
477 * @tc.name: testMutilthreadConcurrentGetAndBlockInnotfullqueue001
478 * @tc.desc: Multi-threaded get() on the notfull queue. When n threads are waiting to reach a certain
479 * time-point, everyone get concurrent to see the status of the queue and the state of the thread.
480 */
481 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndBlockInnotfullqueue001, TestSize.Level0)
482 {
483 // 1. prepare
484 std::thread threads[THREAD_NUM];
485 std::array<DemoThreadData, THREAD_NUM> demoDatas;
486 demoDatas.fill(DemoThreadData());
487
488 using std::chrono::system_clock;
489
490 const unsigned int REMAIN_SLOTS = 5;
491 std::time_t timeT = system_clock::to_time_t(system_clock::now());
492 timeT += 2;
493 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
494 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
495 int t = i;
496 DemoThreadData::shareQueue.Push(t);
497 }
498
499 // start thread to join
500 demoDatas[0].joinStatus = false;
501 DemoThreadData tmp = DemoThreadData();
502 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
503 ASSERT_FALSE(demoDatas[0].joinStatus);
504
505 // 2. start thread put in not full queue
506 for (unsigned int i = 0; i < THREAD_NUM; i++) {
507 threads[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
508 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
509 }
510
511 std::this_thread::sleep_for(std::chrono::seconds(3));
512
513 unsigned int getedOut = 0;
514 unsigned int ungetedOut = 0;
515 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
516 ASSERT_EQ(getedOut, QUEUE_SLOTS - REMAIN_SLOTS);
517 ASSERT_EQ(ungetedOut, THREAD_NUM - (QUEUE_SLOTS - REMAIN_SLOTS));
518 ASSERT_TRUE(demoDatas[0].joinStatus);
519 // 3. put ungetedOut
520 for (unsigned int i = 0; i < ungetedOut; i++) {
521 int t = i;
522 DemoThreadData::shareQueue.Push(t);
523 }
524 demoDatas[0].joinStatus = false;
525 DemoThreadData tmp2 = DemoThreadData();
526 std::thread joinThread2 = std::thread(&DemoThreadData::Join, tmp2);
527
528 std::this_thread::sleep_for(std::chrono::seconds(1));
529 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
530 ASSERT_EQ(getedOut, THREAD_NUM);
531 ASSERT_EQ(ungetedOut, static_cast<unsigned int>(0));
532
533 for (auto& t : threads) {
534 t.join();
535 }
536
537 ASSERT_TRUE(demoDatas[0].joinStatus);
538 demoDatas[0].joinStatus = false;
539 joinThread.join();
540 joinThread2.join();
541 }
542
543 /*
544 * @tc.name: testMutilthreadConcurrentPutAndBlockInnotfullqueue001
545 * @tc.desc: Multi-threaded put() on the not full queue. When n threads are waiting to reach a certain
546 * time-point, everyone puts concurrent to see the status of the queue and the state of the thread.
547 */
548 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentPutAndBlockInnotfullqueue001, TestSize.Level0)
549 {
550 // 1. prepare
551 std::thread threads[THREAD_NUM];
552 std::array<DemoThreadData, THREAD_NUM> demoDatas;
553 demoDatas.fill(DemoThreadData());
554
555 using std::chrono::system_clock;
556
557 const unsigned int REMAIN_SLOTS = 5;
558 std::time_t timeT = system_clock::to_time_t(system_clock::now());
559 timeT += 2;
560 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
561 for (unsigned int i = 0; i < QUEUE_SLOTS - REMAIN_SLOTS; i++) {
562 int t = i;
563 DemoThreadData::shareQueue.Push(t);
564 }
565
566 // start thread to join
567 demoDatas[0].joinStatus = false;
568 DemoThreadData tmp = DemoThreadData();
569 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
570 ASSERT_FALSE(demoDatas[0].joinStatus);
571
572 // 2. start thread put in not full queue
573 for (unsigned int i = 0; i < THREAD_NUM; i++) {
574 threads[i] = std::thread(PutHandleThreadDataTime, std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
575 }
576 ASSERT_FALSE(demoDatas[0].joinStatus);
577 std::this_thread::sleep_for(std::chrono::seconds(3));
578 ASSERT_FALSE(demoDatas[0].joinStatus);
579 unsigned int putedin = 0;
580 unsigned int unputedin = 0;
581 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
582 ASSERT_EQ(putedin, REMAIN_SLOTS);
583 ASSERT_EQ(unputedin, THREAD_NUM - REMAIN_SLOTS);
584
585 // 3. put ungetedOut
586 for (unsigned int i = 0; i < unputedin; i++) {
587 DemoThreadData::shareQueue.Pop();
588 DemoThreadData::shareQueue.OneTaskDone();
589 }
590
591 std::this_thread::sleep_for(std::chrono::seconds(1));
592 GetThreadDatePushedStatus(demoDatas, putedin, unputedin);
593 ASSERT_EQ(putedin, THREAD_NUM);
594 ASSERT_EQ(unputedin, static_cast<unsigned int>(0));
595
596 ASSERT_FALSE(demoDatas[0].joinStatus);
597
598 for (auto& t : threads) {
599 t.join();
600 }
601
602 while (!DemoThreadData::shareQueue.IsEmpty()) {
603 demoDatas[0].GetAndOneTaskDone();
604 }
605
606 std::this_thread::sleep_for(std::chrono::seconds(1));
607 ASSERT_TRUE(demoDatas[0].joinStatus);
608 joinThread.join();
609 demoDatas[0].joinStatus = false;
610 }
611
612 /*
613 * @tc.name: testMutilthreadConcurrentGetAndPopInfullqueue001
614 * @tc.desc: Multi-threaded put() and Multi-threaded get() on the full queue. When all threads are waiting to reach a
615 * certain time-point, everyone run concurrently to see the status of the queue and the state of the thread.
616 */
617 HWTEST_F(UtilsSafeBlockQueueTracking, testMutilthreadConcurrentGetAndPopInfullqueue001, TestSize.Level0)
618 {
619 // 1. prepare
620 std::thread threadsout[THREAD_NUM];
621 std::array<DemoThreadData, THREAD_NUM> demoDatas;
622 demoDatas.fill(DemoThreadData());
623
624 std::thread threadsin[THREAD_NUM];
625
626 using std::chrono::system_clock;
627
628 std::time_t timeT = system_clock::to_time_t(system_clock::now());
629 timeT += 2;
630 ASSERT_TRUE(DemoThreadData::shareQueue.IsEmpty());
631 int t = 1;
632 for (unsigned int i = 0; i < QUEUE_SLOTS; i++) {
633 DemoThreadData::shareQueue.Push(t);
634 }
635 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
636
637 // start thread to join
638 demoDatas[0].joinStatus = false;
639 DemoThreadData tmp = DemoThreadData();
640 std::thread joinThread = std::thread(&DemoThreadData::Join, tmp);
641 ASSERT_FALSE(demoDatas[0].joinStatus);
642
643 // 2. start thread put in not full queue
644 for (unsigned int i = 0; i < THREAD_NUM; i++) {
645 threadsin[i] = std::thread(PutHandleThreadDataTime,
646 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
647 }
648
649 for (unsigned int i = 0; i < THREAD_NUM; i++) {
650 threadsout[i] = std::thread(GetAndOneTaskDoneHandleThreadDataTime,
651 std::ref(demoDatas[i]), i, system_clock::from_time_t(timeT));
652 }
653
654 std::this_thread::sleep_for(std::chrono::seconds(3));
655
656 ASSERT_TRUE(DemoThreadData::shareQueue.IsFull());
657 unsigned int getedOut = 0;
658 unsigned int ungetedOut = 0;
659 unsigned int pushedIn = 0;
660 unsigned int unpushedIn = 0;
661 GetThreadDateGetedStatus(demoDatas, getedOut, ungetedOut);
662 GetThreadDatePushedStatus(demoDatas, pushedIn, unpushedIn);
663
664 ASSERT_EQ(pushedIn, THREAD_NUM);
665 ASSERT_EQ(getedOut, THREAD_NUM);
666
667 ASSERT_FALSE(demoDatas[0].joinStatus);
668 demoDatas[0].joinStatus = false;
669
670 for (auto& t : threadsout) {
671 t.join();
672 }
673
674 for (auto& t : threadsin) {
675 t.join();
676 }
677
678 while (!DemoThreadData::shareQueue.IsEmpty()) {
679 demoDatas[0].GetAndOneTaskDone();
680 }
681 joinThread.join();
682 ASSERT_TRUE(demoDatas[0].joinStatus);
683 demoDatas[0].joinStatus = false;
684 }
685 } // namespace
686 } // namespace OHOS