• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/mindrecord_op.h"
17 
18 #include <algorithm>
19 #include <cstdint>
20 #include <utility>
21 
22 #include "utils/ms_utils.h"
23 #include "minddata/dataset/core/config_manager.h"
24 #include "minddata/dataset/core/global_context.h"
25 #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
26 #include "minddata/mindrecord/include/shard_column.h"
27 #include "minddata/dataset/engine/execution_tree.h"
28 #include "minddata/dataset/include/dataset/constants.h"
29 #include "minddata/dataset/util/log_adapter.h"
30 
31 namespace mindspore {
32 namespace dataset {
33 
34 using mindrecord::kInt64Len;
35 using mindrecord::MSRStatus;
36 using mindrecord::Schema;
37 using mindrecord::ShardOperator;
38 using mindrecord::ShardReader;
39 
// Constructor of the MindRecordOp.
//
// @param num_mind_record_workers - degree of parallelism; used both as this op's worker count
//     (forwarded to MappableLeafOp) and as the ShardReader worker count (num_mind_record_workers_).
// @param dataset_file - the mindrecord file name(s) to read.
// @param load_dataset - flag forwarded to ShardReader::Open in Init().
//     NOTE(review): presumably distinguishes "treat dataset_file as a whole dataset" from
//     "explicit file list" — confirm against ShardReader::Open.
// @param op_connector_queue_size - output connector queue depth, forwarded to MappableLeafOp.
// @param columns_to_load - names of columns to load; empty means load every column (see Init()).
// @param operators - ShardOperator list handed to the ShardReader when it is opened.
// @param num_padded - number of padded samples appended to the dataset.
// @param sample_json - raw (json) field values of the user-provided padding sample.
// @param sample_bytes - blob field values of the padding sample, keyed by column name.
// @param shuffle_mode - shuffle mode recorded for this op.
// @param shard_reader - the reader instance this op takes ownership of.
// @param sampler - sampler passed through to MappableLeafOp.
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset,
                           int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
                           const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
                           const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
                           const ShuffleMode shuffle_mode, std::unique_ptr<ShardReader> shard_reader,
                           std::shared_ptr<SamplerRT> sampler)
    : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
      dataset_file_(std::move(dataset_file)),
      load_dataset_(load_dataset),
      columns_to_load_(columns_to_load),
      operators_(operators),
      num_mind_record_workers_(num_mind_record_workers),
      ended_worker_(0),
      num_padded_(num_padded),
      sample_json_(sample_json),
      sample_bytes_(sample_bytes),
      shuffle_mode_(shuffle_mode),
      shard_reader_(std::move(shard_reader)) {
  epoch_sync_flag_ = true;  // MindRecordOp needs to turn this flag on, otherwise, calling ShuffleTask() before all
                            // tasks are consumed by the worker threads would cause problem.
}
62 
63 // Private helper method to encapsulate some common construction/reset tasks
Init()64 Status MindRecordOp::Init() {
65   RETURN_IF_NOT_OK(shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_,
66                                        operators_, num_padded_));
67 
68   data_schema_ = std::make_unique<DataSchema>();
69 
70   std::vector<std::string> col_names = shard_reader_->GetShardColumn()->GetColumnName();
71   CHECK_FAIL_RETURN_UNEXPECTED(!col_names.empty(),
72                                "Invalid column, no column names are specified, check mindrecord file.");
73   std::vector<mindrecord::ColumnDataType> col_data_types = shard_reader_->GetShardColumn()->GeColumnDataType();
74   std::vector<std::vector<int64_t>> col_shapes = shard_reader_->GetShardColumn()->GetColumnShape();
75 
76   bool load_all_cols = columns_to_load_.empty();  // if columns_to_load_ is empty it means load everything
77   std::map<std::string, int32_t> colname_to_ind;
78   for (uint32_t i = 0; i < col_names.size(); i++) {
79     std::string colname = col_names[i];
80     ColDescriptor col_desc;
81 
82     TensorShape t_shape = TensorShape::CreateUnknownRankShape();  // shape of tensor, default unknown
83     std::string type_str = mindrecord::ColumnDataTypeNameNormalized[col_data_types[i]];
84     DataType t_dtype = DataType(type_str);  // valid types: {"bytes", "string", "int32", "int64", "float32", "float64"}
85 
86     if (col_data_types[i] == mindrecord::ColumnBytes) {  // rank = 1
87       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 1);
88     } else if (col_data_types[i] == mindrecord::ColumnString) {  // rank = 0
89       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 0);
90     } else if (col_shapes[i].size() > 0) {
91       std::vector<dsize_t> vec(col_shapes[i].size());  // temporary vector to hold shape
92       (void)std::copy(col_shapes[i].begin(), col_shapes[i].end(), vec.begin());
93       t_shape = TensorShape(vec);
94       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
95     } else {  // unknown shape
96       // create colDesc and add it to schema
97       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
98     }
99 
100     colname_to_ind[colname] = data_schema_->NumColumns();
101     RETURN_IF_NOT_OK(data_schema_->AddColumn(col_desc));
102 
103     if (load_all_cols) {
104       columns_to_load_.emplace_back(colname);
105     }
106   }
107 
108   if (!load_all_cols) {
109     std::unique_ptr<DataSchema> tmp_schema = std::make_unique<DataSchema>();
110     for (std::string colname : columns_to_load_) {
111       CHECK_FAIL_RETURN_UNEXPECTED(colname_to_ind.find(colname) != colname_to_ind.end(),
112                                    "Invalid column, " + colname + " does not exist in data file.");
113       RETURN_IF_NOT_OK(tmp_schema->AddColumn(data_schema_->Column(colname_to_ind[colname])));
114     }
115     data_schema_ = std::move(tmp_schema);
116   }
117 
118   return Status::OK();
119 }
120 
121 // Destructor
~MindRecordOp()122 MindRecordOp::~MindRecordOp() {}
123 
124 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const125 void MindRecordOp::Print(std::ostream &out, bool show_all) const {
126   if (!show_all) {
127     // Call the super class for displaying any common 1-liner info
128     ParallelOp::Print(out, show_all);
129     // Then show any custom derived-internal 1-liner info for this op
130     out << "\n";
131   } else {
132     // Call the super class for displaying any common detailed info
133     ParallelOp::Print(out, show_all);
134     // Then show any custom derived-internal stuff
135     out << "\nDataset file : ";
136     for (auto &file : dataset_file_) {
137       out << file << " ";
138     }
139     out << "\nNumber of rows : " << num_rows_ << "\nNumber of ShardReader workers : " << num_mind_record_workers_
140         << "\n\n";
141   }
142 }
143 
// Worker thread entry point. Repeatedly pops IOBlocks from this worker's
// input queue and pushes the corresponding TensorRows (or EOE/EOF/wait flag
// rows) to its output queue, until a quit signal — an IOBlock with an empty
// key list — is received. CollectOpInfoStart/End calls bracket each
// get/process stage for the profiler and must stay paired.
Status MindRecordOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();  // announce that this worker task has started
  std::unique_ptr<IOBlock> io_block;

  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
  RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));

  while (io_block != nullptr) {
    if (io_block->wait()) {
      // Pause request: forward a wait-flag row downstream, then block until
      // this task is Post()ed again (e.g. after a worker-count change).
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagWait)));
      RETURN_IF_NOT_OK(TaskManager::FindMe()->Wait());  // wait for auto tune update workers successful
      TaskManager::FindMe()->Clear();
    } else if (io_block->eoe()) {
      // End of epoch: propagate the EOE flag row.
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOE)));
    } else if (io_block->eof()) {
      // End of data: propagate the EOF flag row.
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(TensorRow(TensorRow::TensorRowFlags::kFlagEOF)));
    } else {
      // load TensorRow
      std::vector<int64_t> keys;
      RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
      if (keys.empty() == true) {
        {
          // The last worker to quit closes the shared reader; the mutex
          // protects the ended_worker_ counter across worker threads.
          std::unique_lock<std::mutex> lock(ended_worker_mutex_);
          ended_worker_++;
          if (ended_worker_ == num_workers_) {
            shard_reader_->Close();
          }
        }
        RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess",
                                          {{"TensorRowFlags", IOBlock(IOBlock::kFlagQuit).FlagName()}}));
        return Status::OK();  // empty key is a quit signal for workers
      }

      // Only the first key is used; each IOBlock carries a single row id here.
      const uint64_t row_id = keys[0];
      TensorRow fetched_row;

      // Get the next row. Push it up to the output connector.
      if (row_id % LOG_INTERVAL == 0) {
        MS_LOG(DEBUG) << "MindRecord operator consumed row " << row_id << " by worker " << worker_id << ".";
      }
      RETURN_IF_NOT_OK(GetRowFromReader(&fetched_row, row_id, worker_id));
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(worker_out_queues_[worker_id]->EmplaceBack(std::move(fetched_row)));
    }
    // Fetch the next IOBlock and re-open the profiling brackets for it.
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
    RETURN_IF_NOT_OK(worker_in_queues_[worker_id]->PopFront(&io_block));
    RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
  }
  RETURN_STATUS_UNEXPECTED("[Internal ERROR] Unexpected nullptr received in worker.");
}
204 
GetRowFromReader(TensorRow * fetched_row,uint64_t row_id,int32_t worker_id)205 Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, uint64_t row_id, int32_t worker_id) {
206   RETURN_UNEXPECTED_IF_NULL(fetched_row);
207   *fetched_row = {};
208   auto task_content_ptr = std::make_shared<mindrecord::TASK_CONTENT>(
209     mindrecord::TaskType::kCommonTask, std::vector<std::tuple<std::vector<uint8_t>, mindrecord::json>>());
210   RETURN_IF_NOT_OK(shard_reader_->GetNextById(row_id, worker_id, &task_content_ptr));
211   auto task_type = task_content_ptr->first;
212   auto tupled_buffer = task_content_ptr->second;
213   if (task_type == mindrecord::TaskType::kPaddedTask) {
214     RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type));
215     std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
216     fetched_row->setPath(file_path);
217     fetched_row->setId(row_id);
218   }
219   if (tupled_buffer.empty()) {
220     return Status::OK();
221   }
222   if (task_type == mindrecord::TaskType::kCommonTask) {
223     for (const auto &tupled_row : tupled_buffer) {
224       std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
225       mindrecord::json columns_json = std::get<1>(tupled_row);
226       RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type));
227       std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
228       fetched_row->setPath(file_path);
229       fetched_row->setId(row_id);
230     }
231   }
232 
233   return Status::OK();
234 }
235 
LoadTensorRow(TensorRow * tensor_row,const std::vector<uint8_t> & columns_blob,const mindrecord::json & columns_json,const mindrecord::TaskType task_type)236 Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
237                                    const mindrecord::json &columns_json, const mindrecord::TaskType task_type) {
238   RETURN_UNEXPECTED_IF_NULL(tensor_row);
239   for (int32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
240     auto column_name = columns_to_load_[i_col];
241 
242     // Initialize column parameters
243     const unsigned char *data = nullptr;
244     std::unique_ptr<unsigned char[]> data_ptr;
245     uint64_t n_bytes = 0;
246     mindrecord::ColumnDataType column_data_type = mindrecord::ColumnNoDataType;
247     uint64_t column_data_type_size = 1;
248     std::vector<int64_t> column_shape;
249 
250     // Get column data
251     auto shard_column = shard_reader_->GetShardColumn();
252     if (num_padded_ > 0 && task_type == mindrecord::TaskType::kPaddedTask) {
253       mindrecord::ColumnCategory category;
254       RETURN_IF_NOT_OK(shard_column->GetColumnTypeByName(column_name, &column_data_type, &column_data_type_size,
255                                                          &column_shape, &category));
256       if (category == mindrecord::ColumnInRaw) {
257         RETURN_IF_NOT_OK(shard_column->GetColumnFromJson(column_name, sample_json_, &data_ptr, &n_bytes));
258       } else if (category == mindrecord::ColumnInBlob) {
259         CHECK_FAIL_RETURN_UNEXPECTED(sample_bytes_.find(column_name) != sample_bytes_.end(),
260                                      "Invalid padded_sample, failed to retrieve blob data from padding sample, "
261                                      "check 'padded_sample'.");
262 
263         std::string ss(sample_bytes_[column_name]);
264         n_bytes = ss.size();
265         data_ptr = std::make_unique<unsigned char[]>(n_bytes);
266         (void)std::copy(ss.begin(), ss.end(), data_ptr.get());
267       } else {
268         RETURN_STATUS_UNEXPECTED("Invalid datatype, retrieved data type is unknown.");
269       }
270       if (data == nullptr) {
271         data = reinterpret_cast<const unsigned char *>(data_ptr.get());
272       }
273     } else {
274       RETURN_IF_NOT_OK(shard_column->GetColumnValueByName(column_name, columns_blob, columns_json, &data, &data_ptr,
275                                                           &n_bytes, &column_data_type, &column_data_type_size,
276                                                           &column_shape));
277     }
278 
279     std::shared_ptr<Tensor> tensor;
280     const ColDescriptor &column = data_schema_->Column(i_col);
281     DataType type = column.Type();
282 
283     // Set shape
284     CHECK_FAIL_RETURN_UNEXPECTED(column_data_type_size != 0,
285                                  "[Internal ERROR] Found memory size of column data type is 0.");
286     auto num_elements = n_bytes / column_data_type_size;
287     if (type == DataType::DE_STRING) {
288       std::string s{data, data + n_bytes};
289       RETURN_IF_NOT_OK(Tensor::CreateScalar(s, &tensor));
290     } else if (type == DataType::DE_BYTES) {
291       std::vector<std::string> strings;
292       strings.push_back(std::string(reinterpret_cast<const char *>(data), num_elements));
293       RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, TensorShape({1}), DataType(DataType::DE_BYTES), &tensor));
294     } else if (column.HasShape()) {
295       auto new_shape = TensorShape(column.Shape());
296       // if the numpy is null, create empty tensor shape
297       if (num_elements == 0) {
298         new_shape = TensorShape({});
299       } else {
300         RETURN_IF_NOT_OK(column.MaterializeTensorShape(static_cast<int32_t>(num_elements), &new_shape));
301       }
302       RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
303     } else {
304       std::vector<dsize_t> shapeDetails = {static_cast<dsize_t>(num_elements)};
305       auto new_shape = TensorShape(shapeDetails);
306       RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
307     }
308     tensor_row->push_back(std::move(tensor));
309   }
310   return Status::OK();
311 }
312 
313 // Overrides base class reset method.  When an operator does a reset, it cleans up any state
314 // info from it's previous execution and then initializes itself so that it can be executed
315 // again.
Reset()316 Status MindRecordOp::Reset() {
317   MS_LOG(DEBUG) << Name() << " performing a self-reset.";
318   RETURN_IF_NOT_OK(WaitForWorkers());
319   RETURN_IF_NOT_OK(MappableLeafOp::Reset());  // Call our super class reset first.
320 
321   // wakeup workers
322   for (auto &item : worker_tasks_) {
323     item->Post();
324   }
325 
326   // wakeup the collector thread
327   wait_for_collector_.Set();
328 
329   return Status::OK();
330 }
331 
// Captures the total row count reported by the ShardReader into num_rows_.
Status MindRecordOp::PrepareData() {
  num_rows_ = shard_reader_->GetNumRows();
  return Status::OK();
}
336 
// Launches the base-class worker/collector threads, then starts the
// ShardReader's own internal machinery.
// NOTE(review): confirm the semantics of the boolean argument against
// ShardReader::Launch — it is not visible from this file.
Status MindRecordOp::RegisterAndLaunchThreads() {
  RETURN_IF_NOT_OK(ParallelOp::RegisterAndLaunchThreads());
  RETURN_IF_NOT_OK(shard_reader_->Launch(true));
  return Status::OK();
}
342 
CountTotalRows(const std::vector<std::string> dataset_path,bool load_dataset,const std::shared_ptr<ShardOperator> & op,int64_t * count,int64_t num_padded)343 Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
344                                     const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
345   RETURN_UNEXPECTED_IF_NULL(op);
346   RETURN_UNEXPECTED_IF_NULL(count);
347   std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
348   RETURN_IF_NOT_OK(shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded));
349   return Status::OK();
350 }
351 
ComputeColMap()352 Status MindRecordOp::ComputeColMap() {
353   if (column_name_id_map_.empty()) {
354     for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
355       column_name_id_map_[columns_to_load_[i]] = i;
356     }
357   } else {
358     MS_LOG(WARNING) << "Column name map is already set!";
359   }
360   return Status::OK();
361 }
362 
// Dynamically grows the worker pool by num_new_workers: quiesces the current
// workers, extends the reader's file streams, adds an in/out queue pair and a
// worker task per new worker, then wakes everything back up.
// @param num_new_workers - how many workers to add
Status MindRecordOp::AddNewWorkers(int32_t num_new_workers) {
  // wait for workers to process the current rows
  RETURN_IF_NOT_OK(WaitForWorkers());

  // Grow the reader side first so the new workers have streams to read from.
  RETURN_IF_NOT_OK(shard_reader_->ExtendRandomFileStreams(num_new_workers));
  num_mind_record_workers_ += num_new_workers;

  // create queue first
  for (int32_t i = 0; i < num_new_workers; i++) {
    RETURN_IF_NOT_OK(worker_in_queues_.AddQueue(tree_->AllTasks()));
    RETURN_IF_NOT_OK(worker_out_queues_.AddQueue(tree_->AllTasks()));
  }

  // launch new workers
  for (int32_t i = 0; i < num_new_workers; i++) {
    Task *new_task;
    // The current num_workers_ value becomes the new worker's id; it is
    // incremented after each successful launch.
    RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
      Name() + "::WorkerEntry", std::bind(&MindRecordOp::WorkerEntry, this, num_workers_), &new_task, id()));
    CHECK_FAIL_RETURN_UNEXPECTED(new_task != nullptr, "Cannot create a new worker.");
    worker_tasks_.push_back(new_task);
    num_workers_++;
    MS_LOG(INFO) << "A new worker has been added to op: " << Name() << "::" << id() << " num_workers=" << num_workers_;
  }

  // wakeup old workers
  for (auto &item : worker_tasks_) {
    item->Post();
  }

  // wakeup the collector thread
  wait_for_collector_.Set();

  return Status::OK();
}
397 
// Dynamically shrinks the worker pool by num_workers: quiesces the current
// workers, shrinks the reader's file streams, then quits and joins the
// highest-numbered workers one at a time before waking the remainder.
// @param num_workers - how many workers to remove (local count, distinct from
//     the num_workers_ member tracking the live total)
Status MindRecordOp::RemoveWorkers(int32_t num_workers) {
  // wait for workers to process the current rows
  RETURN_IF_NOT_OK(WaitForWorkers());

  num_mind_record_workers_ -= num_workers;
  RETURN_IF_NOT_OK(shard_reader_->ShrinkRandomFileStreams(num_workers));

  for (int32_t i = 0; i < num_workers; i++) {
    // Always remove the last (highest-id) worker so queue indices stay dense.
    RETURN_IF_NOT_OK(SendQuitFlagToWorker(num_workers_ - 1));
    worker_tasks_[num_workers_ - 1]->Post();
    RETURN_IF_NOT_OK(worker_tasks_[num_workers_ - 1]->Join());
    // NOTE(review): only worker_in_queues_ is shrunk here, while
    // AddNewWorkers() adds both an in and an out queue per worker — confirm
    // whether the matching out queue is intentionally kept (e.g. for the
    // collector) or should be removed as well.
    RETURN_IF_NOT_OK(worker_in_queues_.RemoveLastQueue());
    worker_tasks_.pop_back();
    num_workers_--;
    MS_LOG(INFO) << "Worker ID " << num_workers_ << " is requested to be removed in operator: " << NameWithID()
                 << " num_workers=" << num_workers_;
  }

  // wakeup left workers
  for (auto &item : worker_tasks_) {
    item->Post();
  }

  // wakeup the collector thread
  wait_for_collector_.Set();

  return Status::OK();
}
426 
InitPullMode()427 Status MindRecordOp::InitPullMode() {
428   num_workers_ = 1;
429   RETURN_IF_NOT_OK(Init());
430   RETURN_IF_NOT_OK(shard_reader_->Launch(true));
431   return this->PrepareData();
432 }
433 
// Pull-mode row accessor: delegates to GetRowFromReader with worker id 0,
// since pull mode runs with a single worker (see InitPullMode).
Status MindRecordOp::LoadTensorRowPullMode(row_id_type row_id, TensorRow *row) {
  return GetRowFromReader(row, row_id, 0);
}
437 
438 }  // namespace dataset
439 }  // namespace mindspore
440