/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/batch_op.h"

#include <utility>

#include "utils/ms_utils.h"
#ifdef ENABLE_PYTHON
#include "minddata/dataset/core/pybind_support.h"
#endif

#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/kernels/data/data_utils.h"
#include "minddata/dataset/util/status.h"

namespace mindspore {
namespace dataset {
BatchOp::Builder::Builder(int32_t batch_size) : builder_drop_(false), builder_pad_(false), builder_pad_map_({}) {
  builder_batch_size_ = batch_size;
  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  builder_num_workers_ = cfg->num_parallel_workers();
  builder_op_connector_size_ = cfg->op_connector_size();
}

Status BatchOp::Builder::Build(std::shared_ptr<BatchOp> *ptr) {
  RETURN_UNEXPECTED_IF_NULL(ptr);
#ifdef ENABLE_PYTHON
  *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
                                   builder_num_workers_, builder_in_names_, builder_out_names_,
                                   builder_batch_size_func_, builder_batch_map_func_, builder_pad_map_);
#else
  *ptr = std::make_shared<BatchOp>(builder_batch_size_, builder_drop_, builder_pad_, builder_op_connector_size_,
                                   builder_num_workers_, builder_in_names_, builder_pad_map_);
#endif
  return Status::OK();
}

#ifdef ENABLE_PYTHON
BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
                 const std::vector<std::string> &in_col, const std::vector<std::string> &out_col,
                 py::function batch_size_func, py::function batch_map_func, PadInfo pad_map)
    : ParallelOp(num_workers, op_queue_size),
      start_batch_size_(batch_size),
      drop_(drop),
      pad_(pad),
      in_col_names_(in_col),
      out_col_names_(out_col),
      batch_size_func_(batch_size_func),
      batch_map_func_(batch_map_func),
      pad_info_(pad_map),
      batch_num_(0),
      batch_cnt_(0) {
  // Adjust the connector queue size: after batching, each row is batch_size times larger.
  int32_t queue_size = std::max(1, op_queue_size / start_batch_size_);
  if (num_workers == 1) {
    // Ensure there are at least 2 queue slots for the whole operation. If there is only 1 worker, increase it to 2.
    queue_size = std::max(2, queue_size);
  }

  worker_queues_.Init(num_workers, queue_size);
}
// If Python is disabled, per_batch_map can't be used.
#else
BatchOp::BatchOp(int32_t batch_size, bool drop, bool pad, int32_t op_queue_size, int32_t num_workers,
                 const std::vector<std::string> &cols_to_map, PadInfo pad_map)
    : ParallelOp(num_workers, op_queue_size),
      start_batch_size_(batch_size),
      drop_(drop),
      pad_(pad),
      in_col_names_(cols_to_map),
      pad_info_(pad_map),
      batch_num_(0),
      batch_cnt_(0) {
  int32_t queue_size = std::max(1, op_queue_size / start_batch_size_);
  if (num_workers == 1) {
    // Ensure there are at least 2 queue slots for the whole operation. If there is only 1 worker, increase it to 2.
    queue_size = std::max(2, queue_size);
  }
  worker_queues_.Init(num_workers, queue_size);
}
#endif

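// Main entry point for the collector thread. It pulls rows from the child operator, groups them into tables of
// cur_batch_size rows (the size may change per batch when a Python batch_size function is set), and hands each
// table together with its CBatchInfo to the worker queues in round-robin order. EOE/EOF control records are
// forwarded the same way, and one kQuit record per worker is sent after EOF to shut the workers down.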
Status BatchOp::operator()() {
  Status rc = LaunchThreadsAndInitOp();
  // Synchronize with TaskManager
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(rc);
  int64_t epoch_num = 0, batch_num = 0, cnt = 0;
  TensorRow new_row;
  std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  int32_t cur_batch_size = 0;
  RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0)));
  while (child_iterator_->EofHandled() == false) {
    while (new_row.empty() == false) {
      table->emplace_back(new_row);
      // If the number of rows is enough to make 1 batch, send it to the worker queue.
      if (table->size() == static_cast<size_t>(cur_batch_size)) {
        RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
          std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
        cnt++;
        table = std::make_unique<TensorQTable>();
        RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
      }
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
    }
    // Remainder logic: executed only when there is a remainder (table is non-empty) and drop is not set.
    if (drop_ == false && table->empty() == false) {
      RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
        std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
      cnt++;
    }
    table = std::make_unique<TensorQTable>();  // this drops the remainder when drop_ == true
    // End of the current epoch; batch_num should start from 0 again.
    batch_num = 0;
    epoch_num++;
    RETURN_IF_NOT_OK(
      worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOE))));
    RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));

#if !defined(_WIN32) && !defined(_WIN64) && ENABLE_PYTHON
    if ((num_workers_ > 1 || batch_map_func_) && GetMemoryUsage() > MAX_MEMORY_USAGE_THRESHOLD) {
      MS_LOG(WARNING) << "Memory consumption is more than " << MAX_MEMORY_USAGE_THRESHOLD * 100 << "%, "
                      << "which may cause an OOM error. Please reduce num_parallel_workers, "
                      << "or optimize the per_batch_map function or other Python data preprocessing functions to "
                      << "reduce memory usage.";
    }
#endif
  }  // end of EofHandled() == false
  RETURN_IF_NOT_OK(
    worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kEOF))));
  // EOF received, send quit signal to all workers
  for (int32_t ind = 0; ind < num_workers_; ind++) {
    RETURN_IF_NOT_OK(
      worker_queues_[cnt++ % num_workers_]->EmplaceBack(std::make_pair(nullptr, CBatchInfo(batchCtrl::kQuit))));
  }
  return Status::OK();
}

void BatchOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << " [batch size: " << start_batch_size_ << "]\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nStart batch size: " << start_batch_size_ << "\nDrop remainder: " << (drop_ ? "yes" : "no") << "\n\n";
  }
}

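// Merge a table of batch_size rows into a single TensorRow. For a batch of 1 the front row is reused and each
// tensor just gains a leading dimension of 1. Otherwise, for every column, numeric tensors are copied into a new
// tensor whose shape is the original shape prepended with batch_size, while string tensors are flattened into one
// vector and rebuilt with the batched shape. All rows must share the same shape per column or an error is returned.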
Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size) {
  RETURN_UNEXPECTED_IF_NULL(src);
  RETURN_UNEXPECTED_IF_NULL(dest);
  if ((*src)->size() != batch_size) {
    RETURN_STATUS_UNEXPECTED("[Internal ERROR] Source table size does not match the batch_size.");
  }

  if (batch_size == 1) {
    *dest = std::move((*src)->front());
    (*src)->pop_front();

    for (const auto &tensor : (*dest)) {
      RETURN_IF_NOT_OK(tensor->ExpandDim(0));
    }
    return Status::OK();
  }

  auto num_columns = (*src)->front().size();
  for (size_t i = 0; i < num_columns; i++) {
    std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i);  // first row, column i
    TensorShape first_shape = first_tensor->shape();
    DataType first_type = first_tensor->type();
    TensorShape new_shape = first_shape.PrependDim(static_cast<int64_t>(batch_size));

    std::shared_ptr<Tensor> new_tensor;
    if (first_type.IsNumeric()) {  // numeric tensor
      RETURN_IF_NOT_OK(Tensor::CreateEmpty(new_shape, first_type, &new_tensor));
      dsize_t j = 0;
      for (const auto &row : **src) {
        std::shared_ptr<Tensor> old_tensor = row.at(i);  // row j, column i
        if (old_tensor->shape() == first_shape) {        // check that newly popped rows have the same dims as the first
          if (new_shape.NumOfElements() != 0) {
            RETURN_IF_NOT_OK(new_tensor->InsertTensor({j++}, old_tensor));
          }
          // Don't do anything if the tensor has no data
        } else {
          std::stringstream shape1, shape2;
          first_shape.Print(shape1);
          old_tensor->shape().Print(shape2);
          RETURN_STATUS_UNEXPECTED(
            "Invalid data, the batch operation expects the same shape for each data row, but got an inconsistent "
            "shape in column " +
            std::to_string(i) + ". Expected shape for this column is: " + shape1.str() + ", got shape: " + shape2.str());
        }
      }
    } else {  // handle string columns differently
      std::vector<std::string> strings;
      for (dsize_t j = 0; j < batch_size; j++) {
        std::shared_ptr<Tensor> old_tensor = (*src)->at(j).at(i);
        for (auto itr = old_tensor->begin<std::string_view>(); itr != old_tensor->end<std::string_view>(); ++itr) {
          strings.emplace_back(*itr);
        }
      }
      RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
    }
    dest->emplace_back(new_tensor);
  }

  return Status::OK();
}

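// Worker thread loop: pop (table, CBatchInfo) pairs from this worker's queue. Control records are translated into
// EOE/EOF on the output connector; data records are turned into a batched row via MakeBatchedRow. The loop exits
// when a kQuit record is received.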
Status BatchOp::WorkerEntry(int32_t workerId) {
  TaskManager::FindMe()->Post();
  std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair;
  RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
  while (table_pair.second.ctrl_ != batchCtrl::kQuit) {
    if (table_pair.second.ctrl_ == batchCtrl::kEOE) {
      RETURN_IF_NOT_OK(out_connector_->SendEOE(workerId));
    } else if (table_pair.second.ctrl_ == batchCtrl::kEOF) {
      RETURN_IF_NOT_OK(out_connector_->SendEOF(workerId));
    } else if (table_pair.second.ctrl_ == batchCtrl::kNoCtrl) {
      TensorRow new_row;
      RETURN_IF_NOT_OK(MakeBatchedRow(std::move(table_pair), &new_row));
      RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), workerId));
    }
    RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
  }
  return Status::OK();
}

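// Convert one queued table into a batched TensorRow: optionally run the Python per_batch_map over the mapped
// columns, optionally pad columns according to pad_info_, then merge the rows with BatchRows.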
Status BatchOp::MakeBatchedRow(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row) {
  RETURN_UNEXPECTED_IF_NULL(table_pair.first);
#ifdef ENABLE_PYTHON
  if (!in_col_names_.empty()) {
    RETURN_IF_NOT_OK(MapColumns(&table_pair));
  }  // pass it through the Python per_batch_map function
#endif
  if (pad_) {
    RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));
  }  // do padding if needed
  RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size()));
  return Status::OK();
}

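// Register the worker queues with the execution tree and launch one WorkerEntry task per worker.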
Status BatchOp::LaunchThreadsAndInitOp() {
  if (tree_ == nullptr) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
                  "[Internal ERROR] Pipeline init failed, Execution tree not set.");
  }
  RETURN_IF_NOT_OK(worker_queues_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&BatchOp::WorkerEntry, this, std::placeholders::_1), Name(), id()));
  return Status::OK();
}

Status BatchOp::EofReceived(int32_t) { return Status::OK(); }

Status BatchOp::EoeReceived(int32_t) {
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}

#ifdef ENABLE_PYTHON
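// Reorganize the batch so that the columns named in in_col_names_ are fed to the Python per_batch_map function
// (via InvokeBatchMapFunc), while the remaining columns are moved to the output table untouched. The function's
// outputs are then written back under out_col_names_, and the rebuilt table replaces the input table in table_pair.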
Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
  RETURN_UNEXPECTED_IF_NULL(table_pair);
  RETURN_UNEXPECTED_IF_NULL(table_pair->first);
  std::unique_ptr<TensorQTable> in_q_table = std::move(table_pair->first);
  size_t num_rows = in_q_table->size();
  auto out_q_table = std::make_unique<TensorQTable>(num_rows, TensorRow(column_name_id_map_.size(), nullptr));
  TensorTable in_cols(in_col_names_.size(), TensorRow(num_rows, nullptr)), out_cols;

  std::unordered_map<std::string, size_t> in_col_name_id;  // names of columns that need to be fed to per_batch_map
  for (size_t i = 0; i < in_col_names_.size(); i++) in_col_name_id.insert({in_col_names_[i], i});

  for (const auto &itr : child_map_) {
    auto col_itr = in_col_name_id.find(itr.first);
    if (col_itr != in_col_name_id.end()) {  // column needs to be prepared for per_batch_map
      for (size_t i = 0; i < num_rows; i++) {
        in_cols[col_itr->second][i] = std::move((*in_q_table)[i][itr.second]);
      }
    } else {  // column needs to be placed into the out table
      size_t col_id = column_name_id_map_[itr.first];
      for (size_t i = 0; i < num_rows; i++) {
        (*out_q_table)[i][col_id] = std::move((*in_q_table)[i][itr.second]);
      }
    }
  }

  in_q_table.reset();  // release the input table
  RETURN_IF_NOT_OK(InvokeBatchMapFunc(&in_cols, &out_cols, table_pair->second));

  for (size_t i = 0; i < out_cols.size(); i++) {
    size_t col_id = column_name_id_map_[out_col_names_[i]];
    size_t row_id = 0;
    CHECK_FAIL_RETURN_UNEXPECTED(num_rows == out_cols[i].size(),
                                 "Invalid data, column: " + out_col_names_[i] +
                                   " expects: " + std::to_string(num_rows) +
                                   " rows returned from per_batch_map, got: " + std::to_string(out_cols[i].size()));
    for (auto &t_row : *out_q_table) {
      t_row[col_id] = out_cols[i][row_id++];
    }
  }

  table_pair->first = std::move(out_q_table);
  return Status::OK();
}
#endif

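// Determine the size of the upcoming batch: either the value returned by the Python batch_size function
// (when one is set) or the static batch size given at construction time.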
Status BatchOp::GetBatchSize(int32_t *batch_size, CBatchInfo info) {
  RETURN_UNEXPECTED_IF_NULL(batch_size);
#ifdef ENABLE_PYTHON
  if (batch_size_func_) {
    RETURN_IF_NOT_OK(InvokeBatchSizeFunc(batch_size, info));
  } else {
    (*batch_size) = start_batch_size_;
  }
#else
  (*batch_size) = start_batch_size_;
#endif
  return Status::OK();
}

#ifdef ENABLE_PYTHON
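// Call the user-provided Python batch_size function under the GIL and validate that it returns a positive integer.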
Status BatchOp::InvokeBatchSizeFunc(int32_t *batch_size, CBatchInfo info) {
  RETURN_UNEXPECTED_IF_NULL(batch_size);
  {
    // Acquire Python GIL
    py::gil_scoped_acquire gil_acquire;
    if (Py_IsInitialized() == 0) {
      return Status(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
    }
    try {
      py::object size = batch_size_func_(info);
      *batch_size = size.cast<int32_t>();
      if (*batch_size <= 0) {
        return Status(StatusCode::kMDPyFuncException,
                      "Invalid parameter, batch_size function should return an integer greater than 0, but got: " +
                        std::to_string(*batch_size));
      }
    } catch (const py::error_already_set &e) {
      return Status(StatusCode::kMDPyFuncException, e.what());
    } catch (const py::cast_error &e) {
      return Status(StatusCode::kMDPyFuncException,
                    "Invalid parameter, batch_size function should return an integer greater than 0.");
    }
  }
  return Status(StatusCode::kSuccess, "batch_size function call succeeded.");
}

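// Call the user-provided per_batch_map function under the GIL. Each input column is converted to a list of NumPy
// arrays, the CBatchInfo object is appended as the last argument, and the returned tuple (one entry per output
// column) is converted back into Tensors.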
Status BatchOp::InvokeBatchMapFunc(TensorTable *input, TensorTable *output, CBatchInfo info) {
  RETURN_UNEXPECTED_IF_NULL(input);
  RETURN_UNEXPECTED_IF_NULL(output);
  {
    // Acquire Python GIL
    py::gil_scoped_acquire gil_acquire;
    if (Py_IsInitialized() == 0) {
      return Status(StatusCode::kMDPythonInterpreterFailure, "[Internal ERROR] Python Interpreter is finalized.");
    }
    try {
      // Prepare batch map callback parameters
      py::tuple input_args(input->size() + 1);
      for (size_t i = 0; i < input->size(); i++) {
        std::vector<py::array> np_batch;
        for (std::shared_ptr<Tensor> t : input->at(i)) {
          py::array np_array;
          RETURN_IF_NOT_OK(t->GetDataAsNumpy(&np_array));
          np_batch.push_back(std::move(np_array));
        }
        input_args[i] = np_batch;
      }
      input_args[input->size()] = info;
      // Invoke the batch map func
      py::object ret_py_obj = batch_map_func_(*input_args);
      // Parse the batch map return value
      py::tuple ret_tuple = py::cast<py::tuple>(ret_py_obj);
      CHECK_FAIL_RETURN_UNEXPECTED(py::isinstance<py::tuple>(ret_tuple),
                                   "per_batch_map function should return a tuple.");
      CHECK_FAIL_RETURN_UNEXPECTED(ret_tuple.size() == out_col_names_.size(),
                                   "Incorrect number of columns returned in per_batch_map function. Expects: " +
                                     std::to_string(out_col_names_.size()) +
                                     " got: " + std::to_string(ret_tuple.size()));
      for (size_t i = 0; i < ret_tuple.size(); i++) {
        TensorRow output_batch;
        // If the user returns a type that is not a list, log a message since the cast below may fail.
        if (!py::isinstance<py::list>(ret_tuple[i])) {
          MS_LOG(INFO) << "column: " << out_col_names_[i]
                       << " returned by per_batch_map is not a list, this could lead to conversion failure.";
        }

        py::list output_list = py::cast<py::list>(ret_tuple[i]);

        for (size_t j = 0; j < output_list.size(); j++) {
          std::shared_ptr<Tensor> out;
          RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py::cast<py::array>(output_list[j]), &out));
          output_batch.push_back(std::move(out));
        }
        output->push_back(std::move(output_batch));
      }
    } catch (const py::error_already_set &e) {
      return Status(StatusCode::kMDPyFuncException, e.what());
    } catch (const py::cast_error &e) {
      return Status(StatusCode::kMDPyFuncException,
                    "Invalid parameter, per_batch_map function of batch should return a tuple of lists of numpy arrays.");
    }
  }
  return Status::OK();
}
#endif

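// Pad the tensors of the selected columns to a common shape. The target shape per column is taken from pad_info
// where specified; any dimension left as -1 (None in Python) is replaced by the maximum extent of that dimension
// across the batch.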
Status BatchOp::PadColumns(std::unique_ptr<TensorQTable> *table, const PadInfo &pad_info,
                           const std::unordered_map<std::string, int32_t> &column_name_id_map) {
  RETURN_UNEXPECTED_IF_NULL(table);  // placeholder for now, might need this in the future
  CHECK_FAIL_RETURN_UNEXPECTED(
    (*table)->front().size() == column_name_id_map.size(),
    "Invalid parameter, size of column_name_id_map must be equal to the number of data columns. map size: " +
      std::to_string(column_name_id_map.size()) + ", column nums: " + std::to_string((*table)->front().size()));
  std::vector<std::shared_ptr<Tensor>> pad_vals(column_name_id_map.size(),
                                                0);  // value to pad each column's tensor with, default 0
  std::set<int32_t> pad_cols;
  // pad_shapes: padded shapes provided by the user; max_shapes: maximum shapes of the current batch of tensors
  std::vector<std::vector<dsize_t>> pad_shapes(column_name_id_map.size()), max_shapes(column_name_id_map.size());
  RETURN_IF_NOT_OK(UnpackPadInfo(pad_info, column_name_id_map, &pad_cols, &pad_vals, &pad_shapes));

  // Init each shape in max_shapes to {-1, -1, ...}; init each unspecified shape in pad_shapes to -1 as well.
  for (size_t col_id : pad_cols) {
    max_shapes[col_id] = std::vector<dsize_t>((*table)->front()[col_id]->Rank(), -1);
    if (pad_shapes[col_id].empty()) pad_shapes[col_id] = max_shapes[col_id];  // fill pad shape with -1
    CHECK_FAIL_RETURN_UNEXPECTED(
      pad_shapes[col_id].size() == max_shapes[col_id].size(),
      "Invalid data, rank of pad_shape must be equal to rank of the specified column. pad_shapes rank:" +
        std::to_string(pad_shapes[col_id].size()) + ", column rank: " + std::to_string(max_shapes[col_id].size()));
  }

  // Calculate the maximum shape for each column that needs to be padded.
  for (const TensorRow &row : **table) {  // iterate over each row in the batch
    for (size_t col_id : pad_cols) {      // iterate over each tensor in the row
      CHECK_FAIL_RETURN_UNEXPECTED(
        row[col_id]->Rank() == max_shapes[col_id].size(),
        "Invalid data, data to be padded together need to have the same rank, got shape 1: " +
          std::to_string(row[col_id]->Rank()) + ", shape 2: " + std::to_string(max_shapes[col_id].size()));
      for (size_t dim = 0; dim < row[col_id]->Rank(); dim++) {  // pick the largest value in each dimension
        max_shapes[col_id][dim] = std::max(max_shapes[col_id][dim], row[col_id]->shape()[dim]);
      }
    }
  }

  // If the user sets a dimension to -1 (None in Python), use the max value for that dimension.
  for (size_t col_id : pad_cols) {
    for (size_t dim = 0; dim < pad_shapes[col_id].size(); dim++) {
      if (pad_shapes[col_id][dim] < 0) pad_shapes[col_id][dim] = max_shapes[col_id][dim];
    }
  }

  // Call pad on each tensor that needs to be padded.
  for (TensorRow &row : **table) {
    for (size_t col_id : pad_cols) {
      std::shared_ptr<Tensor> pad_tensor;
      RETURN_IF_NOT_OK(PadEnd(row[col_id], &pad_tensor, pad_shapes[col_id], pad_vals[col_id]));
      row[col_id] = pad_tensor;
    }
  }
  return Status::OK();
}

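// Expand pad_info into parallel per-column structures: the set of column ids to pad, the pad value for each
// column, and the (possibly empty) user-specified pad shape for each column. An empty pad_info selects all columns.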
Status BatchOp::UnpackPadInfo(const PadInfo &pad_info,
                              const std::unordered_map<std::string, int32_t> &column_name_id_map,
                              std::set<int32_t> *pad_cols, std::vector<std::shared_ptr<Tensor>> *pad_vals,
                              std::vector<std::vector<dsize_t>> *pad_shapes) {
  RETURN_UNEXPECTED_IF_NULL(pad_cols);
  RETURN_UNEXPECTED_IF_NULL(pad_vals);
  RETURN_UNEXPECTED_IF_NULL(pad_shapes);
  if (pad_info.empty()) {  // if pad_info is empty, pad every column automatically
    for (size_t col_id = 0; col_id < column_name_id_map.size(); col_id++) {
      pad_cols->insert(col_id);
    }
  } else {
    for (const auto &p : pad_info) {
      auto location = column_name_id_map.find(p.first);
      CHECK_FAIL_RETURN_UNEXPECTED(location != column_name_id_map.end(),
                                   "Invalid parameter, column name: " + p.first + " does not exist.");
      auto col_id = static_cast<dsize_t>(location->second);
      CHECK_FAIL_RETURN_UNEXPECTED(
        col_id < pad_vals->size() && col_id < pad_shapes->size(),
        "Invalid parameter, column id must be less than the size of pad_val and pad_shape, but got: " +
          std::to_string(col_id));
      pad_cols->insert(col_id);
      (*pad_vals)[col_id] = p.second.second;              // set pad values
      (*pad_shapes)[col_id] = p.second.first.AsVector();  // empty vector if shape is unknown
    }
  }
  return Status::OK();
}

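// Build this operator's column_name_id_map_ from the child's map. Without per_batch_map the map is inherited as-is;
// with per_batch_map the input columns may be renamed or replaced by the output columns, so the map is rebuilt
// accordingly.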
Status BatchOp::ComputeColMap() {
  CHECK_FAIL_RETURN_UNEXPECTED(child_.size() == 1,
                               "Invalid data, batch operator can't be used as a standalone operator; it should be "
                               "preceded by an operator that reads data, for example, ImageFolderDataset.");
  CHECK_FAIL_RETURN_UNEXPECTED(!(child_[0]->column_name_id_map().empty()),
                               "Invalid data, the column map of the operator preceding batch cannot be empty.");

  if (in_col_names_.empty()) {  // if per_batch_map is not set, there is no need to deal with out_col_names
    column_name_id_map_ = child_[0]->column_name_id_map();
    return Status::OK();
  }

  // From this point onward, per_batch_map is needed; therefore, child_map_ must be set.
  child_map_ = child_[0]->column_name_id_map();

  // Check that all input columns exist.
  for (const auto &col : in_col_names_) {
    CHECK_FAIL_RETURN_UNEXPECTED(child_map_.find(col) != child_map_.end(),
                                 "Invalid parameter, col:" + col + " doesn't exist in dataset.");
  }

  // The following logic deals with per_batch_map.
  bool col_name_flag = (out_col_names_.empty() || out_col_names_ == in_col_names_);  // true if col names are unchanged

  // Column names are unchanged.
  if (col_name_flag) {
    if (out_col_names_.empty()) out_col_names_ = in_col_names_;
    column_name_id_map_ = child_map_;
    return Status::OK();
  }
  // Column names are changed from this point onward; this map is child_map_ without the input cols for per_batch_map.
  auto child_map_no_in_col = child_map_;

  for (const auto &col : in_col_names_) {
    child_map_no_in_col.erase(col);
  }

  // Column names are changed.
  if (out_col_names_.size() == in_col_names_.size()) {  // column names changed, but same number of columns
    // The following code renames the input keys to output keys. ["a","b"] -> ["b","a"] is allowed.
    column_name_id_map_ = child_map_no_in_col;
    for (size_t i = 0; i < in_col_names_.size(); i++) {
      column_name_id_map_[out_col_names_[i]] = child_map_[in_col_names_[i]];
    }
  } else {  // the number of columns differs; put the output column names first, then the original ones
    for (const std::string &col : out_col_names_) {
      column_name_id_map_.insert({col, column_name_id_map_.size()});
    }
    for (const auto &itr : child_map_no_in_col) {
      column_name_id_map_.insert({itr.first, column_name_id_map_.size()});
    }
  }

  CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.size() == (child_map_no_in_col.size() + out_col_names_.size()),
                               "Key error in column_name_id_map_. output_columns in batch is not set correctly!");
  return Status::OK();
}

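// Report the batch size visible to the tree: -1 when a Python batch_size function makes it dynamic, otherwise the
// static batch size.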
int64_t BatchOp::GetTreeBatchSize() {
#ifdef ENABLE_PYTHON
  if (batch_size_func_) {
    return -1;
  }
#endif
  return start_batch_size_;
}

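// Pull-mode path (no worker threads): fetch up to cur_batch_size rows directly from the child, then pad and batch
// them in place on the caller's thread.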
Status BatchOp::GetNextRowPullMode(TensorRow *const row) {
  RETURN_UNEXPECTED_IF_NULL(row);
  std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
  int32_t cur_batch_size = 0;
  RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, batch_num_, batch_cnt_)));
  for (int i = 0; i < cur_batch_size; i++) {
    TensorRow new_row;
    RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
    if (!new_row.empty()) {
      table->emplace_back(new_row);
      if (table->size() == static_cast<size_t>(cur_batch_size)) break;
    } else {
      if (drop_ || table->empty()) {
        table = std::make_unique<TensorQTable>();  // this drops the partial batch when drop_ == true
      }
    }
  }
  RETURN_UNEXPECTED_IF_NULL(table);
  if (pad_) RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_));  // do padding if needed
  if (!table->empty()) {
    RETURN_IF_NOT_OK(BatchRows(&table, row, table->size()));
    batch_cnt_++;
    batch_num_++;
  }
  return Status::OK();
}

}  // namespace dataset
}  // namespace mindspore