/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/mindrecord_op.h"

#include <algorithm>
#include <cstdint>
#include <utility>

#include "utils/ms_utils.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/global_context.h"
#include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h"
#include "minddata/mindrecord/include/shard_column.h"
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/util/log_adapter.h"

namespace mindspore {
namespace dataset {

using mindrecord::kInt64Len;
using mindrecord::MSRStatus;
using mindrecord::Schema;
using mindrecord::ShardOperator;
using mindrecord::ShardReader;

// Constructor of the MindRecordOp.
MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset,
                           int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load,
                           const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded,
                           const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes,
                           const ShuffleMode shuffle_mode, std::unique_ptr<ShardReader> shard_reader,
                           std::shared_ptr<SamplerRT> sampler)
    : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)),
      dataset_file_(std::move(dataset_file)),
      load_dataset_(load_dataset),
      columns_to_load_(columns_to_load),
      operators_(operators),
      num_mind_record_workers_(num_mind_record_workers),
      ended_worker_(0),
      num_padded_(num_padded),
      sample_json_(sample_json),
      sample_bytes_(sample_bytes),
      shuffle_mode_(shuffle_mode),
      shard_reader_(std::move(shard_reader)) {
  io_block_queues_.Init(num_workers_, op_connector_queue_size);
  epoch_sync_flag_ = true;  // MindRecordOp needs to turn this flag on; otherwise, calling ShuffleTask() before all
                            // tasks are consumed by the worker threads would cause problems.
}

// Private helper method to encapsulate some common construction/reset tasks
Status MindRecordOp::Init() {
  RETURN_IF_NOT_OK(shard_reader_->Open(dataset_file_, load_dataset_, num_mind_record_workers_, columns_to_load_,
                                       operators_, num_padded_));

  data_schema_ = std::make_unique<DataSchema>();

  std::vector<std::string> col_names = shard_reader_->GetShardColumn()->GetColumnName();
  CHECK_FAIL_RETURN_UNEXPECTED(!col_names.empty(), "Invalid data, no column names are specified.");
  std::vector<mindrecord::ColumnDataType> col_data_types = shard_reader_->GetShardColumn()->GeColumnDataType();
  std::vector<std::vector<int64_t>> col_shapes = shard_reader_->GetShardColumn()->GetColumnShape();

  bool load_all_cols = columns_to_load_.empty();  // if columns_to_load_ is empty it means load everything
  std::map<std::string, int32_t> colname_to_ind;
  for (uint32_t i = 0; i < col_names.size(); i++) {
    std::string colname = col_names[i];
    ColDescriptor col_desc;

    TensorShape t_shape = TensorShape::CreateUnknownRankShape();  // shape of tensor, default unknown
    std::string type_str = mindrecord::ColumnDataTypeNameNormalized[col_data_types[i]];
    DataType t_dtype = DataType(type_str);  // valid types: {"bytes", "string", "int32", "int64", "float32", "float64"}

    if (col_data_types[i] == mindrecord::ColumnBytes) {  // rank = 1
      col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 1);
    } else if (col_data_types[i] == mindrecord::ColumnString) {  // rank = 0
      col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, 0);
    } else if (col_shapes[i].size() > 0) {
      std::vector<dsize_t> vec(col_shapes[i].size());  // temporary vector to hold shape
      (void)std::copy(col_shapes[i].begin(), col_shapes[i].end(), vec.begin());
      t_shape = TensorShape(vec);
      col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
    } else {
      // unknown shape: keep the default unknown-rank shape for this column
      col_desc = ColDescriptor(colname, t_dtype, TensorImpl::kFlexible, t_shape.Rank(), &t_shape);
    }

    colname_to_ind[colname] = data_schema_->NumColumns();
    RETURN_IF_NOT_OK(data_schema_->AddColumn(col_desc));

    if (load_all_cols) {
      columns_to_load_.emplace_back(colname);
    }
  }

  if (!load_all_cols) {
    std::unique_ptr<DataSchema> tmp_schema = std::make_unique<DataSchema>();
    for (const std::string &colname : columns_to_load_) {
      CHECK_FAIL_RETURN_UNEXPECTED(
        colname_to_ind.find(colname) != colname_to_ind.end(),
        "Invalid data, specified loading column name: " + colname + " does not exist in data file.");
      RETURN_IF_NOT_OK(tmp_schema->AddColumn(data_schema_->Column(colname_to_ind[colname])));
    }
    data_schema_ = std::move(tmp_schema);
  }

  return Status::OK();
}

// Destructor
MindRecordOp::~MindRecordOp() {}

// A print method typically used for debugging
void MindRecordOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nDataset file : ";
    for (auto &file : dataset_file_) {
      out << file << " ";
    }
    out << "\nNumber of rows : " << num_rows_ << "\nNumber of ShardReader workers : " << num_mind_record_workers_
        << "\n\n";
  }
}

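// Worker thread entry point: repeatedly pops IOBlocks from this worker's queue, handles wait/EOE/EOF control
// blocks, and for data blocks fetches the requested row from the ShardReader and pushes it to the output connector.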
Status MindRecordOp::WorkerEntry(int32_t worker_id) {
  TaskManager::FindMe()->Post();
  std::unique_ptr<IOBlock> io_block;
  RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  while (io_block != nullptr) {
    if (io_block->wait()) {
      // A wait io_block is a signal that the master thread wants us to pause and sync with other workers.
      // The last worker to reach this sync point should reset the counter and wake up the master thread.
      if (++num_workers_paused_ == num_workers_) {
        wait_for_workers_post_.Set();
      }
      RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
      continue;
    }
    if (io_block->eoe()) {
      RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
      RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
      continue;
    }
    if (io_block->eof()) {
      RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
      RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
      continue;
    }

    // load TensorRow
    std::vector<int64_t> keys;
    RETURN_IF_NOT_OK(io_block->GetKeys(&keys));
    if (keys.empty()) {
      {
        std::unique_lock<std::mutex> lock(ended_worker_mutex_);
        ended_worker_++;
        if (ended_worker_ == num_workers_) {
          shard_reader_->Close();
        }
      }
      return Status::OK();  // an empty key is the quit signal for workers
    }

    const uint64_t row_id = keys[0];
    TensorRow fetched_row;

    // Get the next row. Push it up to the output connector.
    if (row_id % LOG_INTERVAL == 0) {
      MS_LOG(DEBUG) << "MindRecord operator consumed row " << row_id << " by worker " << worker_id << ".";
    }
    RETURN_IF_NOT_OK(GetRowFromReader(&fetched_row, row_id, worker_id));
    RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row), worker_id));
    RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&io_block));
  }
  RETURN_STATUS_UNEXPECTED("Unexpected nullptr received in worker.");
}

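// Fetches the row identified by row_id from the ShardReader (or from the user-provided padded sample for padded
// tasks) and converts it into a TensorRow via LoadTensorRow.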
Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, uint64_t row_id, int32_t worker_id) {
  *fetched_row = {};
  auto rc = shard_reader_->GetNextById(row_id, worker_id);
  auto task_type = rc.first;
  auto tupled_buffer = rc.second;
  if (task_type == mindrecord::TaskType::kPaddedTask) {
    RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type));
    std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
    fetched_row->setPath(file_path);
    fetched_row->setId(row_id);
  }
  if (tupled_buffer.empty()) {
    return Status::OK();
  }
  if (task_type == mindrecord::TaskType::kCommonTask) {
    for (const auto &tupled_row : tupled_buffer) {
      std::vector<uint8_t> columns_blob = std::get<0>(tupled_row);
      mindrecord::json columns_json = std::get<1>(tupled_row);
      RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type));
      std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]);
      fetched_row->setPath(file_path);
      fetched_row->setId(row_id);
    }
  }

  return Status::OK();
}

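// Builds a TensorRow from the raw blob and json columns returned by the ShardReader: for each requested column,
// the value is taken either from the padded sample (for padded tasks) or from the record itself, and a tensor of
// the appropriate type and shape is created from it.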
Status MindRecordOp::LoadTensorRow(TensorRow *tensor_row, const std::vector<uint8_t> &columns_blob,
                                   const mindrecord::json &columns_json, const mindrecord::TaskType task_type) {
  for (int32_t i_col = 0; i_col < columns_to_load_.size(); i_col++) {
    auto column_name = columns_to_load_[i_col];

    // Initialize column parameters
    const unsigned char *data = nullptr;
    std::unique_ptr<unsigned char[]> data_ptr;
    uint64_t n_bytes = 0;
    mindrecord::ColumnDataType column_data_type = mindrecord::ColumnNoDataType;
    uint64_t column_data_type_size = 1;
    std::vector<int64_t> column_shape;

    // Get column data
    auto shard_column = shard_reader_->GetShardColumn();
    if (num_padded_ > 0 && task_type == mindrecord::TaskType::kPaddedTask) {
      mindrecord::ColumnCategory category;
      RETURN_IF_NOT_OK(shard_column->GetColumnTypeByName(column_name, &column_data_type, &column_data_type_size,
                                                         &column_shape, &category));
      if (category == mindrecord::ColumnInRaw) {
        RETURN_IF_NOT_OK(shard_column->GetColumnFromJson(column_name, sample_json_, &data_ptr, &n_bytes));
      } else if (category == mindrecord::ColumnInBlob) {
        CHECK_FAIL_RETURN_UNEXPECTED(sample_bytes_.find(column_name) != sample_bytes_.end(),
                                     "Invalid data, failed to retrieve blob data from padding sample.");

        std::string ss(sample_bytes_[column_name]);
        n_bytes = ss.size();
        data_ptr = std::make_unique<unsigned char[]>(n_bytes);
        std::copy(ss.begin(), ss.end(), data_ptr.get());
      } else {
        RETURN_STATUS_UNEXPECTED("Invalid data, retrieved data type is unknown.");
      }
      if (data == nullptr) {
        data = reinterpret_cast<const unsigned char *>(data_ptr.get());
      }
    } else {
      RETURN_IF_NOT_OK(shard_column->GetColumnValueByName(column_name, columns_blob, columns_json, &data, &data_ptr,
                                                          &n_bytes, &column_data_type, &column_data_type_size,
                                                          &column_shape));
    }

    std::shared_ptr<Tensor> tensor;
    const ColDescriptor &column = data_schema_->Column(i_col);
    DataType type = column.Type();

    // Set shape
    CHECK_FAIL_RETURN_UNEXPECTED(column_data_type_size != 0, "Found memory size of column data type is 0.");
    auto num_elements = n_bytes / column_data_type_size;
    if (type == DataType::DE_STRING) {
      std::string s{data, data + n_bytes};
      RETURN_IF_NOT_OK(Tensor::CreateScalar(s, &tensor));
    } else if (column.HasShape()) {
      auto new_shape = TensorShape(column.Shape());
      // if the column data is empty, create an empty tensor shape
      if (num_elements == 0) {
        new_shape = TensorShape({});
      } else {
        RETURN_IF_NOT_OK(column.MaterializeTensorShape(static_cast<int32_t>(num_elements), &new_shape));
      }
      RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
    } else {
      std::vector<dsize_t> shape_details = {static_cast<dsize_t>(num_elements)};
      auto new_shape = TensorShape(shape_details);
      RETURN_IF_NOT_OK(Tensor::CreateFromMemory(new_shape, type, data, &tensor));
    }
    tensor_row->push_back(std::move(tensor));
  }
  return Status::OK();
}

// Overrides the base class reset method. When an operator does a reset, it cleans up any state
// info from its previous execution and then initializes itself so that it can be executed
// again.
Status MindRecordOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  RETURN_IF_NOT_OK(MappableLeafOp::Reset());  // Call our super class reset first.
  return Status::OK();
}

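// Registers the io block queues and the worker synchronization post with the task group, launches the ShardReader
// and the worker threads, then queries the number of rows and initializes the sampler with it.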
Status MindRecordOp::LaunchThreadsAndInitOp() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(shard_reader_->Launch(true));
  // Launch main workers that load TensorRows by reading all images
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&MindRecordOp::WorkerEntry, this, std::placeholders::_1), "", id()));
  num_rows_ = shard_reader_->GetNumRows();
  RETURN_IF_NOT_OK(this->InitSampler());  // pass numRows to Sampler
  TaskManager::FindMe()->Post();
  return Status::OK();
}

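// Static helper that counts the total number of rows across the given MindRecord files,
// taking the shard operator and the number of padded samples into account.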
Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
                                    const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
  std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
  RETURN_IF_NOT_OK(shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded));
  return Status::OK();
}

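// Builds the column-name to column-id mapping from columns_to_load_ (no-op if the map was already set).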
Status MindRecordOp::ComputeColMap() {
  if (column_name_id_map_.empty()) {
    for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
      column_name_id_map_[columns_to_load_[i]] = i;
    }
  } else {
    MS_LOG(WARNING) << "Column name map is already set!";
  }
  return Status::OK();
}

}  // namespace dataset
}  // namespace mindspore