1 /**
2 * Copyright 2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fl/server/kernel/round/round_kernel.h"
18 #include <mutex>
19 #include <queue>
20 #include <chrono>
21 #include <thread>
22 #include <utility>
23 #include <string>
24 #include <vector>
25
26 namespace mindspore {
27 namespace fl {
28 namespace server {
29 namespace kernel {
RoundKernel()30 RoundKernel::RoundKernel() : name_(""), current_count_(0), required_count_(0), error_reason_(""), running_(true) {
31 release_thread_ = std::thread([&]() {
32 while (running_.load()) {
33 std::unique_lock<std::mutex> release_lock(release_mtx_);
34 // Detect whether there's any data needs to be released every 100 milliseconds.
35 if (heap_data_to_release_.empty()) {
36 release_lock.unlock();
37 std::this_thread::sleep_for(std::chrono::milliseconds(kReleaseDuration));
38 continue;
39 }
40
41 AddressPtr addr_ptr = heap_data_to_release_.front();
42 heap_data_to_release_.pop();
43 release_lock.unlock();
44
45 std::unique_lock<std::mutex> heap_data_lock(heap_data_mtx_);
46 if (heap_data_.count(addr_ptr) == 0) {
47 MS_LOG(ERROR) << "The data is not stored.";
48 continue;
49 }
50 // Manually release unique_ptr data.
51 heap_data_[addr_ptr].reset(nullptr);
52 (void)heap_data_.erase(heap_data_.find(addr_ptr));
53 }
54 });
55 }
56
~RoundKernel()57 RoundKernel::~RoundKernel() {
58 running_ = false;
59 if (release_thread_.joinable()) {
60 release_thread_.join();
61 }
62 }
63
OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &)64 void RoundKernel::OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &) { return; }
65
OnLastCountEvent(const std::shared_ptr<ps::core::MessageHandler> &)66 void RoundKernel::OnLastCountEvent(const std::shared_ptr<ps::core::MessageHandler> &) { return; }
67
StopTimer() const68 void RoundKernel::StopTimer() const {
69 if (stop_timer_cb_) {
70 stop_timer_cb_();
71 }
72 return;
73 }
74
FinishIteration() const75 void RoundKernel::FinishIteration() const {
76 if (finish_iteration_cb_) {
77 finish_iteration_cb_(true, "");
78 }
79 return;
80 }
81
Release(const AddressPtr & addr_ptr)82 void RoundKernel::Release(const AddressPtr &addr_ptr) {
83 if (addr_ptr == nullptr) {
84 MS_LOG(ERROR) << "Data to be released is empty.";
85 return;
86 }
87 std::unique_lock<std::mutex> lock(release_mtx_);
88 heap_data_to_release_.push(addr_ptr);
89 return;
90 }
91
set_name(const std::string & name)92 void RoundKernel::set_name(const std::string &name) { name_ = name; }
93
set_stop_timer_cb(const StopTimerCb & timer_stopper)94 void RoundKernel::set_stop_timer_cb(const StopTimerCb &timer_stopper) { stop_timer_cb_ = timer_stopper; }
95
set_finish_iteration_cb(const FinishIterCb & finish_iteration_cb)96 void RoundKernel::set_finish_iteration_cb(const FinishIterCb &finish_iteration_cb) {
97 finish_iteration_cb_ = finish_iteration_cb;
98 }
99
GenerateOutput(const std::vector<AddressPtr> & outputs,const void * data,size_t len)100 void RoundKernel::GenerateOutput(const std::vector<AddressPtr> &outputs, const void *data, size_t len) {
101 if (data == nullptr) {
102 MS_LOG(ERROR) << "The data is nullptr.";
103 return;
104 }
105
106 if (outputs.empty()) {
107 MS_LOG(ERROR) << "Generating output failed. Outputs size is empty.";
108 return;
109 }
110
111 std::unique_ptr<unsigned char[]> output_data = std::make_unique<unsigned char[]>(len);
112 if (output_data == nullptr) {
113 MS_LOG(ERROR) << "Output data is nullptr.";
114 return;
115 }
116
117 size_t dst_size = len;
118 int ret = memcpy_s(output_data.get(), dst_size, data, len);
119 if (ret != 0) {
120 MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
121 return;
122 }
123 outputs[0]->addr = output_data.get();
124 outputs[0]->size = len;
125
126 std::unique_lock<std::mutex> lock(heap_data_mtx_);
127 (void)heap_data_.insert(std::make_pair(outputs[0], std::move(output_data)));
128 return;
129 }
130 } // namespace kernel
131 } // namespace server
132 } // namespace fl
133 } // namespace mindspore
134