1 /* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "workerQueue.h" 17 18 namespace panda { 19 DependsOn(WorkerJob * job)20void WorkerJob::DependsOn(WorkerJob *job) 21 { 22 job->dependants_.push_back(this); 23 dependencies_++; 24 } 25 Signal()26void WorkerJob::Signal() 27 { 28 { 29 std::lock_guard<std::mutex> lock(m_); 30 dependencies_--; 31 } 32 33 cond_.notify_one(); 34 } 35 WorkerQueue(size_t threadCount)36WorkerQueue::WorkerQueue(size_t threadCount) 37 { 38 threads_.reserve(threadCount); 39 40 for (size_t i = 0; i < threadCount; i++) { 41 threads_.push_back(os::thread::ThreadStart(Worker, this)); 42 } 43 } 44 ~WorkerQueue()45WorkerQueue::~WorkerQueue() 46 { 47 void *retval = nullptr; 48 49 std::unique_lock<std::mutex> lock(m_); 50 terminate_ = true; 51 lock.unlock(); 52 jobsAvailable_.notify_all(); 53 54 for (const auto handle_id : threads_) { 55 os::thread::ThreadJoin(handle_id, &retval); 56 } 57 } 58 Worker(WorkerQueue * queue)59bool WorkerQueue::Worker(WorkerQueue *queue) 60 { 61 while (true) { 62 std::unique_lock<std::mutex> lock(queue->m_); 63 queue->jobsAvailable_.wait(lock, [queue]() { return queue->terminate_ || queue->jobsCount_ != 0; }); 64 65 if (queue->terminate_) { 66 return false; 67 } 68 69 lock.unlock(); 70 if (!queue->Consume()) { 71 return false; 72 } 73 74 queue->jobsFinished_.notify_one(); 75 return true; 76 } 77 } 78 Consume()79bool WorkerQueue::Consume() 80 { 81 std::unique_lock<std::mutex> lock(m_); 82 activeWorkers_++; 83 84 while (jobsCount_ > 0) { 85 --jobsCount_; 86 auto &job = *(jobs_[jobsCount_]); 87 88 lock.unlock(); 89 if (!job.Run()) { 90 return false; 91 } 92 lock.lock(); 93 } 94 95 activeWorkers_--; 96 return true; 97 } 98 Wait()99void WorkerQueue::Wait() 100 { 101 std::unique_lock<std::mutex> lock(m_); 102 jobsFinished_.wait(lock, [this]() { return activeWorkers_ == 0 && jobsCount_ == 0; }); 103 for (auto it = jobs_.begin(); it != jobs_.end(); it++) { 104 if (*it != nullptr) { 105 delete *it; 106 *it = nullptr; 107 } 108 } 109 jobs_.clear(); 110 } 111 } // namespace panda 112