/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <chrono>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include <base/containers/unique_ptr.h>
#include <core/implementation_uids.h>

#include "TestRunner.h"
#include "io/file_manager.h"
#include "threading/dispatcher_task_queue.h"
#include "threading/parallel_task_queue.h"
#include "threading/sequential_impl.h"
#include "threading/sequential_task_queue.h"
#include "threading/task_queue.h"

using namespace CORE_NS;
using namespace testing;
using namespace testing::ext;
namespace {
// Helper that records values pushed from tasks so tests can verify execution order.
class Storage {
public:
    void Reset()
    {
        std::lock_guard<std::mutex> lock(mutex);
        data.clear();
    }

    void Store(int number)
    {
        std::lock_guard<std::mutex> lock(mutex);
        data.push_back(number);
    }

    // Asserts that exactly 'count' values were stored, in strictly ascending order.
    void CheckValidity(size_t count)
    {
        ASSERT_EQ(count, data.size());

        for (size_t i = 1; i < data.size(); ++i) {
            ASSERT_TRUE(data[i] > data[i - 1]);
        }
    }

    std::vector<int> data;
    std::mutex mutex;
};

// Shared across all tests in this file; each test resets it before use.
static Storage g_storage;

template<size_t VALUE>
void TestFunction()
{
    g_storage.Store(static_cast<int>(VALUE));
}

void Wait(int ms)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
} // namespace

struct TestContext {
    std::shared_ptr<ISceneInit> sceneInit_ = nullptr;
    CORE_NS::IEcs::Ptr ecs_;
};
static TestContext g_context;

using IntfPtr = BASE_NS::shared_ptr<CORE_NS::IInterface>;
using IntfWeakPtr = BASE_NS::weak_ptr<CORE_NS::IInterface>;
static constexpr BASE_NS::Uid ENGINE_THREAD{"2070e705-d061-40e4-bfb7-90fad2c280af"};
static constexpr BASE_NS::Uid APP_THREAD{"b2e8cef3-453a-4651-b564-5190f8b5190d"};
static constexpr BASE_NS::Uid IO_QUEUE{"be88e9a0-9cd8-45ab-be48-937953dc258f"};
static constexpr BASE_NS::Uid JS_RELEASE_THREAD{"3784fa96-b25b-4e9c-bbf1-e897d36f73af"};

bool SceneDispose(TestContext &context)
{
    context.ecs_ = nullptr;
    context.sceneInit_ = nullptr;
    return true;
}

bool SceneCreate(TestContext &context)
{
    context.sceneInit_ = CreateTestScene();
    context.sceneInit_->LoadPluginsAndInit();
    if (!context.sceneInit_->GetEngineInstance().engine_) {
        WIDGET_LOGE("fail to get engine");
        return false;
    }
    context.ecs_ = context.sceneInit_->GetEngineInstance().engine_->CreateEcs();
    if (!context.ecs_) {
        WIDGET_LOGE("fail to get ecs");
        return false;
    }
    auto factory = GetInstance<ISystemGraphLoaderFactory>(UID_SYSTEM_GRAPH_LOADER);
    auto systemGraphLoader = factory->Create(context.sceneInit_->GetEngineInstance().engine_->GetFileManager());
    systemGraphLoader->Load("rofs3D://systemGraph.json", *(context.ecs_));
    auto& ecs = *(context.ecs_);
    ecs.Initialize();

    using namespace SCENE_NS;
#if SCENE_META_TEST
    auto fun = [&context]() {
        auto &obr = META_NS::GetObjectRegistry();

        context.params_ = interface_pointer_cast<META_NS::IMetadata>(obr.GetDefaultObjectContext());
        if (!context.params_) {
            CORE_LOG_E("default obj null");
        }
        context.scene_ =
            interface_pointer_cast<SCENE_NS::IScene>(obr.Create(SCENE_NS::ClassId::Scene, context.params_));

        auto onLoaded = META_NS::MakeCallback<META_NS::IOnChanged>([&context]() {
            bool complete = false;
            auto status = context.scene_->Status()->GetValue();
            if (status == SCENE_NS::IScene::SCENE_STATUS_READY) {
                // still in engine thread
                complete = true;
            } else if (status == SCENE_NS::IScene::SCENE_STATUS_LOADING_FAILED) {
                // make sure we don't have anything in result if error
                complete = true;
            }

            if (complete) {
                if (context.scene_) {
                    auto &obr = META_NS::GetObjectRegistry();
                    // make sure we have renderconfig
                    auto rc = context.scene_->RenderConfiguration()->GetValue();
                    if (!rc) {
                        // Create renderconfig
                        rc = obr.Create<SCENE_NS::IRenderConfiguration>(SCENE_NS::ClassId::RenderConfiguration);
                        context.scene_->RenderConfiguration()->SetValue(rc);
                    }

                    interface_cast<IEcsScene>(context.scene_)
                        ->RenderMode()
                        ->SetValue(IEcsScene::RenderMode::RENDER_ALWAYS);
                    auto duh = context.params_->GetArrayPropertyByName<IntfWeakPtr>("Scenes");
                    if (!duh) {
                        return;
                    }
                    duh->AddValue(interface_pointer_cast<CORE_NS::IInterface>(context.scene_));
                }
            }
        });
        context.scene_->Asynchronous()->SetValue(false);
        context.scene_->Uri()->SetValue("scene://empty");
        return META_NS::IAny::Ptr{};
    };
    // Should it be possible to cancel? (i.e. do we need to store the token for something?)
    META_NS::GetTaskQueueRegistry()
        .GetTaskQueue(ENGINE_THREAD)
        ->AddWaitableTask(META_NS::MakeCallback<META_NS::ITaskQueueWaitableTask>(BASE_NS::move(fun)))
        ->Wait();
#endif
    return true;
}

class TaskQueueTest : public testing::Test {
public:
    static void SetUpTestSuite()
    {
        SceneCreate(g_context);
    }

    static void TearDownTestSuite()
    {
        SceneDispose(g_context);
    }

    void SetUp() override {}
    void TearDown() override {}
};

/**
 * @tc.name: TestExecutionOrder
 * @tc.desc: Verify that a SequentialTaskQueue runs tasks in the order defined by Submit, SubmitBefore and SubmitAfter.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestExecutionOrder, TestSize.Level1)
{
    SequentialTaskQueue queue(nullptr);

    queue.Submit(1, FunctionTask::Create([]() { g_storage.Store(1); }));
    queue.Submit(4, FunctionTask::Create([]() { g_storage.Store(4); }));
    queue.SubmitAfter(1, 2, FunctionTask::Create(TestFunction<2>));
    queue.SubmitBefore(4, 3, FunctionTask::Create([]() { TestFunction<3>(); }));
    queue.SubmitAfter(9, 5, FunctionTask::Create(TestFunction<5>));
    queue.SubmitBefore(1, 0, FunctionTask::Create(std::bind(&Storage::Reset, &g_storage)));

    // Execute and wait for completion.
    queue.Execute();

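    // Expected stored values: 1, 2, 3, 4, 5 (the Reset task clears the storage first and stores nothing;
    // SubmitAfter with the unknown id 9 is assumed to simply place task 5 at the end).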
    g_storage.CheckValidity(5);
}

/**
 * @tc.name: TestHierarchy
 * @tc.desc: Verify execution order with sequential and parallel queues nested inside tasks of a parent queue.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestHierarchy, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4U);
    ASSERT_TRUE(threadPool);
    EXPECT_EQ(threadPool->GetNumberOfThreads(), 4U);

    SequentialTaskQueue mainQueue(threadPool);

    mainQueue.Submit(0, FunctionTask::Create(std::bind(&Storage::Reset, &g_storage)));

    mainQueue.Submit(1, FunctionTask::Create([]() { g_storage.Store(1); }));
    mainQueue.Submit(2, FunctionTask::Create([]() { g_storage.Store(2); }));
    mainQueue.Submit(10, FunctionTask::Create([threadPool]() {
        // Threaded sequential queue.
        SequentialTaskQueue queue(threadPool);
        queue.Submit(4, FunctionTask::Create([]() { g_storage.Store(4); }));
        queue.Submit(5, FunctionTask::Create([]() { g_storage.Store(5); }));
        queue.Submit(11, FunctionTask::Create([threadPool]() {
            // Threaded parallel queue.
            ParallelTaskQueue subQueue(threadPool);
            subQueue.Submit(8, FunctionTask::Create([]() {
                Wait(2000);
                g_storage.Store(8);
            }));
            subQueue.Submit(7, FunctionTask::Create([]() {
                Wait(1000);
                g_storage.Store(7);
            }));
            subQueue.Execute();
        }));

        queue.SubmitBefore(11, 6, FunctionTask::Create([]() { g_storage.Store(6); }));
        queue.SubmitAfter(11, 9, FunctionTask::Create([]() { g_storage.Store(9); }));
        queue.Execute();
    }));

    mainQueue.SubmitBefore(10, 3, FunctionTask::Create([]() { g_storage.Store(3); }));

    // Execute (async) and wait for completion.
    mainQueue.ExecuteAsync();
    mainQueue.Wait();

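    // Expected stored values: 1..9. Keeping 7 before 8 in the parallel sub-queue relies on the
    // 1000 ms / 2000 ms waits above, so that part of the ordering is timing-dependent.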
    g_storage.CheckValidity(9);
}

/**
 * @tc.name: TestParallelWithDependencies
 * @tc.desc: Verify that a ParallelTaskQueue honors task dependencies and task removal.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestParallelWithDependencies, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    ParallelTaskQueue mainQueue(threadPool);

    // Reset.
    mainQueue.Submit(0, FunctionTask::Create([]() {
        g_storage.Reset();
        Wait(1000);
    }));

    // Store number 1 after the reset.
    mainQueue.SubmitAfter(0, 1, FunctionTask::Create([]() { g_storage.Store(1); }));
    mainQueue.SubmitAfter(0, 9, FunctionTask::Create([]() { g_storage.Store(9); }));
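    // Drop task 9 before execution; removing id 10, which was never submitted, is expected to be a no-op.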
    mainQueue.Remove(9);
    mainQueue.Remove(10);
    // Store number 2 after storing number 1.
    mainQueue.SubmitAfter(1, 2, FunctionTask::Create([]() { g_storage.Store(2); }));

    // Store number 3 after the reset and after storing number 2.
    constexpr const uint64_t afterIds[] = { 2, 0 };
    mainQueue.SubmitAfter(afterIds, 3, FunctionTask::Create([]() { g_storage.Store(3); }));

    // Execute (async) and wait for completion.
    mainQueue.ExecuteAsync();
    mainQueue.Wait();

    g_storage.CheckValidity(3);
}

/**
 * @tc.name: TestBlocking
 * @tc.desc: Verify that long-running tasks in one queue do not block quick tasks sharing the same thread pool.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestBlocking, TestSize.Level1)
{
    // Reserve 1 core for long running operations and 3 for quick operations.
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    g_storage.Reset();

    // Create a sequential queue with a few 'long running' tasks; this occupies one core.
    SequentialTaskQueue sequentialQueue(threadPool);
    for (int i = 0; i < 2; ++i) {
        // First and last operation in storage.
        sequentialQueue.Submit(i, FunctionTask::Create([i]() {
            g_storage.Store(i * 10);
            Wait(2000);
        }));
    }

    // Create a parallel queue with 'quickly running' tasks; this uses the rest of the cores.
    ParallelTaskQueue parallelQueue(threadPool);
    for (int i = 0; i < 5; ++i) {
        // Value 0 is reserved for the long running op that starts before these.
        parallelQueue.Submit(i, FunctionTask::Create([i]() {
            Wait(50 * i);
            g_storage.Store(i + 1);
        }));
    }

    // Start long running operations in the background.
    sequentialQueue.ExecuteAsync();

    // Wait 250 ms to make sure the 1st sequential task starts before the parallel ones.
    Wait(250);

    // Execute quick operations in parallel.
    parallelQueue.Execute();

    // Wait for the long running operations to complete.
    sequentialQueue.Wait();

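    // Expected stored values: 0 (first long task), 1..5 (quick parallel tasks), 10 (second long task),
    // i.e. seven strictly increasing values. This assumes the sleep durations above hold on the test machine.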
    g_storage.CheckValidity(7);
}

/**
 * @tc.name: TestPrematureDestruction
 * @tc.desc: Verify that destroying a DispatcherTaskQueue while a task is still running is safe.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestPrematureDestruction, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);
    {
        g_storage.Reset();

        // Creating with new so we can explicitly destroy the queue too early on purpose.
        DispatcherTaskQueue* queue = new DispatcherTaskQueue(threadPool);

        queue->Submit(0, FunctionTask::Create([]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); }));

        queue->ExecuteAsync();
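        // The queue is deleted while its task may still be running; the destructor is expected to
        // wait for (or safely drop) the pending work, so the test passes if nothing crashes or leaks.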
        delete queue;
    }
}

/**
 * @tc.name: TestDispatcherSync
 * @tc.desc: Verify synchronous execution, dependent submission and removal of tasks in a DispatcherTaskQueue.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestDispatcherSync, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    g_storage.Reset();

    DispatcherTaskQueue queue(threadPool);

    // Submit a few tasks to the queue.
    for (int i = 0; i < 5; ++i) {
        queue.Submit(i, FunctionTask::Create([i]() { g_storage.Store(i); }));
    }

    queue.SubmitAfter(5, 5, FunctionTask::Create([]() { g_storage.Store(5); }));
    queue.SubmitAfter(1, 6, FunctionTask::Create([]() { g_storage.Store(6); }));

    BASE_NS::vector<uint64_t> afterIdentifiers(1);
    afterIdentifiers[0] = 6;
    BASE_NS::array_view<const uint64_t> nullAfters;
    BASE_NS::array_view<const uint64_t> singleAfters =
        BASE_NS::array_view<const uint64_t>(afterIdentifiers.data(), afterIdentifiers.size());
    queue.SubmitAfter(nullAfters, 7, FunctionTask::Create([]() { g_storage.Store(7); }));
    queue.SubmitAfter(singleAfters, 7, FunctionTask::Create([]() { g_storage.Store(7); }));

    queue.Remove(6);
    queue.Remove(7);

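    // Six tasks (ids 0..5) remain after the removals; the dispatcher runs one pending task per Execute() call.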
    for (int i = 0; i < 6; ++i) {
        // Process one task from queue.
        queue.Execute();

        auto collectedTasks = queue.CollectFinishedTasks();
        ASSERT_EQ(collectedTasks.size(), 1);
    }

    g_storage.CheckValidity(6);

    queue.Clear();
}

/**
 * @tc.name: TestDispatcherAsync
 * @tc.desc: Verify asynchronous execution of a DispatcherTaskQueue and the factory-created queue variants.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestDispatcherAsync, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    g_storage.Reset();

    DispatcherTaskQueue queue(threadPool);

    // Submit a few tasks to the queue.
    for (int i = 0; i < 5; ++i) {
        queue.Submit(i, FunctionTask::Create([i]() { g_storage.Store(i); }));
    }

    // Process all tasks asynchronously, one by one.
    size_t numberOfTasksCompleted = 0;
    while (numberOfTasksCompleted != 5) {
        // Progress the queue.
        queue.ExecuteAsync();

        auto collectedTasks = queue.CollectFinishedTasks();
        numberOfTasksCompleted += collectedTasks.size();
    }

    g_storage.CheckValidity(5);

    // Exercise the other task queue types created through the factory.

    const uint64_t afters[3] = { 9, 10, 11 };
    BASE_NS::array_view<const uint64_t> afterIds(afters);
    BASE_NS::array_view<const uint64_t> nullAfters;

    auto dispatcher = factory->CreateDispatcherTaskQueue(threadPool);
    dispatcher->Submit(10, FunctionTask::Create([]() { g_storage.Store(10); }));
    dispatcher->SubmitAfter(10, 11, FunctionTask::Create([]() { g_storage.Store(11); }));
    dispatcher->SubmitAfter(afterIds, 12, FunctionTask::Create([]() { g_storage.Store(12); }));
    dispatcher->SubmitAfter(nullAfters, 15, FunctionTask::Create([]() { g_storage.Store(15); }));
    dispatcher->Execute();
    EXPECT_EQ(dispatcher->CollectFinishedTasks().size(), 1);
    dispatcher->Clear();

    auto sequential = factory->CreateSequentialTaskQueue(threadPool);
    sequential->Submit(10, FunctionTask::Create([]() { g_storage.Store(10); }));
    sequential->SubmitAfter(10, 11, FunctionTask::Create([]() { g_storage.Store(11); }));
    sequential->SubmitAfter(afterIds, 12, FunctionTask::Create([]() { g_storage.Store(12); }));
    sequential->Execute();
    sequential->Clear();

    auto parallel = factory->CreateParallelTaskQueue(threadPool);
    parallel->Submit(10, FunctionTask::Create([]() { g_storage.Store(10); }));
    parallel->SubmitAfter(10, 11, FunctionTask::Create([]() { g_storage.Store(11); }));
    parallel->SubmitAfter(afterIds, 12, FunctionTask::Create([]() { g_storage.Store(12); }));
    parallel->Execute();
    parallel->Clear();

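    // Exercise the IInterface query and reference-counting paths of the thread pool; pushing a null
    // task is still expected to return a valid result object.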
    EXPECT_TRUE(threadPool->GetInterface(IThreadPool::UID) != nullptr);
    EXPECT_FALSE(threadPool->GetInterface(UID_TASK_QUEUE_FACTORY) != nullptr);
    const auto& constPool = *threadPool;
    EXPECT_TRUE(constPool.GetInterface(IThreadPool::UID) != nullptr);
    EXPECT_FALSE(constPool.GetInterface(UID_TASK_QUEUE_FACTORY) != nullptr);

    factory->Ref();
    factory->Unref();

    EXPECT_TRUE(threadPool->Push(nullptr) != nullptr);
}

/**
 * @tc.name: TestSequentialMethods
 * @tc.desc: Verify Remove() behavior of a SequentialTaskQueue before and after submitting a task.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestSequentialMethods, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(1);

    g_storage.Reset();

    auto seq = factory->CreateSequentialTaskQueue(threadPool);
    EXPECT_TRUE(seq != nullptr);
    SequentialTaskQueue sq { threadPool };
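    // Removing an id that has not been submitted is expected to be a no-op; removing id 1 again after
    // submitting it leaves the queue empty.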
    sq.Remove(1);
    sq.Submit(1, FunctionTask::Create([]() { g_storage.Store(1); }));
    sq.Remove(1);
}

/**
 * @tc.name: TestDispatcherOverMultipleFrames
 * @tc.desc: Verify that a DispatcherTaskQueue can spawn and collect tasks incrementally over simulated frames.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestDispatcherOverMultipleFrames, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    g_storage.Reset();

    DispatcherTaskQueue queue(threadPool);

    const int tasksToSpawn = 10;
    int numberOfTasksSpawned = 0;
    size_t numberOfTasksExecuted = 0;

    while (numberOfTasksExecuted != tasksToSpawn) {
        if (numberOfTasksSpawned < tasksToSpawn) {
            // Spawn a new task.
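            // Alternate between tasks shorter and longer than the simulated 16 ms frame below.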
            int timeout = (numberOfTasksSpawned & 0x1) ? 8 : 24;
            queue.Submit(numberOfTasksSpawned, FunctionTask::Create([numberOfTasksSpawned, timeout]() {
                Wait(timeout);
                g_storage.Store(numberOfTasksSpawned);
            }));
            numberOfTasksSpawned++;
        }

        // Progress queue.
        queue.ExecuteAsync();

        // Progress "frame".
        Wait(16);

        // Collect results.
        auto collectedTasks = queue.CollectFinishedTasks();
        numberOfTasksExecuted += collectedTasks.size();
    }

    g_storage.CheckValidity(10);
}

/**
 * @tc.name: MultithreadIo
 * @tc.desc: Verify that file manager IO works from a task executed on a worker thread.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, MultithreadIo, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4);

    g_storage.Reset();

    SequentialTaskQueue queue(threadPool);

    queue.Submit(1, FunctionTask::Create([]() {
        auto& files = g_context.sceneInit_->GetEngineInstance().engine_->GetFileManager();

        auto directory = files.OpenDirectory("file:///data/local/test_data/io/test_directory");
        ASSERT_TRUE(directory != nullptr);

        // There should be 5 files and 1 directory.
        int fileCount = 0;
        int dirCount = 0;
        for (const auto& entry : directory->GetEntries()) {
            if (entry.type == IDirectory::Entry::FILE) {
                fileCount++;
            } else if (entry.type == IDirectory::Entry::DIRECTORY) {
                if (entry.name != "." && entry.name != "..") {
                    dirCount++;
                }
            }
        }
        ASSERT_EQ(fileCount, 5);
        ASSERT_EQ(dirCount, 1);

        g_storage.Store(1);
    }));

    // Execute and wait for completion.
    queue.ExecuteAsync();
    queue.Wait();

    g_storage.CheckValidity(1);
}

/**
 * @tc.name: TestThreadPool
 * @tc.desc: Verify that thread pool task dependencies are honored, with and without artificial delays.
 * @tc.type: FUNC
 */
HWTEST_F(TaskQueueTest, TestThreadPool, TestSize.Level1)
{
    const auto factory = GetInstance<ITaskQueueFactory>(UID_TASK_QUEUE_FACTORY);
    auto threadPool = factory->CreateThreadPool(4U);
    {
        // Task which resets the storage after a delay. The delay tries to give time to add more tasks.
        auto resetTask = FunctionTask::Create([]() {
            Wait(100);
            g_storage.Reset();
        });
        const auto* resetTaskPtr = resetTask.get();
        threadPool->PushNoWait(BASE_NS::move(resetTask));

        // Two tasks which should wait for the reset task.
        auto task1 = FunctionTask::Create([]() {
            g_storage.Store(1);
            Wait(50);
        });
        auto task1Ptr = task1.get();
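        // Dependencies are expressed as raw pointers to previously created tasks.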
        const CORE_NS::IThreadPool::ITask* deps0[] = { resetTaskPtr };
        threadPool->PushNoWait(BASE_NS::move(task1), deps0);

        auto task2 = FunctionTask::Create([]() {
            Wait(50);
            g_storage.Store(2);
        });
        auto task2Ptr = task2.get();
        threadPool->PushNoWait(BASE_NS::move(task2), deps0);

        // One more task which should start after the above tasks.
        auto task3 = FunctionTask::Create([]() { g_storage.Store(3); });
        auto task3Ptr = task3.get();
        const CORE_NS::IThreadPool::ITask* deps12[] = { task1Ptr, task2Ptr };
        auto result = threadPool->Push(BASE_NS::move(task3), deps12);

        // Assuming the tasks were created fast enough, the last task isn't ready until we wait.
        EXPECT_FALSE(result->IsDone());
        result->Wait();
        EXPECT_TRUE(result->IsDone());

        ASSERT_EQ(g_storage.data.size(), 3);
        EXPECT_EQ(g_storage.data[2], 3);
    }
    // Do the same without delays. Dependencies should work even if the work was already completed.
    {
        // Create all the tasks first to guarantee that the pointers are unique.
        auto resetTask = FunctionTask::Create([]() { g_storage.Reset(); });
        auto task1 = FunctionTask::Create([]() { g_storage.Store(1); });
        auto task2 = FunctionTask::Create([]() { g_storage.Store(2); });
        auto task3 = FunctionTask::Create([]() { g_storage.Store(3); });

        const auto* resetTaskPtr = resetTask.get();
        threadPool->PushNoWait(BASE_NS::move(resetTask));

        // Two tasks which should wait for the reset task.
        auto task1Ptr = task1.get();
        const CORE_NS::IThreadPool::ITask* deps0[] = { resetTaskPtr };
        threadPool->PushNoWait(BASE_NS::move(task1), deps0);

        auto task2Ptr = task2.get();
        threadPool->PushNoWait(BASE_NS::move(task2), deps0);

        // One more task which should start after the above tasks.
        auto task3Ptr = task3.get();
        const CORE_NS::IThreadPool::ITask* deps12[] = { task1Ptr, task2Ptr };
        auto result = threadPool->Push(BASE_NS::move(task3), deps12);

        // Assuming the tasks were created fast enough, the last task isn't ready until we wait.
        EXPECT_FALSE(result->IsDone());
        result->Wait();
        EXPECT_TRUE(result->IsDone());

        ASSERT_EQ(g_storage.data.size(), 3);
        EXPECT_EQ(g_storage.data[2], 3);
    }
}