1 /**
2 * Copyright 2021-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "distributed/persistent/storage/local_file.h"
18 #include <dirent.h>
19 #include <cmath>
20 #include <algorithm>
21 #include <numeric>
22 #include <tuple>
23 #include <utility>
24 #include "utils/convert_utils_base.h"
25 #include "utils/file_utils.h"
26 #include "utils/log_adapter.h"
27 #include "distributed/persistent/storage/constants.h"
28 #include "utils/system/env.h"
29 #include "base/float16.h"
30 #include "base/bfloat16.h"
31
32 namespace mindspore {
33 namespace distributed {
34 namespace storage {
35 template <typename KeyType, typename ValueType>
LocalFile(const std::map<std::string,std::string> & storage_config)36 LocalFile<KeyType, ValueType>::LocalFile(const std::map<std::string, std::string> &storage_config) {
37 auto file_path_iter = storage_config.find(kFileStoragePath);
38 if (file_path_iter != storage_config.end()) {
39 file_path_ = file_path_iter->second;
40 }
41
42 auto block_length_iter = storage_config.find(kMaxBlockLength);
43 if (block_length_iter != storage_config.end() && !(block_length_iter->second).empty()) {
44 max_block_length_ = std::stoul(block_length_iter->second);
45 } else {
46 max_block_length_ = DEFAULT_MAX_BLOCK_LENGTH;
47 }
48
49 auto element_size_iter = storage_config.find(kElementSize);
50 if (element_size_iter != storage_config.end()) {
51 element_size_ = std::stoul(element_size_iter->second);
52 } else {
53 element_size_ = 0;
54 }
55 }
56
57 template <typename KeyType, typename ValueType>
~LocalFile()58 LocalFile<KeyType, ValueType>::~LocalFile() {
59 for (const auto &file : block_files_) {
60 if (file == nullptr) {
61 continue;
62 }
63 (void)file->Close();
64 ChangeFileMode(file->get_file_name(), S_IRUSR | S_IWUSR);
65 }
66 }
67
68 template <typename KeyType, typename ValueType>
Initialize()69 void LocalFile<KeyType, ValueType>::Initialize() {
70 fs_ = system::Env::GetFileSystem();
71 MS_EXCEPTION_IF_NULL(fs_);
72
73 MS_EXCEPTION_IF_ZERO("element_size_", element_size_);
74 block_size_ = max_block_length_ / (element_size_ * sizeof(ValueType));
75 }
76
77 template <typename KeyType, typename ValueType>
Finalize()78 void LocalFile<KeyType, ValueType>::Finalize() {
79 block_files_.clear();
80 fs_ = nullptr;
81 keys_to_locations_.clear();
82 }
83
84 template <typename KeyType, typename ValueType>
Write(const InputData & input,const DirtyInfo & dirty_info)85 void LocalFile<KeyType, ValueType>::Write(const InputData &input, const DirtyInfo &dirty_info) {
86 std::vector<InputData> inputs = {input};
87 Write(inputs, dirty_info);
88 }
89
90 template <typename KeyType, typename ValueType>
Write(const std::vector<InputData> & inputs,const DirtyInfo & dirty_info)91 void LocalFile<KeyType, ValueType>::Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info) {
92 if (inputs.empty()) {
93 MS_LOG(EXCEPTION) << "The inputs is empty";
94 }
95
96 // The block file has been created, only the blocks related to the dirty information need to be rewritten.
97 if (finish_create_block_files_) {
98 std::vector<int> block_indices;
99 TransformDirtyInfoToBlockIndices(dirty_info, &block_indices);
100
101 for (const auto &block_index : block_indices) {
102 WriteOneBlockFile(IntToSize(block_index), inputs);
103 }
104 return;
105 }
106
107 // Create block files and write inputs_data to block files.
108 WriteBlockFiles(inputs);
109 }
110
111 template <typename KeyType, typename ValueType>
TransformDirtyInfoToBlockIndices(const DirtyInfo & dirty_info,std::vector<int> * block_indices) const112 void LocalFile<KeyType, ValueType>::TransformDirtyInfoToBlockIndices(const DirtyInfo &dirty_info,
113 std::vector<int> *block_indices) const {
114 MS_EXCEPTION_IF_NULL(block_indices);
115 if (block_meta_list_.empty()) {
116 MS_LOG(EXCEPTION) << "The block meta list is empty";
117 }
118
119 size_t block_index = 0;
120 bool block_index_alread_insert_vec = false;
121 auto block_meta_ptr = block_meta_list_.at(block_index);
122 MS_EXCEPTION_IF_NULL(block_meta_ptr);
123 int cur_lower_bound = block_meta_ptr->template Get<int>(kShardRangeLowerBound);
124 int cur_upper_bound = block_meta_ptr->template Get<int>(kShardRangeUpperBound);
125
126 for (const auto &dirty_value : dirty_info) {
127 if (dirty_value >= cur_lower_bound && dirty_value < cur_upper_bound) {
128 if (!block_index_alread_insert_vec) {
129 block_index_alread_insert_vec = true;
130 block_indices->push_back(block_index);
131 }
132 continue;
133 }
134
135 while (!(dirty_value >= cur_lower_bound && dirty_value < cur_upper_bound)) {
136 if (++block_index >= block_meta_list_.size()) {
137 break;
138 }
139 block_meta_ptr = block_meta_list_[block_index];
140 MS_EXCEPTION_IF_NULL(block_meta_ptr);
141 cur_lower_bound = block_meta_ptr->template Get<int>(kShardRangeLowerBound);
142 cur_upper_bound = block_meta_ptr->template Get<int>(kShardRangeUpperBound);
143 }
144
145 if (block_index < block_meta_list_.size()) {
146 block_indices->push_back(block_index);
147 }
148 }
149 }
150
151 template <typename KeyType, typename ValueType>
WriteBlockFiles(const std::vector<InputData> & inputs)152 void LocalFile<KeyType, ValueType>::WriteBlockFiles(const std::vector<InputData> &inputs) {
153 if (inputs.empty()) {
154 MS_LOG(EXCEPTION) << "The inputs is empty";
155 }
156
157 const std::vector<int> &shape = std::get<0>(inputs.front());
158 size_t first_dim = 0;
159 if (shape.size() > 0) {
160 first_dim = IntToSize(shape[0]);
161 }
162 if (first_dim == 0) {
163 MS_LOG(EXCEPTION) << "The dimension of input shape contain zero.";
164 }
165
166 size_t non_first_dims_size = std::get<2>(inputs.front()) / first_dim;
167 if (non_first_dims_size == 0) {
168 MS_LOG(EXCEPTION) << "The size of input tensor is zero.";
169 }
170
171 size_t tensor_num = inputs.size();
172 size_t slice_size = static_cast<size_t>(
173 std::floor(static_cast<float>(static_cast<float>(max_block_length_) / tensor_num) / non_first_dims_size));
174 if (slice_size == 0) {
175 MS_LOG(EXCEPTION) << "The slice size in block is zero.";
176 }
177
178 size_t block_num = static_cast<size_t>(std::ceil(static_cast<float>(first_dim) / slice_size));
179
180 size_t offset = 0;
181 for (size_t block_index = 0; block_index < block_num; ++block_index) {
182 // Create block meta.
183 std::string block_meta_file_name =
184 file_path_ + "/" + kBlockMetaFilePrefix + std::to_string(block_index) + kJsonSuffix;
185 auto block_meta_ptr = std::make_shared<BlockMeta>(block_meta_file_name);
186 if (!block_meta_ptr->Initialize()) {
187 MS_LOG(EXCEPTION) << "Initialize block meta failed, file name [" << block_meta_file_name << "]";
188 }
189
190 size_t cur_lower_bound = slice_size * block_index;
191 block_meta_ptr->Insert(kShardRangeLowerBound, cur_lower_bound);
192 size_t cur_upper_bound = std::min(cur_lower_bound + slice_size, first_dim);
193 block_meta_ptr->Insert(kShardRangeUpperBound, cur_upper_bound);
194
195 size_t field_length = (cur_upper_bound - cur_lower_bound) * non_first_dims_size;
196 block_meta_ptr->Insert(kFieldsLength, field_length);
197 block_meta_ptr->Insert(kOffset, offset);
198 offset += field_length;
199 block_meta_list_.push_back(block_meta_ptr);
200
201 // Create block.
202 auto block_ptr = std::make_shared<Block>(file_path_ + "/" + kBlockFilePrefix + std::to_string(block_index));
203 block_ptr->set_block_meta(block_meta_ptr);
204 block_list_.push_back(block_ptr);
205 }
206
207 finish_create_block_files_ = true;
208
209 // Write inputs_data to block files and Gen Sha256 seq.
210 for (size_t block_index = 0; block_index < block_num; ++block_index) {
211 WriteOneBlockFile(block_index, inputs);
212 }
213 }
214
215 template <typename KeyType, typename ValueType>
WriteOneBlockFile(size_t block_index,const std::vector<InputData> & inputs) const216 void LocalFile<KeyType, ValueType>::WriteOneBlockFile(size_t block_index, const std::vector<InputData> &inputs) const {
217 const auto &block_meta_ptr = block_meta_list_.at(block_index);
218 MS_EXCEPTION_IF_NULL(block_meta_ptr);
219 size_t field_size = block_meta_ptr->template Get<size_t>(kFieldsLength);
220 size_t offset = block_meta_ptr->template Get<size_t>(kOffset);
221 std::vector<std::pair<const void *, size_t>> block_inputs_data;
222
223 for (size_t input_index = 0; input_index < inputs.size(); ++input_index) {
224 const void *data_ptr = reinterpret_cast<const char *>(std::get<1>(inputs.at(input_index))) + offset;
225 size_t data_size = field_size;
226 (void)block_inputs_data.emplace_back(data_ptr, data_size);
227 }
228
229 const auto &block_ptr = block_list_.at(block_index);
230 MS_EXCEPTION_IF_NULL(block_ptr);
231 // Rewrite the current block file.
232 if (!FileIOUtils::Write(block_ptr->block_file_name(), block_inputs_data)) {
233 MS_LOG(EXCEPTION) << "Write to block file[" << block_ptr->block_file_name() << "] failed.";
234 }
235
236 ChangeFileMode(block_ptr->block_file_name(), S_IRUSR | S_IWUSR);
237
238 // Generate sha256 hash sequence.
239 block_ptr->GenSha256Seq();
240 }
241
242 template <typename KeyType, typename ValueType>
Write(const ConstDataWithLen & keys,const ConstDataWithLen & values)243 void LocalFile<KeyType, ValueType>::Write(const ConstDataWithLen &keys, const ConstDataWithLen &values) {
244 // Check input data valid.
245 const KeyType *keys_data = reinterpret_cast<const KeyType *>(keys.data_);
246 size_t keys_len = keys.data_len_;
247 const ValueType *values_data = reinterpret_cast<const ValueType *>(values.data_);
248 size_t values_len = values.data_len_;
249 MS_EXCEPTION_IF_NULL(keys_data);
250 MS_EXCEPTION_IF_NULL(values_data);
251 size_t key_num = keys_len / sizeof(KeyType);
252 if (key_num == 0) {
253 return;
254 }
255
256 size_t element_len = element_size_ * sizeof(ValueType);
257 if (values_len != key_num * element_len) {
258 MS_LOG(EXCEPTION) << "The value length is invalid, expected length[" << key_num * element_len << "], but got["
259 << values_len << "]";
260 }
261
262 for (size_t i = 0; i < key_num; i++) {
263 auto iter = keys_to_locations_.find(keys_data[i]);
264 // 1. Write the new values for the keys already exist in local file.
265 if (iter != keys_to_locations_.end()) {
266 const std::pair<size_t, size_t> &offset = iter->second;
267 const system::WriteFilePtr &block_file = block_files_.at(offset.first);
268 MS_EXCEPTION_IF_NULL(block_file);
269 size_t offset_in_block = offset.second;
270 MS_EXCEPTION_IF_CHECK_FAIL(block_file->PWrite(values_data + i * element_size_, element_len, offset_in_block),
271 "PWrite file failed.");
272 continue;
273 }
274
275 // 2. Write the values for the keys doesn't exist in local file:
276 if (current_offset_in_block_ == 0 || current_offset_in_block_ == block_size_ * element_len) {
277 // Create new block file for beginning or all block files are written fully.
278 std::string block_file_name = file_path_ + "/" + kBlockFilePrefix + std::to_string(block_files_.size());
279 auto block_file_ptr = fs_->CreateWriteFile(block_file_name, "wb+");
280 MS_EXCEPTION_IF_NULL(block_file_ptr);
281 MS_EXCEPTION_IF_CHECK_FAIL(block_file_ptr->Trunc(block_size_ * element_len), "Truncate file failed.");
282 (void)block_files_.emplace_back(block_file_ptr);
283 ChangeFileMode(block_file_name, S_IRUSR | S_IWUSR);
284
285 // Reset offset cursor in block.
286 current_offset_in_block_ = 0;
287 }
288
289 // Record which block file the value corresponding to a key is stored in, and the offset location of the block file.
290 (void)keys_to_locations_.emplace(keys_data[i],
291 std::pair<size_t, size_t>(block_files_.size() - 1, current_offset_in_block_));
292 // Write the values into latest created block file.
293 MS_EXCEPTION_IF_NULL(block_files_.back());
294 MS_EXCEPTION_IF_CHECK_FAIL(
295 block_files_.back()->PWrite(values_data + i * element_size_, element_len, current_offset_in_block_),
296 "PWrite file failed.");
297
298 current_offset_in_block_ += element_len;
299 }
300 }
301
302 template <typename KeyType, typename ValueType>
Read(const OutputData & output)303 void LocalFile<KeyType, ValueType>::Read(const OutputData &output) {
304 std::vector<OutputData> outputs = {output};
305 Read(outputs);
306 }
307
308 template <typename KeyType, typename ValueType>
Read(const std::vector<OutputData> & outputs)309 void LocalFile<KeyType, ValueType>::Read(const std::vector<OutputData> &outputs) {
310 if (block_list_.empty() || block_meta_list_.empty()) {
311 // Load file list info of block files and block meta files in the current folder to block list and block meta list.
312 if (!LoadBlocksInfo()) {
313 MS_LOG(EXCEPTION) << "LoadBlocksInfo failed";
314 }
315 }
316
317 // Read all block files.
318 for (size_t block_index = 0; block_index < block_list_.size(); ++block_index) {
319 std::vector<std::pair<void *, size_t>> block_output_data;
320 const auto &block_meta_ptr = block_meta_list_[block_index];
321 MS_EXCEPTION_IF_NULL(block_meta_ptr);
322 size_t field_size = block_meta_ptr->template Get<size_t>(kFieldsLength);
323 size_t offset = block_meta_ptr->template Get<size_t>(kOffset);
324
325 for (size_t output_index = 0; output_index < outputs.size(); ++output_index) {
326 void *data_ptr = reinterpret_cast<char *>(std::get<0>(outputs[output_index])) + offset;
327 size_t data_size = field_size;
328 (void)block_output_data.emplace_back(data_ptr, data_size);
329 }
330
331 const auto &block_ptr = block_list_[block_index];
332 MS_EXCEPTION_IF_NULL(block_ptr);
333 if (!block_ptr->CheckSha256Seq()) {
334 MS_LOG(EXCEPTION) << "CheckSha256 failed, file name [" << block_ptr->block_file_name() << "]";
335 }
336
337 if (!FileIOUtils::Read(block_ptr->block_file_name(), block_output_data)) {
338 MS_LOG(EXCEPTION) << "Read block file failed, file name [" << block_ptr->block_file_name() << "]";
339 }
340 }
341 }
342
343 template <typename KeyType, typename ValueType>
LoadBlocksInfo()344 bool LocalFile<KeyType, ValueType>::LoadBlocksInfo() {
345 DIR *dir = opendir(file_path_.c_str());
346 if (dir == nullptr) {
347 MS_LOG(ERROR) << "The file path [" << file_path_ << "] is not exist";
348 return false;
349 }
350 std::vector<std::string> block_file_name_list;
351 std::vector<std::string> block_meta_file_name_list;
352 struct dirent *entry;
353
354 // Get file names of all block file and block meta file in the current folder.
355 while ((entry = readdir(dir)) != nullptr) {
356 std::string file_name = entry->d_name;
357 if (file_name.length() <= JSON_SUFFIX_LENS) {
358 continue;
359 }
360
361 std::string real_storage_file_path = file_path_ + "/" + file_name;
362 auto suffix = file_name.substr(file_name.length() - JSON_SUFFIX_LENS);
363 if (suffix == kJsonSuffix) {
364 block_meta_file_name_list.push_back(real_storage_file_path);
365 } else {
366 block_file_name_list.push_back(real_storage_file_path);
367 }
368 }
369 (void)closedir(dir);
370
371 if (block_file_name_list.size() != block_meta_file_name_list.size()) {
372 MS_LOG(ERROR) << "The block file number[" << block_file_name_list.size()
373 << "] is not equal to block meta file number[" << block_meta_file_name_list.size() << "]";
374 return false;
375 }
376
377 sort(block_file_name_list.begin(), block_file_name_list.end());
378 sort(block_meta_file_name_list.begin(), block_meta_file_name_list.end());
379 for (size_t i = 0; i < block_file_name_list.size(); i++) {
380 auto block_meta_ptr = std::make_shared<BlockMeta>(block_meta_file_name_list[i]);
381 if (!block_meta_ptr->Initialize()) {
382 MS_LOG(ERROR) << "Initialize block meta failed, file name [" << block_meta_file_name_list[i] << "]";
383 return false;
384 }
385 block_meta_list_.push_back(block_meta_ptr);
386
387 auto block_ptr = std::make_shared<Block>(block_file_name_list[i]);
388 block_ptr->set_block_meta(block_meta_ptr);
389 block_list_.push_back(block_ptr);
390 }
391 return true;
392 }
393
394 template <typename KeyType, typename ValueType>
Read(const ConstDataWithLen & keys,const DataWithLen & values)395 void LocalFile<KeyType, ValueType>::Read(const ConstDataWithLen &keys, const DataWithLen &values) {
396 // Check input data valid.
397 const KeyType *keys_data = reinterpret_cast<const KeyType *>(keys.data_);
398 size_t keys_len = keys.data_len_;
399 ValueType *values_data = reinterpret_cast<ValueType *>(values.data_);
400 size_t values_len = values.data_len_;
401 MS_EXCEPTION_IF_NULL(keys_data);
402 MS_EXCEPTION_IF_NULL(values_data);
403
404 size_t key_num = keys_len / sizeof(KeyType);
405 if (key_num == 0) {
406 return;
407 }
408
409 size_t element_len = element_size_ * sizeof(ValueType);
410 if (values_len < key_num * element_len) {
411 MS_LOG(EXCEPTION) << "The value length is insufficient.";
412 }
413
414 for (size_t i = 0; i < key_num; i++) {
415 // 1. Find the position offset measured in bytes from the beginning of this file by keys.
416 auto iter = keys_to_locations_.find(keys_data[i]);
417 if (iter == keys_to_locations_.end()) {
418 MS_LOG(DEBUG) << "Can not find key: " << keys_data[i] << " to locate the position in file.";
419 continue;
420 }
421 const std::pair<size_t, size_t> &offset = iter->second;
422
423 const system::WriteFilePtr &block_file = block_files_.at(offset.first);
424 MS_EXCEPTION_IF_NULL(block_file);
425 size_t offset_in_block = offset.second;
426 // 2. Read the values corresponding to keys.
427 MS_EXCEPTION_IF_CHECK_FAIL(block_file->PRead(values_data + i * element_size_, element_len, offset_in_block),
428 "PRead file failed.");
429 }
430 }
431
432 template <typename KeyType, typename ValueType>
GetAllKeys() const433 std::unique_ptr<std::vector<KeyType>> LocalFile<KeyType, ValueType>::GetAllKeys() const {
434 size_t keys_num = keys_to_locations_.size();
435 auto keys_vec = std::make_unique<std::vector<KeyType>>(keys_num);
436 MS_EXCEPTION_IF_NULL(keys_vec);
437 size_t index = 0;
438 for (const auto &item : keys_to_locations_) {
439 keys_vec->at(index) = item.first;
440 ++index;
441 }
442 return keys_vec;
443 }
444
// Explicit instantiations of LocalFile for the supported key types (int32_t, int64_t), each combined with every
// supported value type. The member functions are defined in this source file, so only the combinations listed
// here are emitted by this translation unit.
445 template class LocalFile<int32_t, bool>;
446 template class LocalFile<int32_t, int8_t>;
447 template class LocalFile<int32_t, int16_t>;
448 template class LocalFile<int32_t, int32_t>;
449 template class LocalFile<int32_t, int64_t>;
450 template class LocalFile<int32_t, uint8_t>;
451 template class LocalFile<int32_t, uint16_t>;
452 template class LocalFile<int32_t, uint32_t>;
453 template class LocalFile<int32_t, uint64_t>;
454 template class LocalFile<int32_t, float16>;
455 template class LocalFile<int32_t, float>;
456 template class LocalFile<int32_t, double>;
457 template class LocalFile<int32_t, bfloat16>;
458
459 template class LocalFile<int64_t, bool>;
460 template class LocalFile<int64_t, int8_t>;
461 template class LocalFile<int64_t, int16_t>;
462 template class LocalFile<int64_t, int32_t>;
463 template class LocalFile<int64_t, int64_t>;
464 template class LocalFile<int64_t, uint8_t>;
465 template class LocalFile<int64_t, uint16_t>;
466 template class LocalFile<int64_t, uint32_t>;
467 template class LocalFile<int64_t, uint64_t>;
468 template class LocalFile<int64_t, float16>;
469 template class LocalFile<int64_t, float>;
470 template class LocalFile<int64_t, double>;
471 template class LocalFile<int64_t, bfloat16>;
472 } // namespace storage
473 } // namespace distributed
474 } // namespace mindspore
475