/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include <algorithm>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "minddata/dataset/callback/callback_param.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/core/global_context.h"

#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
// Constructor of MapOp
MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
             std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size)
    : ParallelOp(num_workers, op_connector_size),
      tfuncs_(std::move(tensor_funcs)),
      in_columns_(in_col_names),
      out_columns_(out_col_names) {
  // Set connector size via config.
  // If the caller didn't specify out_col_names, assume they are the same as in_columns_.
  if (out_columns_.empty() || out_columns_[0].empty()) {
    out_columns_ = in_columns_;
  }
}

// The number of threads consuming data from the previous op's output Connector.
int32_t MapOp::NumConsumers() const {
  // When Performance Mode is on, there is only one thread consuming from the previous Connector.
  return 1;
}

// A print method typically used for debugging
void MapOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nInput column names:";
    for (size_t i = 0; i < in_columns_.size(); i++) {
      out << " " << in_columns_[i];
    }
    out << "\n TensorOps:";
    for (size_t i = 0; i < tfuncs_.size(); i++) {
      out << " " << *(tfuncs_[i].get());
    }
    out << "\n\n";
  }
}

// A helper function that fetches the next worker job from the local queue and extracts the TensorRow and map job list
Status MapOp::FetchNextWork(uint32_t worker_id, TensorRow *row, std::vector<std::shared_ptr<MapJob>> *job_list) {
  std::unique_ptr<MapWorkerJob> worker_job;
  // Fetch the next worker job and TensorRow
  RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job));
  // Extract the TensorRow and job list from the map worker job.
  *row = std::move(worker_job->tensor_row);
  *job_list = std::move(worker_job->jobs);

  return Status::OK();
}

Status MapOp::GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job) {
  std::shared_ptr<MapJob> map_job = nullptr;
  MapTargetDevice prev_target = MapTargetDevice::kCpu;
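  // prev_target tracks the target device of the previous TensorOp. Consecutive ops that share the
  // same target device are accumulated into a single MapJob; the job is only pushed onto the worker
  // job's list when the device changes or the last op has been added.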
  for (size_t i = 0; i < tfuncs_.size(); i++) {
    // Currently we only have CPU as the device target.
    // In the future, a heuristic or user control will select the target device.
    MapTargetDevice target_device = MapTargetDevice::kCpu;

    // If there is no existing map_job, we will create one.
    // map_job could be nullptr when we are at the first tensor op or when the target device of the previous op
    // differs from that of the current op.
    if (map_job == nullptr) {
      map_job = std::make_shared<CpuMapJob>();
    }
    RETURN_IF_NOT_OK(map_job->AddOperation(tfuncs_[i]));

    // Push map_job into worker_job if one of the two conditions is true:
    // 1) It is the last tensor operation in tfuncs_
    // 2) The target device of the current tensor operation differs from that of the previous one
    if ((i + 1 == tfuncs_.size()) || ((i != 0) && (prev_target != target_device))) {
      (*worker_job)->jobs.push_back(std::move(map_job));
    }

    prev_target = target_device;
  }

  return Status::OK();
}

// This class functor will provide the master loop that drives the logic for performing the work
Status MapOp::operator()() {
  // Create and register the local queues.
  local_queues_.Init(num_workers_, oc_queue_size_);
  // Init callbacks
  RETURN_IF_NOT_OK(callback_manager_.Init(this));
  Status rc = local_queues_.Register(tree_->AllTasks());
  RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
  if (rc.IsError()) {
    TaskManager::FindMe()->Post();
    return rc;
  }

  // The operator class just starts off threads by calling the tree_ function.
  rc =
    tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID(), id());
  // Synchronize with TaskManager
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(rc);
  // num_rows counts the rows received (including eoe); ep_step and total_step track the step count
  // within the current epoch and across all epochs respectively.
  int64_t num_rows = 0, ep_step = 0, total_step = 0;

  RETURN_IF_NOT_OK(callback_manager_.Begin(CallbackParam(0, ep_step, total_step)));

  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
  TensorRow new_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));

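  // Master loop: the outer loop runs once per epoch (until an eof row is seen), the inner loop runs
  // once per data row (until an eoe row is seen). Each row is wrapped in a MapWorkerJob and dispatched
  // round-robin (num_rows % num_workers_) to the workers' local queues.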
  while (!new_row.eof()) {
    if (op_current_repeats_ % GetOpNumRepeatsPerEpoch() == 0) {
      RETURN_IF_NOT_OK(callback_manager_.EpochBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
    }
    while (!new_row.eoe()) {
      ep_step++;
      total_step++;
      // Create an empty map worker job to be populated by a TensorRow and map jobs

      RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));

      // Populate the map worker job for a worker to execute
      RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job));

      // Push the map worker job to the corresponding worker's queue
      RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));

      RETURN_IF_NOT_OK(callback_manager_.StepEnd(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
    }

    // Check whether this is the end of a real epoch (not every eoe signals the end of an epoch)
    if ((op_current_repeats_ + 1) % GetOpNumRepeatsPerEpoch() == 0) {
      RETURN_IF_NOT_OK(callback_manager_.EpochEnd(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      ep_step = 0;
    }
    // Propagate the eoe row to a worker
    std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
    RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));
    UpdateRepeatAndEpochCounter();
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  }
  // Handle the eof row. End() is not called here because this code path might never be reached:
  // when EpochCtrl is -1 no eof is ever produced.
  std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
  RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));

  // Quit all workers; this code might never be reached if EpochCtrl is -1.
  for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
    TensorRow quit_flag(TensorRow::kFlagQuit);
    auto quit = std::make_unique<MapWorkerJob>(quit_flag);
    RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(quit)));
  }

  return Status::OK();
}

// Private function for a worker/thread to loop continuously. It comprises the main
// logic of MapOp: getting rows from the previous op, validating user-specified column names,
// applying the list of TensorOps to each row, processing the results, and then
// pushing them to MapOp's output Connector to be fetched by the next op.
Status MapOp::WorkerEntry(int32_t worker_id) {
  // Handshake with TaskManager that thread creation is successful.
  TaskManager::FindMe()->Post();

  TensorRow in_row;
  std::vector<std::shared_ptr<MapJob>> job_list;
  // Fetch the next data row and map job list
  RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));

  // Now that init work is done, drop into the main fetching loop.
  // Map op does not use the child iterator, so it needs to handle eoe and eof rows itself
  // rather than use the base-class defaults.
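  // Control rows are handled specially: a wait row pauses the worker (it blocks on its local queue
  // until the master resumes sending work), eoe/eof rows are forwarded downstream via the output
  // connector, and a quit row terminates the loop. All other rows are data rows processed by
  // WorkerCompute.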
  while (true) {
    // Handle special logic where the row carries a ctrl flag.
    if (in_row.Flags() != TensorRow::kFlagNone) {
      if (in_row.wait()) {
        // When a worker receives the signal from the master thread, it increments an atomic counter.
        // The last worker to increment the counter wakes up the master thread.
        if (++num_workers_paused_ == num_workers_) {
          wait_for_workers_post_.Set();
        }
        // This will block the worker until the master thread gives it new work
      } else if (in_row.eoe()) {
        // Forward the eoe row downstream.
        RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
      } else if (in_row.eof()) {
        // Forward the eof row downstream.
        RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
      } else if (in_row.quit()) {
        break;
      }
      RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));
      continue;
    }
    CHECK_FAIL_RETURN_UNEXPECTED(in_row.size() != 0, "MapOp got an empty TensorRow.");
    TensorRow out_row;
    // Perform the compute function of the TensorOp(s) and store the result in out_row.
    RETURN_IF_NOT_OK(WorkerCompute(in_row, &out_row, job_list));
    // Push the row onto the connector for the next operator to consume.
    RETURN_IF_NOT_OK(out_connector_->Add(std::move(out_row), static_cast<int>(worker_id)));
    // Fetch the next data row and map job list
    RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));
  }
  return Status::OK();
}

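// WorkerCompute runs the map jobs on a single row: it pulls out the columns named in in_columns_,
// feeds them through the job list (the output of one job becoming the input of the next), and then
// merges the processed columns back with any untouched input columns to form the output row.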
Status MapOp::WorkerCompute(const TensorRow &in_row, TensorRow *out_row,
                            const std::vector<std::shared_ptr<MapJob>> &job_list) {
  int32_t num_cols = in_row.size();

  std::vector<TensorRow> job_input_table;
  std::vector<TensorRow> original_table;
  TensorRow to_process;
  // Prepare the data that we need from in_row.
  // to_process : a TensorRow holding only the columns listed in in_columns_.

  // From the current row, select the Tensors that need to be passed to the TensorOps
  (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process),
                       [&in_row](const auto &it) { return std::move(in_row[it]); });
  to_process.setId(in_row.getId());
  std::vector<std::string> cur_row_path = in_row.getPath();
  if (cur_row_path.size() > 0) {
    std::vector<std::string> to_process_path;
    (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process_path),
                         [&cur_row_path](const auto &it) { return cur_row_path[it]; });
    to_process.setPath(to_process_path);
  }
  job_input_table.push_back(std::move(to_process));
  original_table.push_back(std::move(in_row));

  // Variable to keep the result after executing the job.
  std::vector<TensorRow> result_table;
  // Execute the list of jobs.
  for (size_t i = 0; i < job_list.size(); i++) {
    RETURN_IF_INTERRUPTED();
    // Execute MapWorkerJob.
    RETURN_IF_NOT_OK(job_list[i]->Run(job_input_table, &result_table));
    // Assign the processed data as the input for the next job, except after the last job in the list.
    if (i + 1 < job_list.size()) {
      job_input_table = std::move(result_table);
    }
  }

  // Sanity check a row in result_table
  if (!result_table.empty() && out_columns_.size() != result_table[0].size()) {
    RETURN_STATUS_UNEXPECTED("Result of a TensorOp doesn't match the output column names");
  }

  // Merge the data processed by the jobs (result_table) with the data that was not used.
  if (in_columns_.size() == out_columns_.size()) {
    // Place each processed tensor back into the original index of its input tensor
    for (size_t i = 0; i < result_table[0].size(); i++) {
      original_table[0][to_process_indices_[i]] = std::move(result_table[0][i]);
    }
    *out_row = std::move(original_table[0]);
  } else {
    // Append the columns of the original row that were not used to the end of the result row.
    for (int32_t i = 0; i < num_cols; i++) {
      if (keep_input_columns_[i]) {
        result_table[0].push_back(std::move(original_table[0][i]));
      }
    }
    *out_row = std::move(result_table[0]);
  }

  return Status::OK();
}

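// ComputeColMap builds this op's column_name_id_map_ from the child's map: InitPrivateVariable works
// out which column indices the TensorOps consume, and CreateFinalColMap derives the final
// name-to-index mapping seen by downstream ops.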
Status MapOp::ComputeColMap() {
  // If the map has not been set up yet in the base class, then set it up
  if (column_name_id_map_.empty()) {
    std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
    // Initialize private variables
    RETURN_IF_NOT_OK(InitPrivateVariable(&current_name_id_map));
    // Create the final column name to index mapping in the base class field
    CreateFinalColMap(&current_name_id_map);
    MS_LOG(DEBUG) << "Column name map for map op is: " << this->ColumnNameMapAsString();
  } else {
    MS_LOG(WARNING) << "Column name map is already set!";
  }
  return Status::OK();
}

// Validate that each of the input columns exists in the col_name_id_map.
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
  for (const auto &inCol : in_columns_) {
    bool found = col_name_id_map.find(inCol) != col_name_id_map.end();
    if (!found) {
      std::string err_msg = "Invalid parameter, input column name: " + inCol + " doesn't exist in the dataset columns.";
      RETURN_STATUS_UNEXPECTED(err_msg);
    }
  }
  return Status::OK();
}

Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) {
  // If input_columns is empty, the column at index 0 will be picked.
  if (in_columns_.empty()) {
    auto itr =
      std::find_if(col_name_id_map->begin(), col_name_id_map->end(), [](const auto &it) { return it.second == 0; });
    CHECK_FAIL_RETURN_UNEXPECTED(itr != col_name_id_map->end(), "Column name id map doesn't have id 0");
    MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
    in_columns_.push_back(itr->first);

    // If the caller didn't specify out_col_names, assume they are the same as the input columns.
    // This was done in the constructor, but if the input columns were empty to start with we have to redo it here.
    if (out_columns_.empty() || out_columns_[0].empty()) {
      out_columns_ = in_columns_;
    }
  }

  // Before we continue, issue a sanity check to make sure the input columns from the user and the incoming
  // columns from the child are correct
  RETURN_IF_NOT_OK(this->ValidateInColumns(*col_name_id_map));

  // Initialize keep_input_columns_, true means to keep the column.
  keep_input_columns_.resize(col_name_id_map->size(), true);
  for (const auto &col_name : in_columns_) {
    int32_t col_id = (*col_name_id_map)[col_name];
    keep_input_columns_[col_id] = false;
  }

  // Initialize to_process_indices_.
  for (const auto &col_name : in_columns_) {
    to_process_indices_.push_back((*col_name_id_map)[col_name]);
  }
  return Status::OK();
}

// Create the final column name to index mapping and get indices of the columns this MapOp does not use.
void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map) {
  std::unordered_map<std::string, int32_t> final_col_name_id_map;
  size_t num_cols = col_name_id_map->size();
  std::vector<int32_t> new_ids(num_cols);
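  // Two cases: if the number of output columns equals the number of input columns, the output columns
  // simply take over the input columns' indices (only the names change). Otherwise, the output columns
  // occupy the leading indices and the kept input columns are renumbered after them.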
  if (in_columns_.size() == out_columns_.size()) {
    for (size_t i = 0; i < in_columns_.size(); i++) {
      int32_t loc = (*col_name_id_map)[in_columns_[i]];
      (void)col_name_id_map->erase(in_columns_[i]);
      (*col_name_id_map)[out_columns_[i]] = loc;
    }

    // Set the base class final column id map result
    column_name_id_map_ = *col_name_id_map;
  } else {
    int32_t fill_idx = 0;
    // The first columns of the table are occupied by the output columns from the TensorOps.
    for (const auto &col_name : out_columns_) {
      final_col_name_id_map[col_name] = fill_idx++;
    }

    // Create the new_ids mapping for the columns we keep.
    for (size_t i = 0; i < num_cols; i++) {
      if (keep_input_columns_[i]) {
        new_ids[i] = fill_idx++;
      }
    }

    // Iterate through the old mapping to update the final mapping for the columns we kept.
    std::string name;
    for (const auto &pair : *col_name_id_map) {
      name = pair.first;
      int32_t old_id = pair.second;
      if (keep_input_columns_[old_id]) {
        final_col_name_id_map[name] = new_ids[old_id];
      }
    }

    // Set the base class final column id map result
    column_name_id_map_ = final_col_name_id_map;
  }
}

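// WaitForWorkers pauses all workers: the master pushes a wait row to every worker's local queue and
// then blocks on wait_for_workers_post_ until the last worker to pause signals it.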
Status MapOp::WaitForWorkers() {
  // Reset the number of paused workers to 0
  num_workers_paused_ = 0;
  for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
    // A special row carrying the wait flag is used to signal that the worker needs to pause.
    TensorRow waitRow(TensorRow::kFlagWait);
    RETURN_IF_NOT_OK(local_queues_[wkr_id]->Add(std::make_unique<MapWorkerJob>(waitRow)));
  }
  // Wait until all workers are done processing their work in local_queues_
  RETURN_IF_NOT_OK(wait_for_workers_post_.Wait());
  // Clear the WaitPost for the next Wait()
  wait_for_workers_post_.Clear();
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore