1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/batch_op.h"
17
18 #include <utility>
19
20 #include "utils/ms_utils.h"
21 #ifdef ENABLE_PYTHON
22 #include "minddata/dataset/core/pybind_support.h"
23 #endif
24
25 #include "minddata/dataset/engine/db_connector.h"
26 #include "minddata/dataset/kernels/data/data_utils.h"
27 #include "minddata/dataset/util/status.h"
28
29 namespace mindspore {
30 namespace dataset {
Builder(int32_t batch_size)31 BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false), builder_pad_(false), builder_pad_map_({}) {
32 builder_batch_size_ = batch_size;
33 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
34 builder_num_workers_ = cfg->num_parallel_workers();
35 builder_op_connector_size_ = cfg->op_connector_size();
36 }
37
Build(std::shared_ptr<BatchOp> * ptr)38 Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
39 RETURN_UNEXPECTED_IF_NULL(ptr);
40 #ifdef ENABLE_PYTHON
41 *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
42 builder_num_workers_, builder_in_names_, builder_out_names_,
43 builder_batch_size_func_, builder_batch_map_func_, builder_pad_map_);
44 #else
45 *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
46 builder_num_workers_, builder_in_names_, builder_pad_map_);
47 #endif
48 return Status::OK();
49 }
50
51 #ifdef ENABLE_PYTHON
BatchOp(int32_t batch_size,bool drop,bool pad,int32_t op_queue_size,int32_t num_workers,const std::vector<std::string> & in_col,const std::vector<std::string> & out_col,py::function batch_size_func,py::function batch_map_func,PadInfo pad_map)52 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
53 const std::vector<std::string> &in_col, const std::vector<std::string> &out_col,
54 py::function batch_size_func, py::function batch_map_func, PadInfo pad_map)
55 : ParallelOp(num_workers, op_queue_size),
56 start_batch_size_(batch_size),
57 drop_(drop),
58 pad_(pad),
59 in_col_names_(in_col),
60 out_col_names_(out_col),
61 batch_size_func_(batch_size_func),
62 batch_map_func_(batch_map_func),
63 pad_info_(pad_map),
64 batch_num_(0),
65 batch_cnt_(0) {
66 // Adjust connector queue size. After batch each row is batch_size times larger
67 int32_t queue_size = std::max(1, op_queue_size / start_batch_size_);
68 if (num_workers == 1) {
69 // ensure there is at least 2 queue slots for whole operation.. If only 1 worker, incrase it to 2
70 queue_size = std::max(2, queue_size);
71 }
72
73 worker_queues_.Init(num_workers, queue_size);
74 }
75 // if PYTHON is disabled. per_batch_map can't be used
76 #else
BatchOp(int32_t batch_size,bool drop,bool pad,int32_t op_queue_size,int32_t num_workers,const std::vector<std::string> & cols_to_map,PadInfo pad_map)77 BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
78 const std::vector<std::string> &cols_to_map, PadInfo pad_map)
79 : ParallelOp(num_workers, op_queue_size),
80 start_batch_size_(batch_size),
81 drop_(drop),
82 pad_(pad),
83 in_col_names_(cols_to_map),
84 pad_info_(pad_map),
85 batch_num_(0),
86 batch_cnt_(0) {
87 int32_t queue_size = std::max(1, op_queue_size / start_batch_size_);
88 if (num_workers == 1) {
89 // ensure there is at least 2 queue slots for whole operation.. If only 1 worker, incrase it to 2
90 queue_size = std::max(2, queue_size);
91 }
92 worker_queues_.Init(num_workers, queue_size);
93 }
94 #endif
95
operator ()()96 Status BatchOp::operator()() {
97 Status rc = LaunchThreadsAndInitOp();
98 // Synchronize with TaskManager
99 TaskManager::FindMe()->Post();
100 RETURN_IF_NOT_OK(rc);
101 int64_t epoch_num = 0, batch_num = 0, cnt = 0;
102 TensorRow new_row;
103 std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
104 child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
105 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
106 int32_t cur_batch_size = 0;
107 RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0)));
108 while (child_iterator_->EofHandled() == false) {
109 while (new_row.empty() == false) {
110 table->emplace_back(new_row);
111 // if # of rows is enough to make 1 batch, send it to worker_queue
112 if (table->size() == static_cast<size_t>(cur_batch_size)) {
113 RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
114 std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
115 cnt++;
116 table = std::make_unique<TensorQTable>();
117 RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
118 }
119 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
120 }
121 // Reminder logic, execute only when there is a remainder (table is non empty) and don't drop
122 if (drop_ == false && table->empty() == false) {
123 RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
124 std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
125 cnt++;
126 }
127 table = std::make_unique<TensorQTable>(); // this drops when drop == true
128 // end of the current epoch, batch_num should start from 0 again
129 batch_num = 0;
130 epoch_num++;
131 RETURN_IF_NOT_OK(
132 worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOE))));
133 RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
134 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
135
136 #if !defined(_WIN32) && !defined(_WIN64) && ENABLE_PYTHON
137 if ((num_workers_ > 1 || batch_map_func_) && GetMemoryUsage() > MAX_MEMORY_USAGE_THRESHOLD) {
138 MS_LOG(WARNING) << "Memory consumption is more than " << MAX_MEMORY_USAGE_THRESHOLD * 100 << "%, "
139 << "which may cause oom error. Please reduce num_parallel_workers size / "
140 << "optimize per_batch_map function / other python data preprocess function to "
141 << "reduce memory usage.";
142 }
143 #endif
144 } // end of EofHandled() == false
145 RETURN_IF_NOT_OK(
146 worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF))));
147 // EOF received, send quit signal to all workers
148 for (int32_t ind = 0; ind < num_workers_; ind++) {
149 RETURN_IF_NOT_OK(
150 worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kQuit))));
151 }
152 return Status::OK();
153 }
154
Print(std::ostream & out,bool show_all) const155 void BatchOp::Print(std::ostream &out, bool show_all) const {
156 if (!show_all) {
157 // Call the super class for displaying any common 1-liner info
158 ParallelOp::Print(out, show_all);
159 // Then show any custom derived-internal 1-liner info for this op
160 out << " [batch size: " << start_batch_size_ << "]\n";
161 } else {
162 // Call the super class for displaying any common detailed info
163 ParallelOp::Print(out, show_all);
164 // Then show any custom derived-internal stuff
165 out << "\nStart batch size: " << start_batch_size_ << "\nDrop remainder: " << (drop_ ? "yes" : "no") << "\n\n";
166 }
167 }
168
BatchRows(const std::unique_ptr<TensorQTable> * src,TensorRow * dest,dsize_t batch_size)169 Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size) {
170 RETURN_UNEXPECTED_IF_NULL(src);
171 RETURN_UNEXPECTED_IF_NULL(dest);
172 if ((*src)->size() != batch_size) {
173 RETURN_STATUS_UNEXPECTED("[Internal ERROR] Source table size does not match the batch_size.");
174 }
175
176 if (batch_size == 1) {
177 *dest = std::move((*src)->front());
178 (*src)->pop_front();
179
180 for (const auto &tensor : (*dest)) {
181 RETURN_IF_NOT_OK(tensor->ExpandDim(0));
182 }
183 return Status::OK();
184 }
185
186 auto num_columns = (*src)->front().size();
187 for (size_t i = 0; i < num_columns; i++) {
188 std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i); // first row, column i
189 TensorShape first_shape = first_tensor->shape();
190 DataType first_type = first_tensor->type();
191 TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));
192
193 std::shared_ptr<Tensor> new_tensor;
194 if (first_type.IsNumeric()) { // numeric tensor
195 RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
196 dsize_t j = 0;
197 for (auto row : **src) {
198 std::shared_ptr<Tensor> old_tensor = row.at(i); // row j, column i
199 if (old_tensor->shape() == first_shape) { // check the newly popped rows have the same dim as the first
200 if (new_shape.NumOfElements() != 0) {
201 RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
202 }
203 // Don't do anything if the tensor has no data
204 } else {
205 std::stringstream shape1, shape2;
206 first_shape.Print(shape1);
207 old_tensor->shape().Print(shape2);
208 RETURN_STATUS_UNEXPECTED(
209 "Invalid data, batch operation expect same shape for each data row, but got inconsistent shape in column " +
210 std::to_string(i) + " expected shape for this column is:" + shape1.str() + ", got shape:" + shape2.str());
211 }
212 }
213 } else { // handle string column differently
214 std::vector<std::string> strings;
215 for (dsize_t j = 0; j < batch_size; j++) {
216 std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(i);
217 for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
218 strings.emplace_back(*itr);
219 }
220 }
221 RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
222 }
223 dest->emplace_back(new_tensor);
224 }
225
226 return Status::OK();
227 }
228
WorkerEntry(int32_t workerId)229 Status BatchOp::WorkerEntry(int32_t workerId) {
230 TaskManager::FindMe()->Post();
231 std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair;
232 RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
233 while (table_pair.second.ctrl_ != batchCtrl::kQuit) {
234 if (table_pair.second.ctrl_ == batchCtrl::kEOE) {
235 RETURN_IF_NOT_OK(out_connector_->SendEOE(workerId));
236 } else if (table_pair.second.ctrl_ == batchCtrl::kEOF) {
237 RETURN_IF_NOT_OK(out_connector_->SendEOF(workerId));
238 } else if (table_pair.second.ctrl_ == batchCtrl::kNoCtrl) {
239 TensorRow new_row;
240 RETURN_IF_NOT_OK(MakeBatchedRow(std::move(table_pair), &new_row));
241 RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), workerId));
242 }
243 RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
244 }
245 return Status::OK();
246 }
247
MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>,CBatchInfo> table_pair,TensorRow * new_row)248 Status BatchOp::MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row) {
249 RETURN_UNEXPECTED_IF_NULL(table_pair.first);
250 #ifdef ENABLE_PYTHON
251 if (!in_col_names_.empty()) {
252 RETURN_IF_NOT_OK(MapColumns(&table_pair));
253 } // pass it through pyfun
254 #endif
255 if (pad_) {
256 RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));
257 } // do padding if needed
258 RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size()));
259 return Status::OK();
260 }
261
LaunchThreadsAndInitOp()262 Status BatchOp::LaunchThreadsAndInitOp() {
263 if (tree_ == nullptr) {
264 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
265 "[Internal ERROR] Pipeline init failed, Execution tree not set.");
266 }
267 RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks()));
268 RETURN_IF_NOT_OK(
269 tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name(), id()));
270 return Status::OK();
271 }
272
EofReceived(int32_t)273 Status BatchOp::EofReceived(int32_t) { return Status::OK(); }
274
EoeReceived(int32_t)275 Status BatchOp::EoeReceived(int32_t) {
276 state_ = OpState::kDeOpIdle;
277 return Status::OK();
278 }
279
280 #ifdef ENABLE_PYTHON
MapColumns(std::pair<std::unique_ptr<TensorQTable>,CBatchInfo> * table_pair)281 Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
282 RETURN_UNEXPECTED_IF_NULL(table_pair);
283 RETURN_UNEXPECTED_IF_NULL(table_pair->first);
284 std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
285 size_t num_rows = in_q_table->size();
286 auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
287 TensorTable in_cols(in_col_names_.size(), TensorRow(num_rows, nullptr)), out_cols;
288
289 std::unordered_map<std::string, size_t> in_col_name_id; // name of columns that need to be fed to per-batch_map
290 for (size_t i = 0; i < in_col_names_.size(); i++) in_col_name_id.insert({in_col_names_[i], i});
291
292 for (const auto &itr : child_map_) {
293 auto col_itr = in_col_name_id.find(itr.first);
294 if (col_itr != in_col_name_id.end()) { // col needs to be prepared for per_batch_map
295 for (size_t i = 0; i < num_rows; i++) {
296 in_cols[col_itr->second][i] = std::move((*in_q_table)[i][itr.second]);
297 }
298 } else { // col needs to be placed into the out table
299 size_t col_id = column_name_id_map_[itr.first];
300 for (size_t i = 0; i < num_rows; i++) {
301 (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
302 }
303 }
304 }
305
306 in_q_table.reset(); // release the input table
307 RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));
308
309 for (size_t i = 0; i < out_cols.size(); i++) {
310 size_t col_id = column_name_id_map_[out_col_names_[i]];
311 size_t row_id = 0;
312 CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size(),
313 "Invalid data, column: " + out_col_names_[i] +
314 " expects: " + std::to_string(num_rows) +
315 " rows returned from per_batch_map, got: " + std::to_string(out_cols[i].size()));
316 for (auto &t_row : *out_q_table) {
317 t_row[col_id] = out_cols[i][row_id++];
318 }
319 }
320
321 table_pair->first = std::move(out_q_table);
322 return Status::OK();
323 }
324 #endif
325
GetBatchSize(int32_t * batch_size,CBatchInfo info)326 Status BatchOp::GetBatchSize(int32_t *batch_size, CBatchInfo info) {
327 RETURN_UNEXPECTED_IF_NULL(batch_size);
328 #ifdef ENABLE_PYTHON
329 if (batch_size_func_) {
330 RETURN_IF_NOT_OK(InvokeBatchSizeFunc(batch_size, info));
331 } else {
332 (*batch_size) = start_batch_size_;
333 }
334 #else
335 (*batch_size) = start_batch_size_;
336 #endif
337 return Status::OK();
338 }
339
340 #ifdef ENABLE_PYTHON
InvokeBatchSizeFunc(int32_t * batch_size,CBatchInfo info)341 Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
342 RETURN_UNEXPECTED_IF_NULL(batch_size);
343 {
344 // Acquire Python GIL
345 py::gil_scoped_acquire gil_acquire;
346 if (Py_IsInitialized() == 0) {
347 return Status(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
348 }
349 try {
350 py::object size = batch_size_func_(info);
351 *batch_size = size.cast<int32_t>();
352 if (*batch_size <= 0) {
353 return Status(StatusCode::kMDPyFuncException,
354 "Invalid parameter, batch_size function should return an integer greater than 0, but got: " +
355 std::to_string(*batch_size));
356 }
357 } catch (const py::error_already_set &e) {
358 return Status(StatusCode::kMDPyFuncException, e.what());
359 } catch (const py::cast_error &e) {
360 return Status(StatusCode::kMDPyFuncException,
361 "Invalid parameter, batch_size function should return an integer greater than 0.");
362 }
363 }
364 return Status(StatusCode::kSuccess, "batch_size function call succeeded.");
365 }
366
InvokeBatchMapFunc(TensorTable * input,TensorTable * output,CBatchInfo info)367 Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
368 RETURN_UNEXPECTED_IF_NULL(input);
369 RETURN_UNEXPECTED_IF_NULL(output);
370 {
371 // Acquire Python GIL
372 py::gil_scoped_acquire gil_acquire;
373 if (Py_IsInitialized() == 0) {
374 return Status(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
375 }
376 try {
377 // Prepare batch map call back parameters
378 py::tuple input_args(input->size() + 1);
379 for (size_t i = 0; i < input->size(); i++) {
380 std::vector<py::array> np_batch;
381 for (std::shared_ptr<Tensor> t : input->at(i)) {
382 py::array np_array;
383 RETURN_IF_NOT_OK(t->GetDataAsNumpy(&np_array));
384 np_batch.push_back(std::move(np_array));
385 }
386 input_args[i] = np_batch;
387 }
388 input_args[input->size()] = info;
389 // Invoke batch map func
390 py::object ret_py_obj = batch_map_func_(*input_args);
391 // Parse batch map return value
392 py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
393 CHECK_FAIL_RETURN_UNEXPECTED(py::isinstance<py::tuple>(ret_tuple),
394 "per_batch_map function should return a tuple.");
395 CHECK_FAIL_RETURN_UNEXPECTED(ret_tuple.size() == out_col_names_.size(),
396 "Incorrect number of columns returned in per_batch_map function. Expects: " +
397 std::to_string(out_col_names_.size()) +
398 " got: " + std::to_string(ret_tuple.size()));
399 for (size_t i = 0; i < ret_tuple.size(); i++) {
400 TensorRow output_batch;
401 // If user returns a type that is neither a list nor an array, issue a error msg.
402 if (!py::isinstance<py::list>(ret_tuple[i])) {
403 MS_LOG(INFO) << "column: " << out_col_names_[i]
404 << " returned by per_batch_map is not a list, this could lead to conversion failure.";
405 }
406
407 py::list output_list = py::cast<py::list>(ret_tuple[i]);
408
409 for (size_t j = 0; j < output_list.size(); j++) {
410 std::shared_ptr<Tensor> out;
411 RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py::cast<py::array>(output_list[j]), &out));
412 output_batch.push_back(std::move(out));
413 }
414 output->push_back(std::move(output_batch));
415 }
416 } catch (const py::error_already_set &e) {
417 return Status(StatusCode::kMDPyFuncException, e.what());
418 } catch (const py::cast_error &e) {
419 return Status(StatusCode::kMDPyFuncException,
420 "Invalid parameter, per_batch_map function of batch should return a tuple of list of numpy array.");
421 }
422 }
423 return Status::OK();
424 }
425 #endif
426
PadColumns(std::unique_ptr<TensorQTable> * table,const PadInfo & pad_info,const std::unordered_map<std::string,int32_t> & column_name_id_map)427 Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
428 const std::unordered_map<std::string, int32_t> &column_name_id_map) {
429 RETURN_UNEXPECTED_IF_NULL(table); // placeholder for now, might need this in the future
430 CHECK_FAIL_RETURN_UNEXPECTED(
431 (*table)->front().size() == column_name_id_map.size(),
432 "Invalid parameter, size of column_name_id_map must be equal to num of data columns. map size: " +
433 std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size()));
434 std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
435 0); // value to pad each column's tensor with, default 0
436 std::set<int32_t> pad_cols;
437 // padded_shape provided by user, maximum shapes of current batch of tensors
438 std::vector<std::vector<dsize_t>> pad_shapes(column_name_id_map.size()), max_shapes(column_name_id_map.size());
439 RETURN_IF_NOT_OK(UnpackPadInfo(pad_info, column_name_id_map, &pad_cols, &pad_vals, &pad_shapes));
440
441 // init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well
442 for (size_t col_id : pad_cols) {
443 max_shapes[col_id] = std::vector<dsize_t>((*table)->front()[col_id]->Rank(), -1);
444 if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id]; // fill pad shape with -1
445 CHECK_FAIL_RETURN_UNEXPECTED(
446 pad_shapes[col_id].size() == max_shapes[col_id].size(),
447 "Invalid data, rank of pad_shape must be equal to rank of specified column. pad_shapes rank:" +
448 std::to_string(pad_shapes[col_id].size()) + ", column rank: " + std::to_string(max_shapes[col_id].size()));
449 }
450
451 // calculate maximum shape for each column that needs to be padded
452 for (const TensorRow &row : **table) { // iterator each row in a batch
453 for (size_t col_id : pad_cols) { // iterator each tensor in a row
454 CHECK_FAIL_RETURN_UNEXPECTED(
455 row[col_id]->Rank() == max_shapes[col_id].size(),
456 "Invalid data, data to be padded together need to have the same rank, got shape 1: " +
457 std::to_string(row[col_id]->Rank()) + ", shape 2: " + std::to_string(max_shapes[col_id].size()));
458 for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) { // pick the largest number in each dimension
459 max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]);
460 }
461 }
462 }
463
464 // if user sets a dimension to -1 (None in python), use the max value for current dimension
465 for (size_t col_id : pad_cols) {
466 for (size_t dim = 0; dim < pad_shapes[col_id].size(); dim++) {
467 if (pad_shapes[col_id][dim] < 0) pad_shapes[col_id][dim] = max_shapes[col_id][dim];
468 }
469 }
470
471 // call pad on each tensor that needs to be padded
472 for (TensorRow &row : **table) {
473 for (size_t col_id : pad_cols) {
474 std::shared_ptr<Tensor> pad_tensor;
475 RETURN_IF_NOT_OK(PadEnd(row[col_id], &pad_tensor, pad_shapes[col_id], pad_vals[col_id]));
476 row[col_id] = pad_tensor;
477 }
478 }
479 return Status::OK();
480 }
481
UnpackPadInfo(const PadInfo & pad_info,const std::unordered_map<std::string,int32_t> & column_name_id_map,std::set<int32_t> * pad_cols,std::vector<std::shared_ptr<Tensor>> * pad_vals,std::vector<std::vector<dsize_t>> * pad_shapes)482 Status BatchOp::UnpackPadInfo(const PadInfo &pad_info,
483 const std::unordered_map<std::string, int32_t> &column_name_id_map,
484 std::set<int32_t> *pad_cols, std::vector<std::shared_ptr<Tensor>> *pad_vals,
485 std::vector<std::vector<dsize_t>> *pad_shapes) {
486 RETURN_UNEXPECTED_IF_NULL(pad_cols);
487 RETURN_UNEXPECTED_IF_NULL(pad_vals);
488 RETURN_UNEXPECTED_IF_NULL(pad_shapes);
489 if (pad_info.empty()) { // if pad_info empty, pad every columns automatically
490 for (size_t col_id = 0; col_id < column_name_id_map.size(); col_id++) {
491 pad_cols->insert(col_id);
492 }
493 } else {
494 for (const auto &p : pad_info) {
495 auto location = column_name_id_map.find(p.first);
496 CHECK_FAIL_RETURN_UNEXPECTED(location != column_name_id_map.end(),
497 "Invalid parameter, column name: " + p.first + " does not exist.");
498 auto col_id = static_cast<dsize_t>(location->second);
499 CHECK_FAIL_RETURN_UNEXPECTED(
500 col_id < pad_vals->size() && col_id < pad_shapes->size(),
501 "Invalid parameter, column id must be less than the size of pad_val and pad_shape, but got: " +
502 std::to_string(col_id));
503 pad_cols->insert(col_id);
504 (*pad_vals)[col_id] = p.second.second; // set pad values
505 (*pad_shapes)[col_id] = p.second.first.AsVector(); // empty vector if shape is unknown
506 }
507 }
508 return Status::OK();
509 }
510
ComputeColMap()511 Status BatchOp::ComputeColMap() {
512 CHECK_FAIL_RETURN_UNEXPECTED(child_.size() == 1,
513 "Invalid data, batch operator can't be used as a single operator, "
514 "should be preceded by an operator that reads data, for example, ImageFolderDataset.");
515 CHECK_FAIL_RETURN_UNEXPECTED(!(child_[0]->column_name_id_map().empty()),
516 "Invalid data, the column of the previous operator of the batch cannot be empty.");
517
518 if (in_col_names_.empty()) { // if per_batch_map is not set, do not need to deal with out_col_names
519 column_name_id_map_ = child_[0]->column_name_id_map();
520 return Status::OK();
521 }
522
523 // from this point onward, per_batch_map is needed, therefore, child_map_ must be set
524 child_map_ = child_[0]->column_name_id_map();
525
526 // check all input columns exist
527 for (const auto &col : in_col_names_) {
528 CHECK_FAIL_RETURN_UNEXPECTED(child_map_.find(col) != child_map_.end(),
529 "Invalid parameter, col:" + col + " doesn't exist in dataset.");
530 }
531
532 // following logic deals with per_batch_map
533 bool col_name_flag = (out_col_names_.empty() || out_col_names_ == in_col_names_); // true if col name is unchanged
534
535 // column names are unchanged
536 if (col_name_flag) {
537 if (out_col_names_.empty()) out_col_names_ = in_col_names_;
538 column_name_id_map_ = child_map_;
539 return Status::OK();
540 }
541 // column names are changed from this point onward, this map is the child_map without input cols for per_batch_map
542 auto child_map_no_in_col = child_map_;
543
544 for (const auto &col : in_col_names_) {
545 child_map_no_in_col.erase(col);
546 }
547
548 // col names are changed
549 if (out_col_names_.size() == in_col_names_.size()) { // column names changed, but same number of columns
550 // the following code rename the input keys to output keys. ["a","b"] -> ["b", "a"] is allowed
551 column_name_id_map_ = child_map_no_in_col;
552 for (auto i = 0; i < in_col_names_.size(); i++) {
553 column_name_id_map_[out_col_names_[i]] = child_map_[in_col_names_[i]];
554 }
555 } else { // number of columns are different, put the output column names first, then the original ones
556 for (const std::string &col : out_col_names_) {
557 column_name_id_map_.insert({col, column_name_id_map_.size()});
558 }
559 for (const auto &itr : child_map_no_in_col) {
560 column_name_id_map_.insert({itr.first, column_name_id_map_.size()});
561 }
562 }
563
564 CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.size() == (child_map_no_in_col.size() + out_col_names_.size()),
565 "Key error in column_name_id_map_. output_columns in batch is not set correctly!");
566 return Status::OK();
567 }
568
GetTreeBatchSize()569 int64_t BatchOp::GetTreeBatchSize() {
570 #ifdef ENABLE_PYTHON
571 if (batch_size_func_) {
572 return -1;
573 }
574 #endif
575 return start_batch_size_;
576 }
577
GetNextRowPullMode(TensorRow * const row)578 Status BatchOp::GetNextRowPullMode(TensorRow *const row) {
579 RETURN_UNEXPECTED_IF_NULL(row);
580 std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
581 child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
582 int32_t cur_batch_size = 0;
583 RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, batch_num_, batch_cnt_)));
584 for (int i = 0; i < cur_batch_size; i++) {
585 TensorRow new_row;
586 RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
587 if (!new_row.empty()) {
588 table->emplace_back(new_row);
589 if (table->size() == static_cast<size_t>(cur_batch_size)) break;
590 } else {
591 if (drop_ || table->empty()) {
592 table = std::make_unique<TensorQTable>(); // this drops when drop == true
593 }
594 }
595 }
596 RETURN_UNEXPECTED_IF_NULL(table);
597 if (pad_) RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_)); // do padding if needed
598 if (!table->empty()) {
599 RETURN_IF_NOT_OK(BatchRows(&table, row, table->size()));
600 batch_cnt_++;
601 batch_num_++;
602 }
603 return Status::OK();
604 }
605
606 } // namespace dataset
607 } // namespace mindspore
608