/**
 * Copyright 2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/device/gsm/swap_manager.h"

#include <functional>
#include <numeric>
#include <string>
#include <utility>

#include "include/common/utils/offload_context.h"
#include "utils/file_utils.h"
#include "utils/temp_file_manager.h"

namespace mindspore {
namespace device {
constexpr char kLinuxAioLibName[] = "libaio_plugin.so";
constexpr char kLinuxAioInstanceFuncName[] = "get_aio_instance";
// Mask with the low 24 bits clear: sizes below 16 MB fall into size level 0.
constexpr size_t kFirstSizeLevel = 0xFFFFFFFFFFFFFFFF << 24;
constexpr size_t kSizeLevelNum = 8;
constexpr size_t kSwapMemAlignSize = 512;

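// Iterates over swappable-tensor candidates from the largest size level down to the
// smallest, so that larger tensors are preferred when looking for memory to reclaim.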
SwappableTensorCandidates::CandidateIter::CandidateIter(mindspore::device::SwappableTensorCandidates *candidates)
    : swappable_tensors_(candidates->swappable_tensors_),
      null_index_(candidates->null_index_),
      all_swappable_tensors_(candidates->all_swappable_tensors_) {
  if (!swappable_tensors_.empty()) {
    current_size_level_ = swappable_tensors_.size() - 1;
    Next();
  }
}

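// The iteration is exhausted once the cursor has walked past the end of size level 0,
// the last level visited.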
bool SwappableTensorCandidates::CandidateIter::IsEnd() {
  return current_size_level_ == 0 && (current_candidate_idx_ >= swappable_tensors_.at(current_size_level_).size());
}

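// Advances to the next live candidate. Entries whose DeviceAddress has already been
// destructed (the weak_ptr has expired) are pruned on the way: they are erased from
// all_swappable_tensors_ and their slot index is recycled through null_index_.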
void SwappableTensorCandidates::CandidateIter::Next() {
  const auto &next_idx = [&]() {
    ++current_candidate_idx_;
    if (current_candidate_idx_ < swappable_tensors_.at(current_size_level_).size()) {
      return true;
    }
    if (current_size_level_ == 0) {
      return false;
    }
    do {
      --current_size_level_;
    } while (swappable_tensors_.at(current_size_level_).empty() && current_size_level_ != 0);
    current_candidate_idx_ = 0;
    return !(swappable_tensors_.at(current_size_level_).empty());
  };
  bool valid_idx = next_idx();
  while (valid_idx) {
    // Take a reference so that clearing `second` marks the stored slot as recycled;
    // a copy would leave the container entry untouched and the slot could be recycled twice.
    auto &current_candidate = swappable_tensors_.at(current_size_level_).at(current_candidate_idx_);
    if (current_candidate.second == nullptr) {
      valid_idx = next_idx();
      continue;
    }
    if (current_candidate.first.lock() == nullptr) {
      (void)all_swappable_tensors_.erase(current_candidate.second);
      current_candidate.second = nullptr;
      null_index_.at(current_size_level_).push(current_candidate_idx_);
      valid_idx = next_idx();
      continue;
    }
    return;
  }
}

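// Returns the candidate under the cursor, or nullptr once the iteration is finished.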
DeviceAddressPtr SwappableTensorCandidates::CandidateIter::Get() {
  if (IsEnd()) {
    return nullptr;
  }
  return swappable_tensors_.at(current_size_level_).at(current_candidate_idx_).first.lock();
}

void SwappableTensorCandidates::Init(size_t size_level_num) {
  size_level_num_ = size_level_num;
  swappable_tensors_.resize(size_level_num);
  null_index_.resize(size_level_num);
}

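// Maps a tensor size to a size level. Level 0 holds tensors smaller than 16 MB and every
// following level doubles the boundary, e.g. a 20 MB tensor (bit 24 set) lands in level 1.
// Sizes beyond the last boundary are clamped into the highest level.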
size_t SwappableTensorCandidates::GetSizeLevel(size_t size) const {
  size_t mask = kFirstSizeLevel;
  for (size_t i = 0; i < size_level_num_; ++i) {
    if ((size & mask) == 0) {
      return i;
    }
    mask = mask << 1;
  }
  return size_level_num_ == 0 ? 0 : size_level_num_ - 1;
}

SwappableTensorCandidates::CandidateIter SwappableTensorCandidates::Begin() { return CandidateIter(this); }

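// Scans the size level that matches `size` for a live candidate of at least `size` bytes.
// Expired entries found along the way are pruned and their slots recycled, exactly as in
// CandidateIter::Next.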
DeviceAddressPtr SwappableTensorCandidates::GetLowerBoundCandidate(size_t size) {
  const auto size_level = GetSizeLevel(size);
  auto &candidates = swappable_tensors_[size_level];
  for (size_t idx = 0; idx < candidates.size(); ++idx) {
    auto candidate_lock = candidates[idx].first.lock();
    if (candidate_lock == nullptr) {
      (void)all_swappable_tensors_.erase(candidates[idx].second);
      candidates[idx].second = nullptr;
      null_index_.at(size_level).push(idx);
      continue;
    }
    if (candidate_lock->GetSize() >= size) {
      return candidate_lock;
    }
  }
  return nullptr;
}

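// Registers a tensor as a swap candidate in its size level, preferring to reuse a slot
// vacated by an expired entry (recorded in null_index_) over growing the vector.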
void SwappableTensorCandidates::Add(const DeviceAddressPtr &candidate) {
  if (candidate == nullptr) {
    return;
  }
  (void)all_swappable_tensors_.insert(candidate.get());
  const auto size_level = GetSizeLevel(candidate->GetSize());
  if (null_index_[size_level].empty()) {
    (void)swappable_tensors_[size_level].emplace_back(std::make_pair(candidate, candidate.get()));
    return;
  }
  const auto idx = null_index_[size_level].front();
  null_index_[size_level].pop();
  swappable_tensors_[size_level][idx] = std::make_pair(candidate, candidate.get());
}

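// Sets up the swap backend: loads the Linux AIO plugin when asynchronous IO is enabled,
// reads the disk budget from the offload context, and ensures the offload directory exists.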
SwapManager::SwapManager(size_t stream_id, mindspore::device::DynamicMemPoolBestFit *device_memory_pool,
                         PinMemPool *pin_mem_pool)
    : stream_id_(stream_id),
      device_memory_pool_(device_memory_pool),
      pin_mem_pool_(pin_mem_pool),
      size_level_num_(kSizeLevelNum) {
  const auto &offload_context = OffloadContext::GetInstance();
  io_handle_ = std::make_shared<IOHandle>();
  if (offload_context != nullptr) {
    if (offload_context->enable_aio()) {
      io_handle_->LoadAio(kLinuxAioLibName, kLinuxAioInstanceFuncName);
    }
    max_file_size_ = offload_context->offload_disk_size();
    // Guarded by the null check above to avoid dereferencing a null offload context.
    (void)FileUtils::CreateNotExistDirs(offload_context->offload_path(), true);
  }
  candidates_.Init(size_level_num_);
}

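// Generic allocation-with-retry helper: try `allocate_func` once, and if it fails, wait for
// tensors that are still swapping out to release their memory, retrying after each one
// completes. Note that `queue` is taken by value, so only the local copy is drained here.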
template <class Input, class Output>
bool SwapManager::TryAllocate(std::queue<const DeviceAddress *> queue, const Input &input, uint32_t stream_id,
                              Output (SwapManager::*allocate_func)(const Input &, uint32_t),
                              const std::function<bool(Output)> &success, Output *output) {
  MS_EXCEPTION_IF_NULL(allocate_func);
  MS_EXCEPTION_IF_NULL(output);
  (*output) = (this->*allocate_func)(input, stream_id);
  if (success(*output)) {
    return true;
  }
  // Wait for swapping tensors to finish and retry the allocation.
  while (!queue.empty()) {
    const auto &front = queue.front();
    MS_EXCEPTION_IF_NULL(front);
    if (front->Wait()) {
      (*output) = (this->*allocate_func)(input, stream_id);
      if (success(*output)) {
        return true;
      }
    }
    queue.pop();
  }
  return false;
}

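// Last-resort allocation path: evict swappable tensors (device-to-host or host-to-file,
// depending on `swap_type`) until `allocate_func` succeeds. A single candidate of at least
// `total_size` bytes is tried first; otherwise candidates are walked from the largest size
// level downwards.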
template <class Input, class Output>
bool SwapManager::SwapOutTemp(const std::pair<DeviceAddressStatus, StorageType> &swap_type, size_t total_size,
                              const Input &input, uint32_t stream_id,
                              Output (mindspore::device::SwapManager::*allocate_func)(const Input &, uint32_t),
                              const std::function<bool(Output)> &success, Output *output) {
  MS_EXCEPTION_IF_NULL(allocate_func);
  MS_EXCEPTION_IF_NULL(output);
  const auto target_device_address_status = swap_type.first;
  const auto swap_out_to = swap_type.second;
  const auto swap_temp_func = [&](const DeviceAddressPtr &candidate) -> bool {
    if (!candidate->swappable() || candidate->status() != target_device_address_status) {
      return false;
    }
    if (candidate->status() == DeviceAddressStatus::kInDevice && candidate->GetPtr() == nullptr) {
      return false;
    }
    if (!candidate->MoveTo(swap_out_to, false, stream_id)) {
      return false;
    }
    (*output) = (this->*allocate_func)(input, stream_id);
    return success(*output);
  };
  auto lower_bound_candidate = candidates_.GetLowerBoundCandidate(total_size);
  if (lower_bound_candidate != nullptr && swap_temp_func(lower_bound_candidate)) {
    return true;
  }
  for (auto iter = candidates_.Begin(); !iter.IsEnd(); iter.Next()) {
    const auto &candidate = iter.Get();
    // The weak_ptr may expire between Next() and Get(); skip such candidates.
    if (candidate == nullptr) {
      continue;
    }
    if (swap_temp_func(candidate)) {
      return true;
    }
  }
  return false;
}

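// Direct pool allocation; kSwapMemAlignSize extra bytes are requested, presumably to leave
// room for aligning the swapped data.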
void *SwapManager::AllocDeviceMemorySimply(const size_t &size, uint32_t stream_id) {
  MS_EXCEPTION_IF_NULL(device_memory_pool_);
  return device_memory_pool_->AllocTensorMem(size + kSwapMemAlignSize, stream_id);
}

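// Three-stage device allocation: try the pool directly, then wait for in-flight
// device-to-host swaps to free memory, and finally evict in-device candidates to host.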
void *SwapManager::AllocDeviceMemory(size_t size, uint32_t stream_id) {
  void *ret = nullptr;
  void *(SwapManager::*allocate_func)(const size_t &, uint32_t) = &SwapManager::AllocDeviceMemorySimply;
  std::function<bool(void *)> success = [](void *ptr) { return ptr != nullptr; };
  std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
  if (!TryAllocate(swapping_tensors_device_, size, stream_id, allocate_func, success, &ret) &&
      !SwapOutTemp(std::make_pair(DeviceAddressStatus::kInDevice, StorageType::kHost), size, size, stream_id,
                   allocate_func, success, &ret)) {
    MS_LOG(WARNING) << "Allocate device memory failed, size: " << size;
  }
  return ret;
}

std::vector<void *> SwapManager::AllocDeviceContinuousMemSimply(const std::vector<size_t> &size_list,
                                                                uint32_t stream_id) {
  MS_EXCEPTION_IF_NULL(device_memory_pool_);
  return device_memory_pool_->AllocContinuousTensorMem(size_list, stream_id);
}

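// Same strategy as AllocDeviceMemory for a continuous allocation. The eviction threshold is
// the total number of bytes requested, i.e. the sum of the entries in size_list.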
std::vector<void *> SwapManager::AllocDeviceContinuousMem(const std::vector<size_t> &size_list, uint32_t stream_id) {
  std::vector<void *> ret;
  std::vector<void *> (SwapManager::*allocate_func)(const std::vector<size_t> &, uint32_t) =
      &SwapManager::AllocDeviceContinuousMemSimply;
  std::function<bool(std::vector<void *>)> success = [](const std::vector<void *> &ptrs) { return !ptrs.empty(); };
  std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
  if (!TryAllocate(swapping_tensors_device_, size_list, stream_id, allocate_func, success, &ret)) {
    // The total size is the sum of the requested blocks, not their product.
    const size_t total_size = std::accumulate(size_list.begin(), size_list.end(), size_t(0));
    if (!SwapOutTemp(std::make_pair(DeviceAddressStatus::kInDevice, StorageType::kHost), total_size, size_list,
                     stream_id, allocate_func, success, &ret)) {
      MS_LOG(WARNING) << "Allocate continuous device mem failed, size list: " << size_list;
    }
  }
  return ret;
}

void SwapManager::FreeDeviceMemory(void *ptr) {
  MS_EXCEPTION_IF_NULL(device_memory_pool_);
  device_memory_pool_->FreeTensorMem(ptr);
}

void *SwapManager::AllocHostMemorySimply(const size_t &size, uint32_t /*stream_id*/) {
  MS_EXCEPTION_IF_NULL(pin_mem_pool_);
  return pin_mem_pool_->AllocPinMem(size);
}

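// Host-side counterpart of AllocDeviceMemory: allocates pinned host memory, waiting on
// host-bound swaps and spilling in-host candidates to file when the pin pool is exhausted.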
void *SwapManager::AllocHostMemory(size_t size) {
  void *ret = nullptr;
  void *(SwapManager::*allocate_func)(const size_t &, uint32_t) = &SwapManager::AllocHostMemorySimply;
  std::function<bool(void *)> success = [](void *ptr) { return ptr != nullptr; };
  std::lock_guard<std::mutex> lock(swapping_tensors_host_mutex_);
  if (!TryAllocate(swapping_tensors_host_, size, kDefaultStreamIndex, allocate_func, success, &ret) &&
      !SwapOutTemp(std::make_pair(DeviceAddressStatus::kInHost, StorageType::kFile), size, size, kDefaultStreamIndex,
                   allocate_func, success, &ret)) {
    MS_LOG(WARNING) << "Allocate host memory failed, size: " << size;
  }
  return ret;
}

void SwapManager::FreeHostMemory(void *ptr) {
  MS_EXCEPTION_IF_NULL(pin_mem_pool_);
  pin_mem_pool_->FreeTensorMem(ptr);
}

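// Reserves `file_size` bytes of the disk budget and creates a swap file for it. If the
// budget is exceeded, file-bound swaps are awaited first in case they release space.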
bool SwapManager::CreateFile(const std::string &file_name, size_t file_size) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  bool (SwapManager::*allocate_func)(const size_t &size, uint32_t) = &SwapManager::EnoughFileSpace;
  std::function<bool(bool)> success = [](bool ret) { return ret; };
  {
    std::lock_guard<std::mutex> lock(swapping_tensors_file_mutex_);
    bool enough = false;
    if (!TryAllocate(swapping_tensors_file_, file_size, kDefaultStreamIndex, allocate_func, success, &enough)) {
      MS_LOG(WARNING) << "There is not enough disk space for creating file, size: " << file_size;
      return false;
    }
  }
  current_used_file_size_ += file_size;
  file_size_[file_name] = file_size;
  TempFileManager::GetInstance().Register(file_name);
  return io_handle_->CreateSwapFile(file_name);
}

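// Deletes a swap file and returns its reserved bytes to the disk budget.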
bool SwapManager::DeleteFile(const std::string &file_name) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  const auto &iter = file_size_.find(file_name);
  if (iter == file_size_.end()) {
    MS_LOG(WARNING) << "Cannot find size for file[" << file_name << "]";
  } else {
    current_used_file_size_ -= iter->second;
    iter->second = 0;
  }
  TempFileManager::GetInstance().UnRegister(file_name);
  return io_handle_->DeleteSwapFile(file_name);
}

bool SwapManager::FileToHostMemory(void *host_memory, const std::string &file_name, size_t byte_num, bool async,
                                   AsyncIOToken *sync_key) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  if (async) {
    return io_handle_->ReadAsync(file_name, host_memory, byte_num, sync_key);
  } else {
    return io_handle_->Read(file_name, host_memory, byte_num);
  }
}

bool SwapManager::EnoughFileSpace(const size_t &size, uint32_t /*stream_id*/) {
  return current_used_file_size_ + size <= max_file_size_;
}

bool SwapManager::HostMemoryToFile(const std::string &file_name, const void *data, size_t byte_num, bool async,
                                   AsyncIOToken *sync_key) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  if (async) {
    return io_handle_->WriteAsync(file_name, data, byte_num, sync_key);
  } else {
    return io_handle_->Write(file_name, data, byte_num);
  }
}

bool SwapManager::WaitAsyncIO(mindspore::device::AsyncIOToken sync_token) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  return io_handle_->Wait(sync_token);
}

void SwapManager::AddSwappableTensor(const DeviceAddressPtr &device_address) {
  // Candidates::Add tolerates nullptr, but set_swappable below would dereference it.
  if (device_address == nullptr) {
    return;
  }
  candidates_.Add(device_address);
  device_address->set_swappable(true);
}

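// Records a tensor whose swap transfer is in flight, routed by its current status to the
// queue of the resource it will release: file-to-host swaps free disk space, device-to-host
// swaps free device memory, and everything else frees pinned host memory.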
void SwapManager::AddSwappingTensor(const mindspore::device::DeviceAddress *device_address) {
  if (device_address == nullptr) {
    return;
  }
  if (device_address->status() == DeviceAddressStatus::kInFileToHost) {
    std::lock_guard<std::mutex> lock(swapping_tensors_file_mutex_);
    (void)swapping_tensors_file_.push(device_address);
  } else if (device_address->status() == DeviceAddressStatus::kInDeviceToHost) {
    std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
    (void)swapping_tensors_device_.push(device_address);
  } else {
    std::lock_guard<std::mutex> lock(swapping_tensors_host_mutex_);
    (void)swapping_tensors_host_.push(device_address);
  }
}

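// Pins a kernel's inputs and outputs so they cannot be chosen as eviction victims while the
// kernel's memory is being allocated.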
void SwapManager::SetSwappableBeforeMemAllocate(const std::vector<DeviceAddress *> &inputs,
                                                const std::vector<DeviceAddress *> &outputs) const {
  for (const auto &device_address : inputs) {
    if (device_address == nullptr) {
      continue;
    }
    device_address->set_swappable(false);
  }
  for (const auto &device_address : outputs) {
    if (device_address == nullptr) {
      continue;
    }
    device_address->set_swappable(false);
  }
}

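// Re-enables swapping for a kernel's inputs and outputs once its memory can be freed.
// Outputs that are ref-mapped to an input but use a different DeviceAddress are kept
// non-swappable.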
void SwapManager::SetSwappableBeforeMemFree(const std::vector<DeviceAddress *> &inputs,
                                            const std::vector<DeviceAddress *> &outputs,
                                            const mindspore::device::KernelInfo *kernel_info) const {
  MS_EXCEPTION_IF_NULL(kernel_info);
  for (const auto &device_address : inputs) {
    if (device_address == nullptr) {
      continue;
    }
    device_address->set_swappable(true);
  }
  for (const auto &device_address : outputs) {
    if (device_address == nullptr) {
      continue;
    }
    device_address->set_swappable(true);
  }
  for (const auto &out_in : kernel_info->out_in_ref_map()) {
    if (inputs[out_in.second] != outputs[out_in.first]) {
      outputs[out_in.first]->set_swappable(false);
    }
  }
}
}  // namespace device
}  // namespace mindspore