• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "ps/core/communicator/task_executor.h"
18 
19 namespace mindspore {
20 namespace ps {
21 namespace core {
TaskExecutor(size_t thread_num,size_t max_task_num,size_t submit_timeout)22 TaskExecutor::TaskExecutor(size_t thread_num, size_t max_task_num, size_t submit_timeout)
23     : running_(true),
24       thread_num_(thread_num),
25       idle_thread_num_(0),
26       submit_timeout_(submit_timeout),
27       max_task_num_(max_task_num),
28       task_num_(0) {
29   for (size_t i = 0; i < thread_num; i++) {
30     working_threads_.emplace_back([this]() {
31       std::function<void()> task;
32       while (true) {
33         std::unique_lock<std::mutex> lock(mtx_);
34         // Idle thread number increases when the mtx_ is locked.
35         idle_thread_num_++;
36 
37         if (!running_) {
38           // To avoid thread from blocking after destructor.
39           return;
40         }
41 
42         cv_.wait(lock);
43 
44         if (!running_ || task_queue_.empty()) {
45           return;
46         }
47 
48         task = task_queue_.front();
49         task_queue_.pop();
50         if (lock.owns_lock()) {
51           lock.unlock();
52         }
53 
54         task();
55       }
56     });
57   }
58   notify_thread_ = std::thread([this]() {
59     // If there is no idle thread, wait until the working thread is available.
60     while (running_) {
61       {
62         std::unique_lock<std::mutex> lock(mtx_);
63         if (idle_thread_num_ > 0 && task_num_ > 0) {
64           idle_thread_num_--;
65           task_num_--;
66           lock.unlock();
67           cv_.notify_one();
68         }
69       }
70       std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskIntervalInMs));
71     }
72   });
73 }
74 
~TaskExecutor()75 TaskExecutor::~TaskExecutor() {
76   {
77     std::unique_lock<std::mutex> lock(mtx_);
78     running_ = false;
79   }
80   cv_.notify_all();
81   for (auto &t : working_threads_) {
82     t.join();
83   }
84   notify_thread_.join();
85 }
86 }  // namespace core
87 }  // namespace ps
88 }  // namespace mindspore
89