1 /* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "workerQueue.h" 17 18 namespace panda::es2panda::util { 19 DependsOn(WorkerJob * job)20void WorkerJob::DependsOn(WorkerJob *job) 21 { 22 job->dependants_.push_back(this); 23 dependencies_++; 24 } 25 Signal()26void WorkerJob::Signal() 27 { 28 { 29 std::lock_guard<std::mutex> lock(m_); 30 dependencies_--; 31 } 32 33 cond_.notify_one(); 34 } 35 WorkerQueue(size_t threadCount)36WorkerQueue::WorkerQueue(size_t threadCount) 37 { 38 threads_.reserve(threadCount); 39 40 for (size_t i = 0; i < threadCount; i++) { 41 threads_.push_back(os::thread::ThreadStart(Worker, this)); 42 } 43 } 44 ~WorkerQueue()45WorkerQueue::~WorkerQueue() 46 { 47 void *retval = nullptr; 48 49 std::unique_lock<std::mutex> lock(m_); 50 terminate_ = true; 51 lock.unlock(); 52 jobsAvailable_.notify_all(); 53 54 for (const auto handle_id : threads_) { 55 os::thread::ThreadJoin(handle_id, &retval); 56 } 57 } 58 Worker(WorkerQueue * queue)59void WorkerQueue::Worker(WorkerQueue *queue) 60 { 61 while (true) { 62 std::unique_lock<std::mutex> lock(queue->m_); 63 queue->jobsAvailable_.wait(lock, [queue]() { return queue->terminate_ || queue->jobsCount_ != 0; }); 64 65 if (queue->terminate_) { 66 return; 67 } 68 69 lock.unlock(); 70 71 queue->Consume(); 72 queue->jobsFinished_.notify_one(); 73 } 74 } 75 Consume()76void WorkerQueue::Consume() 77 { 78 std::unique_lock<std::mutex> lock(m_); 79 activeWorkers_++; 80 81 while (jobsCount_ > 0) { 82 --jobsCount_; 83 auto &job = *(jobs_[jobsCount_]); 84 85 lock.unlock(); 86 87 try { 88 job.Run(); 89 } catch (const Error &e) { 90 lock.lock(); 91 errors_.push_back(e); 92 lock.unlock(); 93 } 94 95 lock.lock(); 96 } 97 98 activeWorkers_--; 99 } 100 Wait()101void WorkerQueue::Wait() 102 { 103 std::unique_lock<std::mutex> lock(m_); 104 jobsFinished_.wait(lock, [this]() { return activeWorkers_ == 0 && jobsCount_ == 0; }); 105 for (auto it = jobs_.begin(); it != jobs_.end(); it++) { 106 if (*it != nullptr) { 107 delete *it; 108 *it = nullptr; 109 } 110 } 111 jobs_.clear(); 112 113 if (!errors_.empty()) { 114 // NOLINTNEXTLINE 115 throw errors_.front(); 116 } 117 } 118 } // namespace panda::es2panda::util 119