• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fl/server/kernel/round/round_kernel.h"
18 #include <mutex>
19 #include <queue>
20 #include <chrono>
21 #include <thread>
22 #include <utility>
23 #include <string>
24 #include <vector>
25 
26 namespace mindspore {
27 namespace fl {
28 namespace server {
29 namespace kernel {
RoundKernel()30 RoundKernel::RoundKernel() : name_(""), current_count_(0), required_count_(0), error_reason_(""), running_(true) {
31   release_thread_ = std::thread([&]() {
32     while (running_.load()) {
33       std::unique_lock<std::mutex> release_lock(release_mtx_);
34       // Detect whether there's any data needs to be released every 100 milliseconds.
35       if (heap_data_to_release_.empty()) {
36         release_lock.unlock();
37         std::this_thread::sleep_for(std::chrono::milliseconds(kReleaseDuration));
38         continue;
39       }
40 
41       AddressPtr addr_ptr = heap_data_to_release_.front();
42       heap_data_to_release_.pop();
43       release_lock.unlock();
44 
45       std::unique_lock<std::mutex> heap_data_lock(heap_data_mtx_);
46       if (heap_data_.count(addr_ptr) == 0) {
47         MS_LOG(ERROR) << "The data is not stored.";
48         continue;
49       }
50       // Manually release unique_ptr data.
51       heap_data_[addr_ptr].reset(nullptr);
52       (void)heap_data_.erase(heap_data_.find(addr_ptr));
53     }
54   });
55 }
56 
~RoundKernel()57 RoundKernel::~RoundKernel() {
58   running_ = false;
59   if (release_thread_.joinable()) {
60     release_thread_.join();
61   }
62 }
63 
OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &)64 void RoundKernel::OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &) { return; }
65 
OnLastCountEvent(const std::shared_ptr<ps::core::MessageHandler> &)66 void RoundKernel::OnLastCountEvent(const std::shared_ptr<ps::core::MessageHandler> &) { return; }
67 
StopTimer() const68 void RoundKernel::StopTimer() const {
69   if (stop_timer_cb_) {
70     stop_timer_cb_();
71   }
72   return;
73 }
74 
FinishIteration() const75 void RoundKernel::FinishIteration() const {
76   if (finish_iteration_cb_) {
77     finish_iteration_cb_(true, "");
78   }
79   return;
80 }
81 
Release(const AddressPtr & addr_ptr)82 void RoundKernel::Release(const AddressPtr &addr_ptr) {
83   if (addr_ptr == nullptr) {
84     MS_LOG(ERROR) << "Data to be released is empty.";
85     return;
86   }
87   std::unique_lock<std::mutex> lock(release_mtx_);
88   heap_data_to_release_.push(addr_ptr);
89   return;
90 }
91 
set_name(const std::string & name)92 void RoundKernel::set_name(const std::string &name) { name_ = name; }
93 
set_stop_timer_cb(const StopTimerCb & timer_stopper)94 void RoundKernel::set_stop_timer_cb(const StopTimerCb &timer_stopper) { stop_timer_cb_ = timer_stopper; }
95 
set_finish_iteration_cb(const FinishIterCb & finish_iteration_cb)96 void RoundKernel::set_finish_iteration_cb(const FinishIterCb &finish_iteration_cb) {
97   finish_iteration_cb_ = finish_iteration_cb;
98 }
99 
GenerateOutput(const std::vector<AddressPtr> & outputs,const void * data,size_t len)100 void RoundKernel::GenerateOutput(const std::vector<AddressPtr> &outputs, const void *data, size_t len) {
101   if (data == nullptr) {
102     MS_LOG(ERROR) << "The data is nullptr.";
103     return;
104   }
105 
106   if (outputs.empty()) {
107     MS_LOG(ERROR) << "Generating output failed. Outputs size is empty.";
108     return;
109   }
110 
111   std::unique_ptr<unsigned char[]> output_data = std::make_unique<unsigned char[]>(len);
112   if (output_data == nullptr) {
113     MS_LOG(ERROR) << "Output data is nullptr.";
114     return;
115   }
116 
117   size_t dst_size = len;
118   int ret = memcpy_s(output_data.get(), dst_size, data, len);
119   if (ret != 0) {
120     MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
121     return;
122   }
123   outputs[0]->addr = output_data.get();
124   outputs[0]->size = len;
125 
126   std::unique_lock<std::mutex> lock(heap_data_mtx_);
127   (void)heap_data_.insert(std::make_pair(outputs[0], std::move(output_data)));
128   return;
129 }
130 }  // namespace kernel
131 }  // namespace server
132 }  // namespace fl
133 }  // namespace mindspore
134