1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_DEBUG_TENSOR_LOAD_H_ 17 #define MINDSPORE_CCSRC_DEBUG_TENSOR_LOAD_H_ 18 19 #include <memory> 20 #include <vector> 21 #include <map> 22 #include <mutex> 23 #include <tuple> 24 #include <string> 25 #include <utility> 26 #include <deque> 27 #include <algorithm> 28 #include "debug/tensor_data.h" 29 #ifdef ONLINE_DBG_MODE 30 #include "debug/data_dump/dump_json_parser.h" 31 namespace mindspore { 32 #endif 33 class TensorLoader { 34 public: TensorLoader()35 TensorLoader() : iter_num_(-1), mem_total_(0), mem_usage_(0) {} 36 ~TensorLoader()37 ~TensorLoader() { EmptyTensor(); } 38 MoveTensorCurrentToPrev(std::string tensor_name)39 void MoveTensorCurrentToPrev(std::string tensor_name) { 40 auto handle = tensor_list_map_.extract(tensor_name); 41 if (!handle.empty()) { 42 MS_LOG(INFO) << "Moving " << tensor_name << " from current map to previous map"; 43 prev_tensor_list_map_.insert(std::move(handle)); 44 } 45 } 46 SwapCurrentPrev()47 void SwapCurrentPrev() { tensor_list_map_.swap(prev_tensor_list_map_); } 48 TensorExistsInCurrent(std::string tensor_name)49 bool TensorExistsInCurrent(std::string tensor_name) const { 50 return tensor_list_map_.find(tensor_name) != tensor_list_map_.end(); 51 } 52 53 // only parameters will return true PrevTensorExistsInCurrent(std::string tensor_name)54 bool PrevTensorExistsInCurrent(std::string tensor_name) const { return TensorExistsInCurrent(tensor_name + ":prev"); } 55 MoveParametersCurrentToPrev()56 void MoveParametersCurrentToPrev() { 57 MS_LOG(INFO) << "Moving parameters from current map to previous map"; 58 auto iter = tensor_list_map_.begin(); 59 while (iter != tensor_list_map_.end()) { 60 auto key = iter->first; 61 if (PrevTensorExistsInCurrent(key)) { 62 // :prev tensor only exists for parameter. Move it to prev 63 ++iter; 64 MoveTensorCurrentToPrev(key); 65 } else { 66 ++iter; 67 } 68 } 69 } 70 IsPrevTensor(std::string tensor_name)71 bool IsPrevTensor(std::string tensor_name) const { 72 const std::string suffix = ":prev"; 73 if (tensor_name.length() <= suffix.length()) return false; 74 return std::equal(suffix.rbegin(), suffix.rend(), tensor_name.rbegin()); 75 } 76 LoadNewTensor(std::shared_ptr<TensorData> tensor,bool keep_prev)77 bool LoadNewTensor(std::shared_ptr<TensorData> tensor, bool keep_prev) { 78 lock_.lock(); 79 auto tensor_name = tensor->GetName(); 80 if (keep_prev) { 81 // add prev step tensor into current step map with ":prev" suffix 82 auto handle = prev_tensor_list_map_.extract(tensor_name); 83 if (!handle.empty()) { 84 handle.key() = tensor_name + ":prev"; 85 tensor_list_map_.insert(std::move(handle)); 86 } 87 } 88 std::string key_name = tensor_name; 89 #ifdef OFFLINE_DBG_MODE 90 key_name += (":" + std::to_string(tensor->GetDeviceId()) + ":" + std::to_string(tensor->GetRootGraphId()) + ":" + 91 std::to_string(tensor->GetIsOutput()) + ":" + std::to_string(tensor->GetSlot())); 92 if (tensor_list_map_.find(key_name) != tensor_list_map_.end() && 93 tensor->GetIteration() == tensor_list_map_[key_name]->GetIteration() - 1) { 94 key_name += ":prev"; 95 } 96 auto iter = tensor_list_map_.find(key_name); 97 if (iter != tensor_list_map_.end()) { 98 iter->second->DeleteDataPtr(); 99 } 100 #endif 101 tensor_list_map_[key_name] = tensor; // use [] instead of insert to ensure latest value 102 lock_.unlock(); 103 return true; 104 } 105 GetTensor()106 std::vector<std::shared_ptr<TensorData>> GetTensor() { 107 std::vector<std::shared_ptr<TensorData>> tensor_list; 108 for (auto &it : tensor_list_map_) { 109 if (!IsPrevTensor(it.first)) tensor_list.push_back(it.second); 110 } 111 return tensor_list; 112 } 113 GetTensor(const std::string & tensor_name)114 std::shared_ptr<TensorData> GetTensor(const std::string &tensor_name) const { 115 auto iter = tensor_list_map_.find(tensor_name); 116 if (iter != tensor_list_map_.end()) return iter->second; 117 return nullptr; 118 } 119 GetPrevTensor(const std::string & tensor_name)120 std::shared_ptr<TensorData> GetPrevTensor(const std::string &tensor_name) { 121 if (tensor_list_map_.find(tensor_name + ":prev") != tensor_list_map_.end()) { 122 return tensor_list_map_[tensor_name + ":prev"]; 123 } 124 return nullptr; 125 } 126 SearchTensors(const std::vector<std::string> & search_list,std::vector<std::tuple<std::string,std::shared_ptr<TensorData>>> * result_list)127 void SearchTensors(const std::vector<std::string> &search_list, 128 std::vector<std::tuple<std::string, std::shared_ptr<TensorData>>> *result_list) { 129 for (auto i : search_list) { 130 std::map<std::string, std::shared_ptr<TensorData>>::iterator iter; 131 iter = tensor_list_map_.find(i); 132 if (iter != tensor_list_map_.end()) { 133 result_list->push_back(std::make_tuple(i, iter->second)); 134 } else { 135 result_list->push_back(std::make_tuple(i, nullptr)); 136 } 137 } 138 } 139 EmptyTensor()140 void EmptyTensor() { 141 std::lock_guard<std::mutex> lg(lock_); 142 prev_tensor_list_map_.clear(); 143 tensor_list_map_.swap(prev_tensor_list_map_); 144 } 145 EmptyCurrentTensor()146 void EmptyCurrentTensor() { tensor_list_map_.clear(); } 147 EnableMemoryControl()148 bool EnableMemoryControl() { return mem_total_ > 0; } 149 AppendToCacheEvictQueue(const std::string & tensor_name)150 void AppendToCacheEvictQueue(const std::string &tensor_name) { 151 std::lock_guard<std::mutex> lk(mem_lock_); 152 if (std::find(cache_evict_queue_.begin(), cache_evict_queue_.end(), tensor_name) == cache_evict_queue_.end()) { 153 cache_evict_queue_.push_back(tensor_name); 154 evict_cond.notify_one(); 155 } 156 } 157 CheckMemoryAvailable(const std::string & backend_name,const uint64_t data_size)158 bool CheckMemoryAvailable(const std::string &backend_name, const uint64_t data_size) { 159 // 1. Check if the tensor can fit in the entire limit. If not, don't attempt any read or evictions and generate 160 // warning. 161 if (data_size > mem_total_) { 162 MS_LOG(ERROR) << "Failed to load data of tensor " << backend_name << " because the its data size (" << data_size 163 << ") exceeds the maximum memory limit (" << mem_total_ << ")."; 164 return false; 165 } 166 // 2. Check if there's is enough cache space available for current tensor. If not, try evict cache. 167 bool ret = CheckAndEvictTensorCache(data_size); 168 return ret; 169 } 170 CheckAndEvictTensorCache(const uint64_t data_size)171 bool CheckAndEvictTensorCache(const uint64_t data_size) { 172 std::string candidate_name; 173 uint64_t candidates_size; 174 std::unique_lock<std::mutex> lk(mem_lock_); 175 while (data_size > mem_total_ - mem_usage_) { 176 // wait until there is any not-in-use candidate to be evicted from cache 177 evict_cond.wait(lk, [&] { return !cache_evict_queue_.empty(); }); 178 candidate_name = cache_evict_queue_.front(); 179 candidates_size = tensor_list_map_[candidate_name]->GetByteSize(); 180 // evict candidate tensor 181 lock_.lock(); 182 tensor_list_map_[candidate_name]->DeleteDataPtr(); 183 tensor_list_map_.erase(candidate_name); 184 lock_.unlock(); 185 cache_evict_queue_.pop_front(); 186 mem_usage_ = std::max(uint64_t(0), mem_usage_ - candidates_size); 187 MS_LOG(INFO) << "Evict tensor: " << candidate_name; 188 } 189 // Reserve space for the current target tensor. 190 mem_usage_ = std::min(mem_total_, mem_usage_ + data_size); 191 return true; 192 } 193 SetMemTotal(uint64_t total_mem_size)194 void SetMemTotal(uint64_t total_mem_size) { this->mem_total_ = total_mem_size; } 195 196 #ifdef ONLINE_DBG_MODE DumpTensorToFile(const std::string & tensor_name,bool trans_flag,const std::string & filepath,const std::string & host_fmt,const std::vector<int64_t> & host_shape,TypeId host_type,TypeId device_type,const std::string & addr_format,size_t slot)197 bool DumpTensorToFile(const std::string &tensor_name, bool trans_flag, const std::string &filepath, 198 const std::string &host_fmt, const std::vector<int64_t> &host_shape, TypeId host_type, 199 TypeId device_type, const std::string &addr_format, size_t slot) { 200 if (filepath.empty()) { 201 MS_LOG(ERROR) << "Dump file path is null!"; 202 return false; 203 } 204 std::string path = ""; 205 if (trans_flag) { 206 path = filepath + '.' + host_fmt; 207 } else { 208 path = filepath + '.' + addr_format; 209 } 210 211 MS_LOG(INFO) << "Dump path is " << path; 212 213 std::string tensor_loader_name = tensor_name + ":" + std::to_string(slot); 214 auto iter = tensor_list_map_.find(tensor_loader_name); 215 if (iter != tensor_list_map_.end()) { 216 std::shared_ptr<TensorData> node = iter->second; 217 size_t host_size = node->GetByteSize(); 218 219 return DumpJsonParser::DumpToFile(path, node->GetDataPtr(), host_size, host_shape, host_type); 220 } 221 MS_LOG(INFO) << "Tensor name:" << tensor_name << " not found in tensor_list_map_"; 222 return true; 223 } 224 #endif 225 226 private: 227 // the pair is (device_id, iteration) 228 std::map<std::string, std::shared_ptr<TensorData>> tensor_list_map_; 229 std::map<std::string, std::shared_ptr<TensorData>> prev_tensor_list_map_; 230 uint32_t iter_num_; 231 std::mutex lock_; 232 std::mutex mem_lock_; 233 uint64_t mem_total_; 234 uint64_t mem_usage_; 235 std::deque<std::string> cache_evict_queue_; 236 std::condition_variable evict_cond; 237 }; 238 #ifdef ONLINE_DBG_MODE 239 } // namespace mindspore 240 #endif 241 #endif // MINDSPORE_CCSRC_DEBUG_TENSOR_LOAD_H_ 242