• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "distributed/persistent/storage/local_file.h"
18 #include <dirent.h>
19 #include <cmath>
20 #include <algorithm>
21 #include <numeric>
22 #include <tuple>
23 #include <utility>
24 #include "utils/convert_utils_base.h"
25 #include "utils/file_utils.h"
26 #include "utils/log_adapter.h"
27 #include "distributed/persistent/storage/constants.h"
28 #include "utils/system/env.h"
29 #include "base/float16.h"
30 #include "base/bfloat16.h"
31 
32 namespace mindspore {
33 namespace distributed {
34 namespace storage {
35 template <typename KeyType, typename ValueType>
LocalFile(const std::map<std::string,std::string> & storage_config)36 LocalFile<KeyType, ValueType>::LocalFile(const std::map<std::string, std::string> &storage_config) {
37   auto file_path_iter = storage_config.find(kFileStoragePath);
38   if (file_path_iter != storage_config.end()) {
39     file_path_ = file_path_iter->second;
40   }
41 
42   auto block_length_iter = storage_config.find(kMaxBlockLength);
43   if (block_length_iter != storage_config.end() && !(block_length_iter->second).empty()) {
44     max_block_length_ = std::stoul(block_length_iter->second);
45   } else {
46     max_block_length_ = DEFAULT_MAX_BLOCK_LENGTH;
47   }
48 
49   auto element_size_iter = storage_config.find(kElementSize);
50   if (element_size_iter != storage_config.end()) {
51     element_size_ = std::stoul(element_size_iter->second);
52   } else {
53     element_size_ = 0;
54   }
55 }
56 
57 template <typename KeyType, typename ValueType>
~LocalFile()58 LocalFile<KeyType, ValueType>::~LocalFile() {
59   for (const auto &file : block_files_) {
60     if (file == nullptr) {
61       continue;
62     }
63     (void)file->Close();
64     ChangeFileMode(file->get_file_name(), S_IRUSR | S_IWUSR);
65   }
66 }
67 
68 template <typename KeyType, typename ValueType>
Initialize()69 void LocalFile<KeyType, ValueType>::Initialize() {
70   fs_ = system::Env::GetFileSystem();
71   MS_EXCEPTION_IF_NULL(fs_);
72 
73   MS_EXCEPTION_IF_ZERO("element_size_", element_size_);
74   block_size_ = max_block_length_ / (element_size_ * sizeof(ValueType));
75 }
76 
77 template <typename KeyType, typename ValueType>
Finalize()78 void LocalFile<KeyType, ValueType>::Finalize() {
79   block_files_.clear();
80   fs_ = nullptr;
81   keys_to_locations_.clear();
82 }
83 
84 template <typename KeyType, typename ValueType>
Write(const InputData & input,const DirtyInfo & dirty_info)85 void LocalFile<KeyType, ValueType>::Write(const InputData &input, const DirtyInfo &dirty_info) {
86   std::vector<InputData> inputs = {input};
87   Write(inputs, dirty_info);
88 }
89 
90 template <typename KeyType, typename ValueType>
Write(const std::vector<InputData> & inputs,const DirtyInfo & dirty_info)91 void LocalFile<KeyType, ValueType>::Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info) {
92   if (inputs.empty()) {
93     MS_LOG(EXCEPTION) << "The inputs is empty";
94   }
95 
96   // The block file has been created, only the blocks related to the dirty information need to be rewritten.
97   if (finish_create_block_files_) {
98     std::vector<int> block_indices;
99     TransformDirtyInfoToBlockIndices(dirty_info, &block_indices);
100 
101     for (const auto &block_index : block_indices) {
102       WriteOneBlockFile(IntToSize(block_index), inputs);
103     }
104     return;
105   }
106 
107   // Create block files and write inputs_data to block files.
108   WriteBlockFiles(inputs);
109 }
110 
111 template <typename KeyType, typename ValueType>
TransformDirtyInfoToBlockIndices(const DirtyInfo & dirty_info,std::vector<int> * block_indices) const112 void LocalFile<KeyType, ValueType>::TransformDirtyInfoToBlockIndices(const DirtyInfo &dirty_info,
113                                                                      std::vector<int> *block_indices) const {
114   MS_EXCEPTION_IF_NULL(block_indices);
115   if (block_meta_list_.empty()) {
116     MS_LOG(EXCEPTION) << "The block meta list is empty";
117   }
118 
119   size_t block_index = 0;
120   bool block_index_alread_insert_vec = false;
121   auto block_meta_ptr = block_meta_list_.at(block_index);
122   MS_EXCEPTION_IF_NULL(block_meta_ptr);
123   int cur_lower_bound = block_meta_ptr->template Get<int>(kShardRangeLowerBound);
124   int cur_upper_bound = block_meta_ptr->template Get<int>(kShardRangeUpperBound);
125 
126   for (const auto &dirty_value : dirty_info) {
127     if (dirty_value >= cur_lower_bound && dirty_value < cur_upper_bound) {
128       if (!block_index_alread_insert_vec) {
129         block_index_alread_insert_vec = true;
130         block_indices->push_back(block_index);
131       }
132       continue;
133     }
134 
135     while (!(dirty_value >= cur_lower_bound && dirty_value < cur_upper_bound)) {
136       if (++block_index >= block_meta_list_.size()) {
137         break;
138       }
139       block_meta_ptr = block_meta_list_[block_index];
140       MS_EXCEPTION_IF_NULL(block_meta_ptr);
141       cur_lower_bound = block_meta_ptr->template Get<int>(kShardRangeLowerBound);
142       cur_upper_bound = block_meta_ptr->template Get<int>(kShardRangeUpperBound);
143     }
144 
145     if (block_index < block_meta_list_.size()) {
146       block_indices->push_back(block_index);
147     }
148   }
149 }
150 
151 template <typename KeyType, typename ValueType>
WriteBlockFiles(const std::vector<InputData> & inputs)152 void LocalFile<KeyType, ValueType>::WriteBlockFiles(const std::vector<InputData> &inputs) {
153   if (inputs.empty()) {
154     MS_LOG(EXCEPTION) << "The inputs is empty";
155   }
156 
157   const std::vector<int> &shape = std::get<0>(inputs.front());
158   size_t first_dim = 0;
159   if (shape.size() > 0) {
160     first_dim = IntToSize(shape[0]);
161   }
162   if (first_dim == 0) {
163     MS_LOG(EXCEPTION) << "The dimension of input shape contain zero.";
164   }
165 
166   size_t non_first_dims_size = std::get<2>(inputs.front()) / first_dim;
167   if (non_first_dims_size == 0) {
168     MS_LOG(EXCEPTION) << "The size of input tensor is zero.";
169   }
170 
171   size_t tensor_num = inputs.size();
172   size_t slice_size = static_cast<size_t>(
173     std::floor(static_cast<float>(static_cast<float>(max_block_length_) / tensor_num) / non_first_dims_size));
174   if (slice_size == 0) {
175     MS_LOG(EXCEPTION) << "The slice size in block is zero.";
176   }
177 
178   size_t block_num = static_cast<size_t>(std::ceil(static_cast<float>(first_dim) / slice_size));
179 
180   size_t offset = 0;
181   for (size_t block_index = 0; block_index < block_num; ++block_index) {
182     // Create block meta.
183     std::string block_meta_file_name =
184       file_path_ + "/" + kBlockMetaFilePrefix + std::to_string(block_index) + kJsonSuffix;
185     auto block_meta_ptr = std::make_shared<BlockMeta>(block_meta_file_name);
186     if (!block_meta_ptr->Initialize()) {
187       MS_LOG(EXCEPTION) << "Initialize block meta failed, file name [" << block_meta_file_name << "]";
188     }
189 
190     size_t cur_lower_bound = slice_size * block_index;
191     block_meta_ptr->Insert(kShardRangeLowerBound, cur_lower_bound);
192     size_t cur_upper_bound = std::min(cur_lower_bound + slice_size, first_dim);
193     block_meta_ptr->Insert(kShardRangeUpperBound, cur_upper_bound);
194 
195     size_t field_length = (cur_upper_bound - cur_lower_bound) * non_first_dims_size;
196     block_meta_ptr->Insert(kFieldsLength, field_length);
197     block_meta_ptr->Insert(kOffset, offset);
198     offset += field_length;
199     block_meta_list_.push_back(block_meta_ptr);
200 
201     // Create block.
202     auto block_ptr = std::make_shared<Block>(file_path_ + "/" + kBlockFilePrefix + std::to_string(block_index));
203     block_ptr->set_block_meta(block_meta_ptr);
204     block_list_.push_back(block_ptr);
205   }
206 
207   finish_create_block_files_ = true;
208 
209   // Write inputs_data to block files and Gen Sha256 seq.
210   for (size_t block_index = 0; block_index < block_num; ++block_index) {
211     WriteOneBlockFile(block_index, inputs);
212   }
213 }
214 
215 template <typename KeyType, typename ValueType>
WriteOneBlockFile(size_t block_index,const std::vector<InputData> & inputs) const216 void LocalFile<KeyType, ValueType>::WriteOneBlockFile(size_t block_index, const std::vector<InputData> &inputs) const {
217   const auto &block_meta_ptr = block_meta_list_.at(block_index);
218   MS_EXCEPTION_IF_NULL(block_meta_ptr);
219   size_t field_size = block_meta_ptr->template Get<size_t>(kFieldsLength);
220   size_t offset = block_meta_ptr->template Get<size_t>(kOffset);
221   std::vector<std::pair<const void *, size_t>> block_inputs_data;
222 
223   for (size_t input_index = 0; input_index < inputs.size(); ++input_index) {
224     const void *data_ptr = reinterpret_cast<const char *>(std::get<1>(inputs.at(input_index))) + offset;
225     size_t data_size = field_size;
226     (void)block_inputs_data.emplace_back(data_ptr, data_size);
227   }
228 
229   const auto &block_ptr = block_list_.at(block_index);
230   MS_EXCEPTION_IF_NULL(block_ptr);
231   // Rewrite the current block file.
232   if (!FileIOUtils::Write(block_ptr->block_file_name(), block_inputs_data)) {
233     MS_LOG(EXCEPTION) << "Write to block file[" << block_ptr->block_file_name() << "] failed.";
234   }
235 
236   ChangeFileMode(block_ptr->block_file_name(), S_IRUSR | S_IWUSR);
237 
238   // Generate sha256 hash sequence.
239   block_ptr->GenSha256Seq();
240 }
241 
242 template <typename KeyType, typename ValueType>
Write(const ConstDataWithLen & keys,const ConstDataWithLen & values)243 void LocalFile<KeyType, ValueType>::Write(const ConstDataWithLen &keys, const ConstDataWithLen &values) {
244   // Check input data valid.
245   const KeyType *keys_data = reinterpret_cast<const KeyType *>(keys.data_);
246   size_t keys_len = keys.data_len_;
247   const ValueType *values_data = reinterpret_cast<const ValueType *>(values.data_);
248   size_t values_len = values.data_len_;
249   MS_EXCEPTION_IF_NULL(keys_data);
250   MS_EXCEPTION_IF_NULL(values_data);
251   size_t key_num = keys_len / sizeof(KeyType);
252   if (key_num == 0) {
253     return;
254   }
255 
256   size_t element_len = element_size_ * sizeof(ValueType);
257   if (values_len != key_num * element_len) {
258     MS_LOG(EXCEPTION) << "The value length is invalid, expected length[" << key_num * element_len << "], but got["
259                       << values_len << "]";
260   }
261 
262   for (size_t i = 0; i < key_num; i++) {
263     auto iter = keys_to_locations_.find(keys_data[i]);
264     // 1. Write the new values for the keys already exist in local file.
265     if (iter != keys_to_locations_.end()) {
266       const std::pair<size_t, size_t> &offset = iter->second;
267       const system::WriteFilePtr &block_file = block_files_.at(offset.first);
268       MS_EXCEPTION_IF_NULL(block_file);
269       size_t offset_in_block = offset.second;
270       MS_EXCEPTION_IF_CHECK_FAIL(block_file->PWrite(values_data + i * element_size_, element_len, offset_in_block),
271                                  "PWrite file failed.");
272       continue;
273     }
274 
275     // 2. Write the values for the keys doesn't exist in local file:
276     if (current_offset_in_block_ == 0 || current_offset_in_block_ == block_size_ * element_len) {
277       // Create new block file for beginning or all block files are written fully.
278       std::string block_file_name = file_path_ + "/" + kBlockFilePrefix + std::to_string(block_files_.size());
279       auto block_file_ptr = fs_->CreateWriteFile(block_file_name, "wb+");
280       MS_EXCEPTION_IF_NULL(block_file_ptr);
281       MS_EXCEPTION_IF_CHECK_FAIL(block_file_ptr->Trunc(block_size_ * element_len), "Truncate file failed.");
282       (void)block_files_.emplace_back(block_file_ptr);
283       ChangeFileMode(block_file_name, S_IRUSR | S_IWUSR);
284 
285       // Reset offset cursor in block.
286       current_offset_in_block_ = 0;
287     }
288 
289     // Record which block file the value corresponding to a key is stored in, and the offset location of the block file.
290     (void)keys_to_locations_.emplace(keys_data[i],
291                                      std::pair<size_t, size_t>(block_files_.size() - 1, current_offset_in_block_));
292     // Write the values into latest created block file.
293     MS_EXCEPTION_IF_NULL(block_files_.back());
294     MS_EXCEPTION_IF_CHECK_FAIL(
295       block_files_.back()->PWrite(values_data + i * element_size_, element_len, current_offset_in_block_),
296       "PWrite file failed.");
297 
298     current_offset_in_block_ += element_len;
299   }
300 }
301 
302 template <typename KeyType, typename ValueType>
Read(const OutputData & output)303 void LocalFile<KeyType, ValueType>::Read(const OutputData &output) {
304   std::vector<OutputData> outputs = {output};
305   Read(outputs);
306 }
307 
308 template <typename KeyType, typename ValueType>
Read(const std::vector<OutputData> & outputs)309 void LocalFile<KeyType, ValueType>::Read(const std::vector<OutputData> &outputs) {
310   if (block_list_.empty() || block_meta_list_.empty()) {
311     // Load file list info of block files and block meta files in the current folder to block list and block meta list.
312     if (!LoadBlocksInfo()) {
313       MS_LOG(EXCEPTION) << "LoadBlocksInfo failed";
314     }
315   }
316 
317   // Read all block files.
318   for (size_t block_index = 0; block_index < block_list_.size(); ++block_index) {
319     std::vector<std::pair<void *, size_t>> block_output_data;
320     const auto &block_meta_ptr = block_meta_list_[block_index];
321     MS_EXCEPTION_IF_NULL(block_meta_ptr);
322     size_t field_size = block_meta_ptr->template Get<size_t>(kFieldsLength);
323     size_t offset = block_meta_ptr->template Get<size_t>(kOffset);
324 
325     for (size_t output_index = 0; output_index < outputs.size(); ++output_index) {
326       void *data_ptr = reinterpret_cast<char *>(std::get<0>(outputs[output_index])) + offset;
327       size_t data_size = field_size;
328       (void)block_output_data.emplace_back(data_ptr, data_size);
329     }
330 
331     const auto &block_ptr = block_list_[block_index];
332     MS_EXCEPTION_IF_NULL(block_ptr);
333     if (!block_ptr->CheckSha256Seq()) {
334       MS_LOG(EXCEPTION) << "CheckSha256 failed, file name [" << block_ptr->block_file_name() << "]";
335     }
336 
337     if (!FileIOUtils::Read(block_ptr->block_file_name(), block_output_data)) {
338       MS_LOG(EXCEPTION) << "Read block file failed, file name [" << block_ptr->block_file_name() << "]";
339     }
340   }
341 }
342 
343 template <typename KeyType, typename ValueType>
LoadBlocksInfo()344 bool LocalFile<KeyType, ValueType>::LoadBlocksInfo() {
345   DIR *dir = opendir(file_path_.c_str());
346   if (dir == nullptr) {
347     MS_LOG(ERROR) << "The file path [" << file_path_ << "] is not exist";
348     return false;
349   }
350   std::vector<std::string> block_file_name_list;
351   std::vector<std::string> block_meta_file_name_list;
352   struct dirent *entry;
353 
354   // Get file names of all block file and block meta file in the current folder.
355   while ((entry = readdir(dir)) != nullptr) {
356     std::string file_name = entry->d_name;
357     if (file_name.length() <= JSON_SUFFIX_LENS) {
358       continue;
359     }
360 
361     std::string real_storage_file_path = file_path_ + "/" + file_name;
362     auto suffix = file_name.substr(file_name.length() - JSON_SUFFIX_LENS);
363     if (suffix == kJsonSuffix) {
364       block_meta_file_name_list.push_back(real_storage_file_path);
365     } else {
366       block_file_name_list.push_back(real_storage_file_path);
367     }
368   }
369   (void)closedir(dir);
370 
371   if (block_file_name_list.size() != block_meta_file_name_list.size()) {
372     MS_LOG(ERROR) << "The block file number[" << block_file_name_list.size()
373                   << "] is not equal to block meta file number[" << block_meta_file_name_list.size() << "]";
374     return false;
375   }
376 
377   sort(block_file_name_list.begin(), block_file_name_list.end());
378   sort(block_meta_file_name_list.begin(), block_meta_file_name_list.end());
379   for (size_t i = 0; i < block_file_name_list.size(); i++) {
380     auto block_meta_ptr = std::make_shared<BlockMeta>(block_meta_file_name_list[i]);
381     if (!block_meta_ptr->Initialize()) {
382       MS_LOG(ERROR) << "Initialize block meta failed, file name [" << block_meta_file_name_list[i] << "]";
383       return false;
384     }
385     block_meta_list_.push_back(block_meta_ptr);
386 
387     auto block_ptr = std::make_shared<Block>(block_file_name_list[i]);
388     block_ptr->set_block_meta(block_meta_ptr);
389     block_list_.push_back(block_ptr);
390   }
391   return true;
392 }
393 
394 template <typename KeyType, typename ValueType>
Read(const ConstDataWithLen & keys,const DataWithLen & values)395 void LocalFile<KeyType, ValueType>::Read(const ConstDataWithLen &keys, const DataWithLen &values) {
396   // Check input data valid.
397   const KeyType *keys_data = reinterpret_cast<const KeyType *>(keys.data_);
398   size_t keys_len = keys.data_len_;
399   ValueType *values_data = reinterpret_cast<ValueType *>(values.data_);
400   size_t values_len = values.data_len_;
401   MS_EXCEPTION_IF_NULL(keys_data);
402   MS_EXCEPTION_IF_NULL(values_data);
403 
404   size_t key_num = keys_len / sizeof(KeyType);
405   if (key_num == 0) {
406     return;
407   }
408 
409   size_t element_len = element_size_ * sizeof(ValueType);
410   if (values_len < key_num * element_len) {
411     MS_LOG(EXCEPTION) << "The value length is insufficient.";
412   }
413 
414   for (size_t i = 0; i < key_num; i++) {
415     // 1. Find the position offset measured in bytes from the beginning of this file by keys.
416     auto iter = keys_to_locations_.find(keys_data[i]);
417     if (iter == keys_to_locations_.end()) {
418       MS_LOG(DEBUG) << "Can not find key: " << keys_data[i] << " to locate the position in file.";
419       continue;
420     }
421     const std::pair<size_t, size_t> &offset = iter->second;
422 
423     const system::WriteFilePtr &block_file = block_files_.at(offset.first);
424     MS_EXCEPTION_IF_NULL(block_file);
425     size_t offset_in_block = offset.second;
426     // 2. Read the values corresponding to keys.
427     MS_EXCEPTION_IF_CHECK_FAIL(block_file->PRead(values_data + i * element_size_, element_len, offset_in_block),
428                                "PRead file failed.");
429   }
430 }
431 
432 template <typename KeyType, typename ValueType>
GetAllKeys() const433 std::unique_ptr<std::vector<KeyType>> LocalFile<KeyType, ValueType>::GetAllKeys() const {
434   size_t keys_num = keys_to_locations_.size();
435   auto keys_vec = std::make_unique<std::vector<KeyType>>(keys_num);
436   MS_EXCEPTION_IF_NULL(keys_vec);
437   size_t index = 0;
438   for (const auto &item : keys_to_locations_) {
439     keys_vec->at(index) = item.first;
440     ++index;
441   }
442   return keys_vec;
443 }
444 
445 template class LocalFile<int32_t, bool>;
446 template class LocalFile<int32_t, int8_t>;
447 template class LocalFile<int32_t, int16_t>;
448 template class LocalFile<int32_t, int32_t>;
449 template class LocalFile<int32_t, int64_t>;
450 template class LocalFile<int32_t, uint8_t>;
451 template class LocalFile<int32_t, uint16_t>;
452 template class LocalFile<int32_t, uint32_t>;
453 template class LocalFile<int32_t, uint64_t>;
454 template class LocalFile<int32_t, float16>;
455 template class LocalFile<int32_t, float>;
456 template class LocalFile<int32_t, double>;
457 template class LocalFile<int32_t, bfloat16>;
458 
459 template class LocalFile<int64_t, bool>;
460 template class LocalFile<int64_t, int8_t>;
461 template class LocalFile<int64_t, int16_t>;
462 template class LocalFile<int64_t, int32_t>;
463 template class LocalFile<int64_t, int64_t>;
464 template class LocalFile<int64_t, uint8_t>;
465 template class LocalFile<int64_t, uint16_t>;
466 template class LocalFile<int64_t, uint32_t>;
467 template class LocalFile<int64_t, uint64_t>;
468 template class LocalFile<int64_t, float16>;
469 template class LocalFile<int64_t, float>;
470 template class LocalFile<int64_t, double>;
471 template class LocalFile<int64_t, bfloat16>;
472 }  // namespace storage
473 }  // namespace distributed
474 }  // namespace mindspore
475