• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/mindrecord_op.h"
17 
18 #include <algorithm>
19 #include <cstdint>
20 #include <utility>
21 
22 #include "utils/ms_utils.h"
23 #include "minddata/dataset/core/config_manager.h"
24 #include "minddata/dataset/core/global_context.h"
25 #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
26 #include "minddata/mindrecord/include/shard_column.h"
27 #include "minddata/dataset/engine/db_connector.h"
28 #include "minddata/dataset/engine/execution_tree.h"
29 #include "minddata/dataset/include/dataset/constants.h"
30 #include "minddata/dataset/util/log_adapter.h"
31 
32 namespace mindspore {
33 namespace dataset {
34 
35 using mindrecord::kInt64Len;
36 using mindrecord::MSRStatus;
37 using mindrecord::Schema;
38 using mindrecord::ShardOperator;
39 using mindrecord::ShardReader;
40 
// Constructor of the MindRecordOp.
// Forwards the worker-count / queue configuration to MappableLeafOp and stores the
// MindRecord-specific configuration. Ownership of the ShardReader and the sampler is
// transferred in (unique_ptr / shared_ptr moved).
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset,
                           int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
                           const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
                           const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
                           const ShuffleMode shuffle_mode, std::unique_ptr<ShardReader> shard_reader,
                           std::shared_ptr<SamplerRT> sampler)
    : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
      dataset_file_(std::move(dataset_file)),
      load_dataset_(load_dataset),
      columns_to_load_(columns_to_load),
      operators_(operators),
      num_mind_record_workers_(num_mind_record_workers),
      ended_worker_(0),
      num_padded_(num_padded),
      sample_json_(sample_json),
      sample_bytes_(sample_bytes),
      shuffle_mode_(shuffle_mode),
      shard_reader_(std::move(shard_reader)) {
  // One IOBlock queue per worker thread; sized like the output connector.
  io_block_queues_.Init(num_workers_, op_connector_queue_size);
  epoch_sync_flag_ = true;  // MindRecordOp needs to turn this flag on, otherwise, calling ShuffleTask() before all
                            // tasks are consumed by the worker threads would cause problem.
}
64 
65 // Private helper method to encapsulate some common construction/reset tasks
Init()66 Status MindRecordOp::Init() {
67   RETURN_IF_NOT_OK(shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_,
68                                        operators_, num_padded_));
69 
70   data_schema_ = std::make_unique<DataSchema>();
71 
72   std::vector<std::string> col_names = shard_reader_->GetShardColumn()->GetColumnName();
73   CHECK_FAIL_RETURN_UNEXPECTED(!col_names.empty(), "Invalid data, no column names are specified.");
74   std::vector<mindrecord::ColumnDataType> col_data_types = shard_reader_->GetShardColumn()->GeColumnDataType();
75   std::vector<std::vector<int64_t>> col_shapes = shard_reader_->GetShardColumn()->GetColumnShape();
76 
77   bool load_all_cols = columns_to_load_.empty();  // if columns_to_load_ is empty it means load everything
78   std::map<std::string, int32_t> colname_to_ind;
79   for (uint32_t i = 0; i < col_names.size(); i++) {
80     std::string colname = col_names[i];
81     ColDescriptor col_desc;
82 
83     TensorShape t_shape = TensorShape::CreateUnknownRankShape();  // shape of tensor, default unknown
84     std::string type_str = mindrecord::ColumnDataTypeNameNormalized[col_data_types[i]];
85     DataType t_dtype = DataType(type_str);  // valid types: {"bytes", "string", "int32", "int64", "float32", "float64"}
86 
87     if (col_data_types[i] == mindrecord::ColumnBytes) {  // rank = 1
88       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 1);
89     } else if (col_data_types[i] == mindrecord::ColumnString) {  // rank = 0
90       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 0);
91     } else if (col_shapes[i].size() > 0) {
92       std::vector<dsize_t> vec(col_shapes[i].size());  // temporary vector to hold shape
93       (void)std::copy(col_shapes[i].begin(), col_shapes[i].end(), vec.begin());
94       t_shape = TensorShape(vec);
95       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
96     } else {  // unknown shape
97       // create colDesc and add it to schema
98       col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
99     }
100 
101     colname_to_ind[colname] = data_schema_->NumColumns();
102     RETURN_IF_NOT_OK(data_schema_->AddColumn(col_desc));
103 
104     if (load_all_cols) {
105       columns_to_load_.emplace_back(colname);
106     }
107   }
108 
109   if (!load_all_cols) {
110     std::unique_ptr<DataSchema> tmp_schema = std::make_unique<DataSchema>();
111     for (std::string colname : columns_to_load_) {
112       CHECK_FAIL_RETURN_UNEXPECTED(
113         colname_to_ind.find(colname) != colname_to_ind.end(),
114         "Invalid data, specified loading column name: " + colname + " does not exist in data file.");
115       RETURN_IF_NOT_OK(tmp_schema->AddColumn(data_schema_->Column(colname_to_ind[colname])));
116     }
117     data_schema_ = std::move(tmp_schema);
118   }
119 
120   return Status::OK();
121 }
122 
123 // Destructor
~MindRecordOp()124 MindRecordOp::~MindRecordOp() {}
125 
126 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const127 void MindRecordOp::Print(std::ostream &out, bool show_all) const {
128   if (!show_all) {
129     // Call the super class for displaying any common 1-liner info
130     ParallelOp::Print(out, show_all);
131     // Then show any custom derived-internal 1-liner info for this op
132     out << "\n";
133   } else {
134     // Call the super class for displaying any common detailed info
135     ParallelOp::Print(out, show_all);
136     // Then show any custom derived-internal stuff
137     out << "\nDataset file : ";
138     for (auto &file : dataset_file_) {
139       out << file << " ";
140     }
141     out << "\nNumber of rows : " << num_rows_ << "\nNumber of ShardReader workers : " << num_mind_record_workers_
142         << "\n\n";
143   }
144 }
145 
WorkerEntry(int32_t worker_id)146 Status MindRecordOp::WorkerEntry(int32_t worker_id) {
147   TaskManager::FindMe()->Post();
148   std::unique_ptr<IOBlock> io_block;
149   RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
150   while (io_block != nullptr) {
151     if (io_block->wait()) {
152       // Sync io_block is a signal that master thread wants us to pause and sync with other workers.
153       // The last guy who comes to this sync point should reset the counter and wake up the master thread.
154       if (++num_workers_paused_ == num_workers_) {
155         wait_for_workers_post_.Set();
156       }
157       RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
158       continue;
159     }
160     if (io_block->eoe()) {
161       RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
162       RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
163       continue;
164     }
165     if (io_block->eof()) {
166       RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
167       RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
168       continue;
169     }
170 
171     // load TensorRow
172     std::vector<int64_t> keys;
173     RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
174     if (keys.empty() == true) {
175       {
176         std::unique_lock<std::mutex> lock(ended_worker_mutex_);
177         ended_worker_++;
178         if (ended_worker_ == num_workers_) shard_reader_->Close();
179       }
180       return Status::OK();  // empty key is a quit signal for workers
181     }
182 
183     const uint64_t row_id = keys[0];
184     TensorRow fetched_row;
185 
186     // Get the next row. Push it up to the output connector.
187     if (row_id % LOG_INTERVAL == 0) {
188       MS_LOG(DEBUG) << "MindRecord operator consumed row " << row_id << " by worker " << worker_id << ".";
189     }
190     RETURN_IF_NOT_OK(GetRowFromReader(&fetched_row, row_id, worker_id));
191     RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row), worker_id));
192     RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
193   }
194   RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
195 }
196 
GetRowFromReader(TensorRow * fetched_row,uint64_t row_id,int32_t worker_id)197 Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, uint64_t row_id, int32_t worker_id) {
198   *fetched_row = {};
199   auto rc = shard_reader_->GetNextById(row_id, worker_id);
200   auto task_type = rc.first;
201   auto tupled_buffer = rc.second;
202   if (task_type == mindrecord::TaskType::kPaddedTask) {
203     RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type));
204     std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
205     fetched_row->setPath(file_path);
206     fetched_row->setId(row_id);
207   }
208   if (tupled_buffer.empty()) {
209     return Status::OK();
210   }
211   if (task_type == mindrecord::TaskType::kCommonTask) {
212     for (const auto &tupled_row : tupled_buffer) {
213       std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
214       mindrecord::json columns_json = std::get<1>(tupled_row);
215       RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type));
216       std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
217       fetched_row->setPath(file_path);
218       fetched_row->setId(row_id);
219     }
220   }
221 
222   return Status::OK();
223 }
224 
LoadTensorRow(TensorRow * tensor_row,const std::vector<uint8_t> & columns_blob,const mindrecord::json & columns_json,const mindrecord::TaskType task_type)225 Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
226                                    const mindrecord::json &columns_json, const mindrecord::TaskType task_type) {
227   for (int32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
228     auto column_name = columns_to_load_[i_col];
229 
230     // Initialize column parameters
231     const unsigned char *data = nullptr;
232     std::unique_ptr<unsigned char[]> data_ptr;
233     uint64_t n_bytes = 0;
234     mindrecord::ColumnDataType column_data_type = mindrecord::ColumnNoDataType;
235     uint64_t column_data_type_size = 1;
236     std::vector<int64_t> column_shape;
237 
238     // Get column data
239     auto shard_column = shard_reader_->GetShardColumn();
240     if (num_padded_ > 0 && task_type == mindrecord::TaskType::kPaddedTask) {
241       mindrecord::ColumnCategory category;
242       RETURN_IF_NOT_OK(shard_column->GetColumnTypeByName(column_name, &column_data_type, &column_data_type_size,
243                                                          &column_shape, &category));
244       if (category == mindrecord::ColumnInRaw) {
245         RETURN_IF_NOT_OK(shard_column->GetColumnFromJson(column_name, sample_json_, &data_ptr, &n_bytes));
246       } else if (category == mindrecord::ColumnInBlob) {
247         CHECK_FAIL_RETURN_UNEXPECTED(sample_bytes_.find(column_name) != sample_bytes_.end(),
248                                      "Invalid data, failed to retrieve blob data from padding sample.");
249 
250         std::string ss(sample_bytes_[column_name]);
251         n_bytes = ss.size();
252         data_ptr = std::make_unique<unsigned char[]>(n_bytes);
253         std::copy(ss.begin(), ss.end(), data_ptr.get());
254       } else {
255         RETURN_STATUS_UNEXPECTED("Invalid data, retrieved data type is unknown.");
256       }
257       if (data == nullptr) {
258         data = reinterpret_cast<const unsigned char *>(data_ptr.get());
259       }
260     } else {
261       RETURN_IF_NOT_OK(shard_column->GetColumnValueByName(column_name, columns_blob, columns_json, &data, &data_ptr,
262                                                           &n_bytes, &column_data_type, &column_data_type_size,
263                                                           &column_shape));
264     }
265 
266     std::shared_ptr<Tensor> tensor;
267     const ColDescriptor &column = data_schema_->Column(i_col);
268     DataType type = column.Type();
269 
270     // Set shape
271     CHECK_FAIL_RETURN_UNEXPECTED(column_data_type_size != 0, "Found memory size of column data type is 0.");
272     auto num_elements = n_bytes / column_data_type_size;
273     if (type == DataType::DE_STRING) {
274       std::string s{data, data + n_bytes};
275       RETURN_IF_NOT_OK(Tensor::CreateScalar(s, &tensor));
276     } else if (column.HasShape()) {
277       auto new_shape = TensorShape(column.Shape());
278       // if the numpy is null, create empty tensor shape
279       if (num_elements == 0) {
280         new_shape = TensorShape({});
281       } else {
282         RETURN_IF_NOT_OK(column.MaterializeTensorShape(static_cast<int32_t>(num_elements), &new_shape));
283       }
284       RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
285     } else {
286       std::vector<dsize_t> shapeDetails = {static_cast<dsize_t>(num_elements)};
287       auto new_shape = TensorShape(shapeDetails);
288       RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
289     }
290     tensor_row->push_back(std::move(tensor));
291   }
292   return Status::OK();
293 }
294 
295 // Overrides base class reset method.  When an operator does a reset, it cleans up any state
296 // info from it's previous execution and then initializes itself so that it can be executed
297 // again.
Reset()298 Status MindRecordOp::Reset() {
299   MS_LOG(DEBUG) << Name() << " performing a self-reset.";
300   RETURN_IF_NOT_OK(MappableLeafOp::Reset());  // Call our super class reset first.
301   return Status::OK();
302 }
303 
// Registers this op's queues and sync post with the execution tree's task group,
// launches the ShardReader's internal threads, launches the worker threads, then
// initializes the sampler with the row count. Order matters: workers block on
// their IOBlock queues until the sampler-driven master produces blocks, and
// num_rows_ is read only after the reader has been launched.
Status MindRecordOp::LaunchThreadsAndInitOp() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
  // Start the ShardReader's own reading threads (the `true` flag's exact meaning
  // is defined by ShardReader::Launch — presumably async start; confirm in header).
  RETURN_IF_NOT_OK(shard_reader_->Launch(true));
  // Launch main workers that load TensorRows by reading all images
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id()));
  num_rows_ = shard_reader_->GetNumRows();
  RETURN_IF_NOT_OK(this->InitSampler());  // pass numRows to Sampler
  TaskManager::FindMe()->Post();
  return Status::OK();
}
317 
CountTotalRows(const std::vector<std::string> dataset_path,bool load_dataset,const std::shared_ptr<ShardOperator> & op,int64_t * count,int64_t num_padded)318 Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
319                                     const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
320   std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
321   RETURN_IF_NOT_OK(shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded));
322   return Status::OK();
323 }
324 
ComputeColMap()325 Status MindRecordOp::ComputeColMap() {
326   if (column_name_id_map_.empty()) {
327     for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
328       column_name_id_map_[columns_to_load_[i]] = i;
329     }
330   } else {
331     MS_LOG(WARNING) << "Column name map is already set!";
332   }
333   return Status::OK();
334 }
335 
336 }  // namespace dataset
337 }  // namespace mindspore
338