1 /**
2 * Copyright 2019-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/source/mindrecord_op.h"
17
18 #include <algorithm>
19 #include <cstdint>
20 #include <utility>
21
22 #include "utils/ms_utils.h"
23 #include "minddata/dataset/core/config_manager.h"
24 #include "minddata/dataset/core/global_context.h"
25 #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
26 #include "minddata/mindrecord/include/shard_column.h"
27 #include "minddata/dataset/engine/execution_tree.h"
28 #include "minddata/dataset/include/dataset/constants.h"
29 #include "minddata/dataset/util/log_adapter.h"
30
31 namespace mindspore {
32 namespace dataset {
33
34 using mindrecord::kInt64Len;
35 using mindrecord::MSRStatus;
36 using mindrecord::Schema;
37 using mindrecord::ShardOperator;
38 using mindrecord::ShardReader;
39
// Constructor of the MindRecordOp.
// Forwards the worker count, connector queue size and sampler to MappableLeafOp, then
// stores the MindRecord-specific configuration. 'dataset_file', 'shard_reader' and
// 'sampler' are taken by value/unique_ptr and moved into the members; the remaining
// containers are copied. No I/O happens here — the reader is opened later in Init().
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset,
                           int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
                           const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
                           const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
                           const ShuffleMode shuffle_mode, std::unique_ptr<ShardReader> shard_reader,
                           std::shared_ptr<SamplerRT> sampler)
    : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
      dataset_file_(std::move(dataset_file)),
      load_dataset_(load_dataset),
      columns_to_load_(columns_to_load),
      operators_(operators),
      num_mind_record_workers_(num_mind_record_workers),
      ended_worker_(0),
      num_padded_(num_padded),
      sample_json_(sample_json),
      sample_bytes_(sample_bytes),
      shuffle_mode_(shuffle_mode),
      shard_reader_(std::move(shard_reader)) {
  epoch_sync_flag_ = true;  // MindRecordOp needs to turn this flag on, otherwise, calling ShuffleTask() before all
                            // tasks are consumed by the worker threads would cause problem.
}
62
63 // Private helper method to encapsulate some common construction/reset tasks
Init()64 Status MindRecordOp::Init() {
65 RETURN_IF_NOT_OK(shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_,
66 operators_, num_padded_));
67
68 data_schema_ = std::make_unique<DataSchema>();
69
70 std::vector<std::string> col_names = shard_reader_->GetShardColumn()->GetColumnName();
71 CHECK_FAIL_RETURN_UNEXPECTED(!col_names.empty(),
72 "Invalid column, no column names are specified, check mindrecord file.");
73 std::vector<mindrecord::ColumnDataType> col_data_types = shard_reader_->GetShardColumn()->GeColumnDataType();
74 std::vector<std::vector<int64_t>> col_shapes = shard_reader_->GetShardColumn()->GetColumnShape();
75
76 bool load_all_cols = columns_to_load_.empty(); // if columns_to_load_ is empty it means load everything
77 std::map<std::string, int32_t> colname_to_ind;
78 for (uint32_t i = 0; i < col_names.size(); i++) {
79 std::string colname = col_names[i];
80 ColDescriptor col_desc;
81
82 TensorShape t_shape = TensorShape::CreateUnknownRankShape(); // shape of tensor, default unknown
83 std::string type_str = mindrecord::ColumnDataTypeNameNormalized[col_data_types[i]];
84 DataType t_dtype = DataType(type_str); // valid types: {"bytes", "string", "int32", "int64", "float32", "float64"}
85
86 if (col_data_types[i] == mindrecord::ColumnBytes) { // rank = 1
87 col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 1);
88 } else if (col_data_types[i] == mindrecord::ColumnString) { // rank = 0
89 col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 0);
90 } else if (col_shapes[i].size() > 0) {
91 std::vector<dsize_t> vec(col_shapes[i].size()); // temporary vector to hold shape
92 (void)std::copy(col_shapes[i].begin(), col_shapes[i].end(), vec.begin());
93 t_shape = TensorShape(vec);
94 col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
95 } else { // unknown shape
96 // create colDesc and add it to schema
97 col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
98 }
99
100 colname_to_ind[colname] = data_schema_->NumColumns();
101 RETURN_IF_NOT_OK(data_schema_->AddColumn(col_desc));
102
103 if (load_all_cols) {
104 columns_to_load_.emplace_back(colname);
105 }
106 }
107
108 if (!load_all_cols) {
109 std::unique_ptr<DataSchema> tmp_schema = std::make_unique<DataSchema>();
110 for (std::string colname : columns_to_load_) {
111 CHECK_FAIL_RETURN_UNEXPECTED(colname_to_ind.find(colname) != colname_to_ind.end(),
112 "Invalid column, " + colname + " does not exist in data file.");
113 RETURN_IF_NOT_OK(tmp_schema->AddColumn(data_schema_->Column(colname_to_ind[colname])));
114 }
115 data_schema_ = std::move(tmp_schema);
116 }
117
118 return Status::OK();
119 }
120
121 // Destructor
~MindRecordOp()122 MindRecordOp::~MindRecordOp() {}
123
124 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const125 void MindRecordOp::Print(std::ostream &out, bool show_all) const {
126 if (!show_all) {
127 // Call the super class for displaying any common 1-liner info
128 ParallelOp::Print(out, show_all);
129 // Then show any custom derived-internal 1-liner info for this op
130 out << "\n";
131 } else {
132 // Call the super class for displaying any common detailed info
133 ParallelOp::Print(out, show_all);
134 // Then show any custom derived-internal stuff
135 out << "\nDataset file : ";
136 for (auto &file : dataset_file_) {
137 out << file << " ";
138 }
139 out << "\nNumber of rows : " << num_rows_ << "\nNumber of ShardReader workers : " << num_mind_record_workers_
140 << "\n\n";
141 }
142 }
143
// Worker thread entry point. Each worker repeatedly pops an IOBlock from its own input
// queue and emits the matching item on its output queue: a wait/EOE/EOF flag row for
// control blocks, or a fetched TensorRow for a row-id block. An IOBlock whose key list
// is empty is the quit signal; a nullptr block is an internal error.
// The CollectOpInfoStart/End pairs report per-phase timings to the profiler:
// "WorkerGet" covers waiting for the next IOBlock, "WorkerProcess" covers handling it.
Status MindRecordOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();  // announce to the task manager that this worker is running
  std::unique_ptr<IOBlock> io_block;

  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
  RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));

  while (io_block != nullptr) {
    if (io_block->wait()) {
      // Pause request (used while the worker pool is being resized): notify downstream
      // with a wait-flag row, then block until this task is posted again.
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagWait)));
      RETURN_IF_NOT_OK(TaskManager::FindMe()->Wait());  // wait for auto tune update workers successful
      TaskManager::FindMe()->Clear();
    } else if (io_block->eoe()) {
      // Propagate end-of-epoch marker.
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
    } else if (io_block->eof()) {
      // Propagate end-of-file (end of all epochs) marker.
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
    } else {
      // load TensorRow
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
      if (keys.empty() == true) {
        {
          // The last worker to quit closes the shared ShardReader.
          std::unique_lock<std::mutex> lock(ended_worker_mutex_);
          ended_worker_++;
          if (ended_worker_ == num_workers_) {
            shard_reader_->Close();
          }
        }
        RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess",
                                          {{"TensorRowFlags", IOBlock(IOBlock::kFlagQuit).FlagName()}}));
        return Status::OK();  // empty key is a quit signal for workers
      }

      // Only the first key is used as the row id — presumably each row IOBlock carries
      // exactly one key (TODO confirm against the IOBlock producer).
      const uint64_t row_id = keys[0];
      TensorRow fetched_row;

      // Get the next row. Push it up to the output connector.
      if (row_id % LOG_INTERVAL == 0) {
        MS_LOG(DEBUG) << "MindRecord operator consumed row " << row_id << " by worker " << worker_id << ".";
      }
      RETURN_IF_NOT_OK(GetRowFromReader(&fetched_row, row_id, worker_id));
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(fetched_row)));
    }
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
    RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
    RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
  }
  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Unexpected nullptr received in worker.");
}
204
// Fetches the row identified by 'row_id' from the ShardReader and loads it into
// *fetched_row. A padded task builds the row from the user-supplied padding sample
// (sample_json_/sample_bytes_); a common task decodes the (blob, json) tuples returned
// by the reader. Either way the row is tagged with the first dataset file path and the
// row id. An empty tuple buffer after a padded task is the normal case and returns OK.
Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, uint64_t row_id, int32_t worker_id) {
  RETURN_UNEXPECTED_IF_NULL(fetched_row);
  *fetched_row = {};  // start from an empty row
  auto task_content_ptr = std::make_shared<mindrecord::TASK_CONTENT>(
    mindrecord::TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, mindrecord::json>>());
  RETURN_IF_NOT_OK(shard_reader_->GetNextById(row_id, worker_id, &task_content_ptr));
  auto task_type = task_content_ptr->first;
  auto tupled_buffer = task_content_ptr->second;
  if (task_type == mindrecord::TaskType::kPaddedTask) {
    // Padded row: contents come from the padding sample, not from the file data.
    RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type));
    std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
    fetched_row->setPath(file_path);
    fetched_row->setId(row_id);
  }
  if (tupled_buffer.empty()) {
    return Status::OK();
  }
  if (task_type == mindrecord::TaskType::kCommonTask) {
    for (const auto &tupled_row : tupled_buffer) {
      std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
      mindrecord::json columns_json = std::get<1>(tupled_row);
      RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type));
      std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
      fetched_row->setPath(file_path);
      fetched_row->setId(row_id);
    }
  }

  return Status::OK();
}
235
LoadTensorRow(TensorRow * tensor_row,const std::vector<uint8_t> & columns_blob,const mindrecord::json & columns_json,const mindrecord::TaskType task_type)236 Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
237 const mindrecord::json &columns_json, const mindrecord::TaskType task_type) {
238 RETURN_UNEXPECTED_IF_NULL(tensor_row);
239 for (int32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
240 auto column_name = columns_to_load_[i_col];
241
242 // Initialize column parameters
243 const unsigned char *data = nullptr;
244 std::unique_ptr<unsigned char[]> data_ptr;
245 uint64_t n_bytes = 0;
246 mindrecord::ColumnDataType column_data_type = mindrecord::ColumnNoDataType;
247 uint64_t column_data_type_size = 1;
248 std::vector<int64_t> column_shape;
249
250 // Get column data
251 auto shard_column = shard_reader_->GetShardColumn();
252 if (num_padded_ > 0 && task_type == mindrecord::TaskType::kPaddedTask) {
253 mindrecord::ColumnCategory category;
254 RETURN_IF_NOT_OK(shard_column->GetColumnTypeByName(column_name, &column_data_type, &column_data_type_size,
255 &column_shape, &category));
256 if (category == mindrecord::ColumnInRaw) {
257 RETURN_IF_NOT_OK(shard_column->GetColumnFromJson(column_name, sample_json_, &data_ptr, &n_bytes));
258 } else if (category == mindrecord::ColumnInBlob) {
259 CHECK_FAIL_RETURN_UNEXPECTED(sample_bytes_.find(column_name) != sample_bytes_.end(),
260 "Invalid padded_sample, failed to retrieve blob data from padding sample, "
261 "check 'padded_sample'.");
262
263 std::string ss(sample_bytes_[column_name]);
264 n_bytes = ss.size();
265 data_ptr = std::make_unique<unsigned char[]>(n_bytes);
266 (void)std::copy(ss.begin(), ss.end(), data_ptr.get());
267 } else {
268 RETURN_STATUS_UNEXPECTED("Invalid datatype, retrieved data type is unknown.");
269 }
270 if (data == nullptr) {
271 data = reinterpret_cast<const unsigned char *>(data_ptr.get());
272 }
273 } else {
274 RETURN_IF_NOT_OK(shard_column->GetColumnValueByName(column_name, columns_blob, columns_json, &data, &data_ptr,
275 &n_bytes, &column_data_type, &column_data_type_size,
276 &column_shape));
277 }
278
279 std::shared_ptr<Tensor> tensor;
280 const ColDescriptor &column = data_schema_->Column(i_col);
281 DataType type = column.Type();
282
283 // Set shape
284 CHECK_FAIL_RETURN_UNEXPECTED(column_data_type_size != 0,
285 "[Internal ERROR] Found memory size of column data type is 0.");
286 auto num_elements = n_bytes / column_data_type_size;
287 if (type == DataType::DE_STRING) {
288 std::string s{data, data + n_bytes};
289 RETURN_IF_NOT_OK(Tensor::CreateScalar(s, &tensor));
290 } else if (type == DataType::DE_BYTES) {
291 std::vector<std::string> strings;
292 strings.push_back(std::string(reinterpret_cast<const char *>(data), num_elements));
293 RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, TensorShape({1}), DataType(DataType::DE_BYTES), &tensor));
294 } else if (column.HasShape()) {
295 auto new_shape = TensorShape(column.Shape());
296 // if the numpy is null, create empty tensor shape
297 if (num_elements == 0) {
298 new_shape = TensorShape({});
299 } else {
300 RETURN_IF_NOT_OK(column.MaterializeTensorShape(static_cast<int32_t>(num_elements), &new_shape));
301 }
302 RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
303 } else {
304 std::vector<dsize_t> shapeDetails = {static_cast<dsize_t>(num_elements)};
305 auto new_shape = TensorShape(shapeDetails);
306 RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
307 }
308 tensor_row->push_back(std::move(tensor));
309 }
310 return Status::OK();
311 }
312
// Overrides base class reset method. When an operator does a reset, it cleans up any state
// info from it's previous execution and then initializes itself so that it can be executed
// again.
Status MindRecordOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  // Drain in-flight work first so no worker is mid-row while state is being reset.
  RETURN_IF_NOT_OK(WaitForWorkers());
  RETURN_IF_NOT_OK(MappableLeafOp::Reset());  // Call our super class reset first.

  // wakeup workers (they block in WorkerEntry's wait branch after WaitForWorkers)
  for (auto &item : worker_tasks_) {
    item->Post();
  }

  // wakeup the collector thread
  wait_for_collector_.Set();

  return Status::OK();
}
331
// Caches the total row count reported by the ShardReader; the reader must already be
// open (see Init()). Always returns OK.
Status MindRecordOp::PrepareData() {
  num_rows_ = shard_reader_->GetNumRows();
  return Status::OK();
}
336
// Launches this op's worker threads via the base class, then starts the ShardReader's
// own reading threads so rows are available when workers request them.
Status MindRecordOp::RegisterAndLaunchThreads() {
  RETURN_IF_NOT_OK(ParallelOp::RegisterAndLaunchThreads());
  RETURN_IF_NOT_OK(shard_reader_->Launch(true));
  return Status::OK();
}
342
// Static helper: counts the total rows of the given mindrecord dataset (including
// 'num_padded' padding rows) without constructing a full MindRecordOp.
//   dataset_path - mindrecord file path(s).
//   load_dataset - whether the path denotes a whole dataset or a single file list.
//   op           - shard operator (e.g. sampler) affecting the effective row count.
//   count        - [out] receives the row count.
// Returns OK, or an error Status from the temporary ShardReader.
Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
                                    const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
  RETURN_UNEXPECTED_IF_NULL(op);
  RETURN_UNEXPECTED_IF_NULL(count);
  std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
  RETURN_IF_NOT_OK(shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded));
  return Status::OK();
}
351
ComputeColMap()352 Status MindRecordOp::ComputeColMap() {
353 if (column_name_id_map_.empty()) {
354 for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
355 column_name_id_map_[columns_to_load_[i]] = i;
356 }
357 } else {
358 MS_LOG(WARNING) << "Column name map is already set!";
359 }
360 return Status::OK();
361 }
362
// Grows the worker pool by 'num_new_workers' (used by autotune). All current workers
// are paused first, the ShardReader's file streams are extended to match, new in/out
// queues and worker tasks are created, and finally everything is woken back up.
Status MindRecordOp::AddNewWorkers(int32_t num_new_workers) {
  // wait for workers to process the current rows
  RETURN_IF_NOT_OK(WaitForWorkers());

  // Give the reader one file stream per new worker before the workers exist.
  RETURN_IF_NOT_OK(shard_reader_->ExtendRandomFileStreams(num_new_workers));
  num_mind_record_workers_ += num_new_workers;

  // create queue first
  for (int32_t i = 0; i < num_new_workers; i++) {
    RETURN_IF_NOT_OK(worker_in_queues_.AddQueue(tree_->AllTasks()));
    RETURN_IF_NOT_OK(worker_out_queues_.AddQueue(tree_->AllTasks()));
  }

  // launch new workers
  for (int32_t i = 0; i < num_new_workers; i++) {
    Task *new_task;
    // num_workers_ is bound by value here, so each new worker gets the next free id.
    RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
      Name() + "::WorkerEntry", std::bind(&MindRecordOp::WorkerEntry, this, num_workers_), &new_task, id()));
    CHECK_FAIL_RETURN_UNEXPECTED(new_task != nullptr, "Cannot create a new worker.");
    worker_tasks_.push_back(new_task);
    num_workers_++;
    MS_LOG(INFO) << "A new worker has been added to op: " << Name() << "::" << id() << " num_workers=" << num_workers_;
  }

  // wakeup old workers
  for (auto &item : worker_tasks_) {
    item->Post();
  }

  // wakeup the collector thread
  wait_for_collector_.Set();

  return Status::OK();
}
397
// Shrinks the worker pool by 'num_workers' (used by autotune). Workers are removed from
// the back (highest worker id first): each one is sent a quit flag, joined, and its
// input queue removed; then the remaining workers and the collector are woken up.
Status MindRecordOp::RemoveWorkers(int32_t num_workers) {
  // wait for workers to process the current rows
  RETURN_IF_NOT_OK(WaitForWorkers());

  // Shrink the reader's file streams to match the smaller pool.
  num_mind_record_workers_ -= num_workers;
  RETURN_IF_NOT_OK(shard_reader_->ShrinkRandomFileStreams(num_workers));

  for (int32_t i = 0; i < num_workers; i++) {
    // Quit flag first, then Post() so the paused worker wakes up, sees it, and exits.
    RETURN_IF_NOT_OK(SendQuitFlagToWorker(num_workers_ - 1));
    worker_tasks_[num_workers_ - 1]->Post();
    RETURN_IF_NOT_OK(worker_tasks_[num_workers_ - 1]->Join());
    RETURN_IF_NOT_OK(worker_in_queues_.RemoveLastQueue());
    worker_tasks_.pop_back();
    num_workers_--;
    MS_LOG(INFO) << "Worker ID " << num_workers_ << " is requested to be removed in operator: " << NameWithID()
                 << " num_workers=" << num_workers_;
  }

  // wakeup left workers
  for (auto &item : worker_tasks_) {
    item->Post();
  }

  // wakeup the collector thread
  wait_for_collector_.Set();

  return Status::OK();
}
426
InitPullMode()427 Status MindRecordOp::InitPullMode() {
428 num_workers_ = 1;
429 RETURN_IF_NOT_OK(Init());
430 RETURN_IF_NOT_OK(shard_reader_->Launch(true));
431 return this->PrepareData();
432 }
433
// Pull-mode row fetch: delegates to GetRowFromReader using worker id 0, the only
// worker slot configured by InitPullMode().
Status MindRecordOp::LoadTensorRowPullMode(row_id_type row_id, TensorRow *row) {
  return GetRowFromReader(row, row_id, 0);
}
437
438 } // namespace dataset
439 } // namespace mindspore
440