• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "runtime/device/gsm/swap_manager.h"

#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "include/common/utils/offload_context.h"
#include "utils/file_utils.h"
#include "utils/temp_file_manager.h"
26 
27 namespace mindspore {
28 namespace device {
// Shared library and factory symbol used to load the Linux AIO backend at runtime.
constexpr char kLinuxAioLibName[] = "libaio_plugin.so";
constexpr char kLinuxAioInstanceFuncName[] = "get_aio_instance";
// Mask whose low 24 bits are zero: a size with no bits in this mask is below 2^24
// bytes (16M) and falls into size level 0. Each higher level shifts the mask left.
constexpr size_t kFirstSizeLevel = 0xFFFFFFFFFFFFFFFF << 24;  // 16M
// Number of size buckets used to index swappable tensors by size.
constexpr size_t kSizeLevelNum = 8;
// Extra bytes added to each device allocation for alignment padding.
constexpr size_t kSwapMemAlignSize = 512;
34 
// Iterator over swap candidates; starts at the highest (largest) size level and
// relies on Next() to position itself on the first live candidate.
// NOTE(review): assumes current_candidate_idx_'s in-class initializer (declared in
// the header, not visible here) makes the first ++ in Next() land on a valid
// starting index -- confirm against the header.
SwappableTensorCandidates::CandidateIter::CandidateIter(mindspore::device::SwappableTensorCandidates *candidates)
    : swappable_tensors_(candidates->swappable_tensors_),
      null_index_(candidates->null_index_),
      all_swappable_tensors_(candidates->all_swappable_tensors_) {
  if (!swappable_tensors_.empty()) {
    // Begin the walk from the largest size level and move downwards.
    current_size_level_ = swappable_tensors_.size() - 1;
    Next();
  }
}
44 
IsEnd()45 bool SwappableTensorCandidates::CandidateIter::IsEnd() {
46   return current_size_level_ == 0 && (current_candidate_idx_ >= swappable_tensors_.at(current_size_level_).size());
47 }
48 
Next()49 void SwappableTensorCandidates::CandidateIter::Next() {
50   const auto &next_idx = [&]() {
51     ++current_candidate_idx_;
52     if (current_candidate_idx_ < swappable_tensors_.at(current_size_level_).size()) {
53       return true;
54     }
55     if (current_size_level_ == 0) {
56       return false;
57     }
58     do {
59       --current_size_level_;
60     } while (swappable_tensors_.at(current_size_level_).empty() && current_size_level_ != 0);
61     current_candidate_idx_ = 0;
62     return !(swappable_tensors_.at(current_size_level_).empty());
63   };
64   CandidateItem current_candidate;
65   bool valid_idx = next_idx();
66   while (valid_idx) {
67     current_candidate = swappable_tensors_.at(current_size_level_).at(current_candidate_idx_);
68     if (current_candidate.second == nullptr) {
69       valid_idx = next_idx();
70       continue;
71     }
72     if (current_candidate.first.lock() == nullptr) {
73       (void)all_swappable_tensors_.erase(current_candidate.second);
74       current_candidate.second = nullptr;
75       null_index_.at(current_size_level_).push(current_candidate_idx_);
76       valid_idx = next_idx();
77       continue;
78     }
79     return;
80   }
81 }
82 
Get()83 DeviceAddressPtr SwappableTensorCandidates::CandidateIter::Get() {
84   if (IsEnd()) {
85     return nullptr;
86   }
87   return swappable_tensors_.at(current_size_level_).at(current_candidate_idx_).first.lock();
88 }
89 
Init(size_t size_level_num)90 void SwappableTensorCandidates::Init(size_t size_level_num) {
91   size_level_num_ = size_level_num;
92   swappable_tensors_.resize(size_level_num);
93   null_index_.resize(size_level_num);
94 }
95 
GetSizeLevel(size_t size) const96 size_t SwappableTensorCandidates::GetSizeLevel(size_t size) const {
97   size_t mask = kFirstSizeLevel;
98   for (size_t i = 0; i < size_level_num_; i += 1) {
99     if ((size & mask) == 0) {
100       return i;
101     }
102     mask = mask << 1;
103   }
104   return size_level_num_ == 0 ? 0 : size_level_num_ - 1;
105 }
106 
Begin()107 SwappableTensorCandidates::CandidateIter SwappableTensorCandidates::Begin() { return CandidateIter(this); }
108 
GetLowerBoundCandidate(size_t size)109 DeviceAddressPtr SwappableTensorCandidates::GetLowerBoundCandidate(size_t size) {
110   const auto size_level = GetSizeLevel(size);
111   auto &candidates = swappable_tensors_[size_level];
112   for (size_t idx = 0; idx < candidates.size(); ++idx) {
113     auto candidate_lock = candidates[idx].first.lock();
114     if (candidate_lock == nullptr) {
115       all_swappable_tensors_.erase(candidates[idx].second);
116       candidates[idx].second = nullptr;
117       null_index_.at(size_level).push(idx);
118       continue;
119     }
120     if (candidate_lock->GetSize() >= size) {
121       return candidate_lock;
122     }
123   }
124   return nullptr;
125 }
126 
Add(const DeviceAddressPtr & candidate)127 void SwappableTensorCandidates::Add(const DeviceAddressPtr &candidate) {
128   if (candidate == nullptr) {
129     return;
130   }
131   (void)all_swappable_tensors_.insert(candidate.get());
132   const auto size_level = GetSizeLevel(candidate->GetSize());
133   if (null_index_[size_level].empty()) {
134     (void)swappable_tensors_[size_level].emplace_back(std::make_pair(candidate, candidate.get()));
135     return;
136   }
137   const auto idx = null_index_[size_level].front();
138   null_index_[size_level].pop();
139   swappable_tensors_[size_level][idx] = std::make_pair(candidate, candidate.get());
140 }
141 
SwapManager(size_t stream_id,mindspore::device::DynamicMemPoolBestFit * device_memory_pool,PinMemPool * pin_mem_pool)142 SwapManager::SwapManager(size_t stream_id, mindspore::device::DynamicMemPoolBestFit *device_memory_pool,
143                          PinMemPool *pin_mem_pool)
144     : stream_id_(stream_id),
145       device_memory_pool_(device_memory_pool),
146       pin_mem_pool_(pin_mem_pool),
147       size_level_num_(kSizeLevelNum) {
148   const auto &offload_context = OffloadContext::GetInstance();
149   io_handle_ = std::make_shared<IOHandle>();
150   if (offload_context != nullptr) {
151     if (offload_context->enable_aio()) {
152       io_handle_->LoadAio(kLinuxAioLibName, kLinuxAioInstanceFuncName);
153     }
154     max_file_size_ = offload_context->offload_disk_size();
155   }
156   candidates_.Init(size_level_num_);
157   (void)FileUtils::CreateNotExistDirs(offload_context->offload_path(), true);
158 }
159 
/**
 * Allocate with allocate_func, and on failure wait for the swapping tensors in
 * `queue` to finish, retrying the allocation after each completed swap.
 * @param queue Copy of a pending-swap queue; popped locally as tensors complete.
 *        NOTE(review): taken BY VALUE, so the caller's member queue is not
 *        drained here -- confirm this is intentional.
 * @param input Allocation request forwarded to allocate_func.
 * @param stream_id Stream on which to allocate.
 * @param allocate_func Member function performing the actual allocation.
 * @param success Predicate deciding whether an Output counts as success.
 * @param output Receives the last allocation result.
 * @return true if an allocation satisfying `success` was obtained.
 */
template <class Input, class Output>
bool SwapManager::TryAllocate(std::queue<const DeviceAddress *> queue, const Input &input, uint32_t stream_id,
                              Output (SwapManager::*allocate_func)(const Input &, uint32_t),
                              const std::function<bool(Output)> &success, Output *output) {
  MS_EXCEPTION_IF_NULL(allocate_func);
  MS_EXCEPTION_IF_NULL(output);
  // First attempt without waiting on anything.
  (*output) = (this->*allocate_func)(input, stream_id);
  if (success(*output)) {
    return true;
  }
  // Wait swapping tensors.
  while (!queue.empty()) {
    const auto &front = queue.front();
    MS_EXCEPTION_IF_NULL(front);
    if (front->Wait()) {
      // A swap-out finished; its memory may satisfy the request now.
      (*output) = (this->*allocate_func)(input, stream_id);
      if (success(*output)) {
        return true;
      }
    }
    queue.pop();
  }
  return false;
}
184 
185 template <class Input, class Output>
SwapOutTemp(const std::pair<DeviceAddressStatus,StorageType> & swap_type,size_t total_size,const Input & input,uint32_t stream_id,Output (mindspore::device::SwapManager::* allocate_func)(const Input &,uint32_t),const std::function<bool (Output)> & success,Output * output)186 bool SwapManager::SwapOutTemp(const std::pair<DeviceAddressStatus, StorageType> &swap_type, size_t total_size,
187                               const Input &input, uint32_t stream_id,
188                               Output (mindspore::device::SwapManager::*allocate_func)(const Input &, uint32_t),
189                               const std::function<bool(Output)> &success, Output *output) {
190   MS_EXCEPTION_IF_NULL(allocate_func);
191   MS_EXCEPTION_IF_NULL(output);
192   const auto target_device_address_status = swap_type.first;
193   const auto swap_out_to = swap_type.second;
194   const auto swap_temp_func = [&](const DeviceAddressPtr &candidate) -> bool {
195     if (!candidate->swappable() || candidate->status() != target_device_address_status) {
196       return false;
197     }
198     if (candidate->status() == DeviceAddressStatus::kInDevice && candidate->GetPtr() == nullptr) {
199       return false;
200     }
201     if (!candidate->MoveTo(swap_out_to, false, stream_id)) {
202       return false;
203     }
204     (*output) = (this->*allocate_func)(input, stream_id);
205     return success(*output);
206   };
207   auto lower_bound_candidate = candidates_.GetLowerBoundCandidate(total_size);
208   if (lower_bound_candidate != nullptr && swap_temp_func(lower_bound_candidate)) {
209     return true;
210   }
211   for (auto iter = candidates_.Begin(); !iter.IsEnd(); iter.Next()) {
212     const auto &candidate = iter.Get();
213     if (swap_temp_func(candidate)) {
214       return true;
215     }
216   }
217   return false;
218 }
219 
AllocDeviceMemorySimply(const size_t & size,uint32_t stream_id)220 void *SwapManager::AllocDeviceMemorySimply(const size_t &size, uint32_t stream_id) {
221   MS_EXCEPTION_IF_NULL(device_memory_pool_);
222   return device_memory_pool_->AllocTensorMem(size + kSwapMemAlignSize, stream_id);
223 }
224 
AllocDeviceMemory(size_t size,uint32_t stream_id)225 void *SwapManager::AllocDeviceMemory(size_t size, uint32_t stream_id) {
226   void *ret = nullptr;
227   void *(SwapManager::*allocate_func)(const size_t &, uint32_t) = &SwapManager::AllocDeviceMemorySimply;
228   std::function<bool(void *)> success = [](void *ptr) { return ptr != nullptr; };
229   std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
230   if (!TryAllocate(swapping_tensors_device_, size, stream_id, allocate_func, success, &ret) &&
231       !SwapOutTemp(std::make_pair(DeviceAddressStatus::kInDevice, StorageType::kHost), size, size, stream_id,
232                    allocate_func, success, &ret)) {
233     MS_LOG(WARNING) << "Allocate device memory failed, size: " << size;
234   }
235   return ret;
236 }
237 
AllocDeviceContinuousMemSimply(const std::vector<size_t> & size_list,uint32_t stream_id)238 std::vector<void *> SwapManager::AllocDeviceContinuousMemSimply(const std::vector<size_t> &size_list,
239                                                                 uint32_t stream_id) {
240   MS_EXCEPTION_IF_NULL(device_memory_pool_);
241   return device_memory_pool_->AllocContinuousTensorMem(size_list, stream_id);
242 }
243 
AllocDeviceContinuousMem(const std::vector<size_t> & size_list,uint32_t stream_id)244 std::vector<void *> SwapManager::AllocDeviceContinuousMem(const std::vector<size_t> &size_list, uint32_t stream_id) {
245   std::vector<void *> ret;
246   std::vector<void *> (SwapManager::*allocate_func)(const std::vector<size_t> &, uint32_t) =
247     &SwapManager::AllocDeviceContinuousMemSimply;
248   std::function<bool(std::vector<void *>)> success = [](const std::vector<void *> &ptrs) { return !ptrs.empty(); };
249   std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
250   if (!TryAllocate(swapping_tensors_device_, size_list, stream_id, allocate_func, success, &ret)) {
251     const size_t total_size = std::accumulate(size_list.begin(), size_list.end(), size_t(1), std::multiplies<>());
252     if (!SwapOutTemp(std::make_pair(DeviceAddressStatus::kInDevice, StorageType::kHost), total_size, size_list,
253                      stream_id, allocate_func, success, &ret)) {
254       MS_LOG(WARNING) << "Allocate continuous device mem failed, size list: " << size_list;
255     }
256   }
257   return ret;
258 }
259 
FreeDeviceMemory(void * ptr)260 void SwapManager::FreeDeviceMemory(void *ptr) {
261   MS_EXCEPTION_IF_NULL(device_memory_pool_);
262   device_memory_pool_->FreeTensorMem(ptr);
263 }
264 
AllocHostMemorySimply(const size_t & size,uint32_t)265 void *SwapManager::AllocHostMemorySimply(const size_t &size, uint32_t /*stream_id*/) {
266   MS_EXCEPTION_IF_NULL(pin_mem_pool_);
267   return pin_mem_pool_->AllocPinMem(size);
268 }
269 
AllocHostMemory(size_t size)270 void *SwapManager::AllocHostMemory(size_t size) {
271   void *ret = nullptr;
272   void *(SwapManager::*allocate_func)(const size_t &, uint32_t) = &SwapManager::AllocHostMemorySimply;
273   std::function<bool(void *)> success = [](void *ptr) { return ptr != nullptr; };
274   std::lock_guard<std::mutex> lock(swapping_tensors_host_mutex_);
275   if (!TryAllocate(swapping_tensors_host_, size, kDefaultStreamIndex, allocate_func, success, &ret) &&
276       !SwapOutTemp(std::make_pair(DeviceAddressStatus::kInHost, StorageType::kFile), size, size, kDefaultStreamIndex,
277                    allocate_func, success, &ret)) {
278     MS_LOG(WARNING) << "Allocate host memory failed, size: " << size;
279   }
280   return ret;
281 }
282 
FreeHostMemory(void * ptr)283 void SwapManager::FreeHostMemory(void *ptr) {
284   MS_EXCEPTION_IF_NULL(pin_mem_pool_);
285   pin_mem_pool_->FreeTensorMem(ptr);
286 }
287 
/**
 * Create a swap file of file_size bytes after verifying the disk budget,
 * waiting for in-flight file swaps to free space when necessary. The file is
 * registered with TempFileManager so it is cleaned up on process exit.
 * NOTE(review): current_used_file_size_ and file_size_ are updated after the
 * mutex is released -- confirm callers serialize CreateFile/DeleteFile.
 */
bool SwapManager::CreateFile(const std::string &file_name, size_t file_size) {
  MS_EXCEPTION_IF_NULL(io_handle_);
  // Reuse the TryAllocate machinery: the "allocation" is just the space check.
  bool (SwapManager::*allocate_func)(const size_t &size, uint32_t) = &SwapManager::EnoughFileSpace;
  std::function<bool(bool)> success = [](bool ret) { return ret; };
  {
    std::lock_guard<std::mutex> lock(swapping_tensors_file_mutex_);
    bool enough = false;
    if (!TryAllocate(swapping_tensors_file_, file_size, kDefaultStreamIndex, allocate_func, success, &enough)) {
      MS_LOG(WARNING) << "There is no enough disk space for creating file, size: " << file_size;
      return false;
    }
  }
  // Account for the new file before actually creating it on disk.
  current_used_file_size_ += file_size;
  file_size_[file_name] = file_size;
  TempFileManager::GetInstance().Register(file_name);
  return io_handle_->CreateSwapFile(file_name);
}
305 
DeleteFile(const std::string & file_name)306 bool SwapManager::DeleteFile(const std::string &file_name) {
307   MS_EXCEPTION_IF_NULL(io_handle_);
308   const auto &iter = file_size_.find(file_name);
309   if (iter == file_size_.end()) {
310     MS_LOG(WARNING) << "Can not file size for file[" << file_name << "]";
311   } else {
312     current_used_file_size_ -= iter->second;
313     iter->second = 0;
314   }
315   TempFileManager::GetInstance().UnRegister(file_name);
316   return io_handle_->DeleteSwapFile(file_name);
317 }
318 
FileToHostMemory(void * host_memory,const std::string & file_name,size_t byte_num,bool async,AsyncIOToken * sync_key)319 bool SwapManager::FileToHostMemory(void *host_memory, const std::string &file_name, size_t byte_num, bool async,
320                                    AsyncIOToken *sync_key) {
321   MS_EXCEPTION_IF_NULL(io_handle_);
322   if (async) {
323     return io_handle_->ReadAsync(file_name, host_memory, byte_num, sync_key);
324   } else {
325     return io_handle_->Read(file_name, host_memory, byte_num);
326   }
327 }
328 
EnoughFileSpace(const size_t & size,uint32_t)329 bool SwapManager::EnoughFileSpace(const size_t &size, uint32_t /*stream_id*/) {
330   return current_used_file_size_ + size <= max_file_size_;
331 }
332 
HostMemoryToFile(const std::string & file_name,const void * data,size_t byte_num,bool async,AsyncIOToken * sync_key)333 bool SwapManager::HostMemoryToFile(const std::string &file_name, const void *data, size_t byte_num, bool async,
334                                    AsyncIOToken *sync_key) {
335   MS_EXCEPTION_IF_NULL(io_handle_);
336   if (async) {
337     return io_handle_->WriteAsync(file_name, data, byte_num, sync_key);
338   } else {
339     return io_handle_->Write(file_name, data, byte_num);
340   }
341 }
342 
WaitAsyncIO(mindspore::device::AsyncIOToken sync_token)343 bool SwapManager::WaitAsyncIO(mindspore::device::AsyncIOToken sync_token) {
344   MS_EXCEPTION_IF_NULL(io_handle_);
345   return io_handle_->Wait(sync_token);
346 }
347 
AddSwappableTensor(const DeviceAddressPtr & device_address)348 void SwapManager::AddSwappableTensor(const DeviceAddressPtr &device_address) {
349   candidates_.Add(device_address);
350   device_address->set_swappable(true);
351 }
352 
AddSwappingTensor(const mindspore::device::DeviceAddress * device_address)353 void SwapManager::AddSwappingTensor(const mindspore::device::DeviceAddress *device_address) {
354   if (device_address == nullptr) {
355     return;
356   }
357   if (device_address->status() == DeviceAddressStatus::kInFileToHost) {
358     std::lock_guard<std::mutex> lock(swapping_tensors_file_mutex_);
359     (void)swapping_tensors_file_.push(device_address);
360   } else if (device_address->status() == DeviceAddressStatus::kInDeviceToHost) {
361     std::lock_guard<std::mutex> lock(swapping_tensors_device_mutex_);
362     (void)swapping_tensors_device_.push(device_address);
363   } else {
364     std::lock_guard<std::mutex> lock(swapping_tensors_host_mutex_);
365     (void)swapping_tensors_host_.push(device_address);
366   }
367 }
368 
SetSwappableBeforeMemAllocate(const std::vector<DeviceAddress * > & inputs,const std::vector<DeviceAddress * > & outputs) const369 void SwapManager::SetSwappableBeforeMemAllocate(const std::vector<DeviceAddress *> &inputs,
370                                                 const std::vector<DeviceAddress *> &outputs) const {
371   for (const auto &device_address : inputs) {
372     if (device_address == nullptr) {
373       continue;
374     }
375     device_address->set_swappable(false);
376   }
377   for (const auto &device_address : outputs) {
378     if (device_address == nullptr) {
379       continue;
380     }
381     device_address->set_swappable(false);
382   }
383 }
384 
SetSwappableBeforeMemFree(const std::vector<DeviceAddress * > & inputs,const std::vector<DeviceAddress * > & outputs,const mindspore::device::KernelInfo * kernel_info) const385 void SwapManager::SetSwappableBeforeMemFree(const std::vector<DeviceAddress *> &inputs,
386                                             const std::vector<DeviceAddress *> &outputs,
387                                             const mindspore::device::KernelInfo *kernel_info) const {
388   for (const auto &device_address : inputs) {
389     if (device_address == nullptr) {
390       continue;
391     }
392     device_address->set_swappable(true);
393   }
394   for (const auto &device_address : outputs) {
395     if (device_address == nullptr) {
396       continue;
397     }
398     device_address->set_swappable(true);
399   }
400   for (const auto &out_in : kernel_info->out_in_ref_map()) {
401     if (inputs[out_in.second] != outputs[out_in.first]) {
402       outputs[out_in.first]->set_swappable(false);
403     }
404   }
405 }
406 }  // namespace device
407 }  // namespace mindspore
408