/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/map_op/map_op.h"
#include <cstring>
#include <memory>
#include <vector>

#include "minddata/dataset/callback/callback_param.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/core/global_context.h"

#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
// Constructor of MapOp
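// Illustrative usage (a sketch, not part of this file): a MapOp that applies a decode TensorOp
// to the "image" column with four parallel workers could be built as
//   auto map_op = std::make_shared<MapOp>(std::vector<std::string>{"image"},  // input columns
//                                         std::vector<std::string>{"image"},  // output columns
//                                         std::vector<std::shared_ptr<TensorOp>>{decode_op},
//                                         /*num_workers=*/4, /*op_connector_size=*/16);
// Here decode_op stands for any std::shared_ptr<TensorOp>; the name and sizes are assumptions.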
MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
             std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size)
    : ParallelOp(num_workers, op_connector_size),
      tfuncs_(std::move(tensor_funcs)),
      in_columns_(in_col_names),
      out_columns_(out_col_names) {
  // If the caller didn't specify out_col_names, assume they are the same as in_columns_.
  if (out_columns_.empty() || out_columns_[0].empty()) {
    out_columns_ = in_columns_;
  }
}

// The number of threads consuming data from the previous op's output Connector.
int32_t MapOp::NumConsumers() const {
  // When Performance Mode is on, there is only one thread consuming from the previous Connector.
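  // The master thread in operator()() is the sole consumer of the child's Connector; it fans rows
  // out to per-worker local queues, so only one consuming thread is reported here.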
  return 1;
}

// A print method typically used for debugging
void MapOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    ParallelOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nInput column names:";
    for (size_t i = 0; i < in_columns_.size(); i++) {
      out << " " << in_columns_[i];
    }
    out << "\n  TensorOps:";
    for (size_t i = 0; i < tfuncs_.size(); i++) {
      out << " " << *(tfuncs_[i].get());
    }
    out << "\n\n";
  }
}

// A helper function that fetches the worker map job from the local queue and extracts the TensorRow and map job list
Status MapOp::FetchNextWork(uint32_t worker_id, TensorRow *row, std::vector<std::shared_ptr<MapJob>> *job_list) {
  std::unique_ptr<MapWorkerJob> worker_job;
  // Fetch the next worker job and TensorRow
  RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&worker_job));
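  // PopFront blocks while this worker's queue is empty, so the worker naturally idles until the
  // master thread pushes its next job.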
  // Extract the TensorRow and job list from the map worker job.
  *row = std::move(worker_job->tensor_row);
  *job_list = std::move(worker_job->jobs);

  return Status::OK();
}

Status MapOp::GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job) {
  std::shared_ptr<MapJob> map_job = nullptr;
  MapTargetDevice prev_target = MapTargetDevice::kCpu;
  for (size_t i = 0; i < tfuncs_.size(); i++) {
    // Currently we only have CPU as the device target.
    // In the future, a heuristic or user control will select the target device.
    MapTargetDevice target_device = MapTargetDevice::kCpu;

    // If there is no existing map_job, we will create one.
    // map_job can be nullptr when we are at the first tensor op or when the target device of the previous op
    // differs from that of the current op.
    if (map_job == nullptr) {
      map_job = std::make_shared<CpuMapJob>();
    }
    RETURN_IF_NOT_OK(map_job->AddOperation(tfuncs_[i]));

    // Push map_job into worker_job if one of the two conditions is true:
    // 1) It is the last tensor operation in tfuncs_
    // 2) The target device of the current tensor operation differs from that of the previous one
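    // Illustrative example: if every op in tfuncs_ targets the CPU, a single CpuMapJob holding all
    // ops is produced; if some future op targeted a different device, the list would be split into
    // separate MapJobs at each device boundary.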
    if ((i + 1 == tfuncs_.size()) || ((i != 0) && (prev_target != target_device))) {
      (*worker_job)->jobs.push_back(std::move(map_job));
    }

    prev_target = target_device;
  }

  return Status::OK();
}

// This class functor provides the master loop that drives the logic for performing the work
Status MapOp::operator()() {
  // Create and register the local queues.
  local_queues_.Init(num_workers_, oc_queue_size_);
  // Init callback
  RETURN_IF_NOT_OK(callback_manager_.Init(this));
  Status rc = local_queues_.Register(tree_->AllTasks());
  RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
  if (rc.IsError()) {
    TaskManager::FindMe()->Post();
    return rc;
  }

  // The operator class just starts off threads by calling the tree_ function
  rc =
    tree_->LaunchWorkers(num_workers_, std::bind(&MapOp::WorkerEntry, this, std::placeholders::_1), NameWithID(), id());
  // Synchronize with TaskManager
  TaskManager::FindMe()->Post();
  RETURN_IF_NOT_OK(rc);
  // Number of rows received (including eoe rows), plus the step counters for the current epoch and in total
  int64_t num_rows = 0, ep_step = 0, total_step = 0;

  RETURN_IF_NOT_OK(callback_manager_.Begin(CallbackParam(0, ep_step, total_step)));

  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
  TensorRow new_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));

  while (!new_row.eof()) {
    if (op_current_repeats_ % GetOpNumRepeatsPerEpoch() == 0) {
      RETURN_IF_NOT_OK(callback_manager_.EpochBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));
    }
    while (!new_row.eoe()) {
      ep_step++;
      total_step++;

      RETURN_IF_NOT_OK(callback_manager_.StepBegin(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      // Create a map worker job carrying the current TensorRow; map jobs are added by GenerateWorkerJob below
      std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));

      // Populate map worker job for a worker to execute
      RETURN_IF_NOT_OK(GenerateWorkerJob(&worker_job));

      // Push map worker job to the corresponding worker's queue
      RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));
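      // Note: rows are dealt to workers round-robin (num_rows % num_workers_), so each worker
      // receives an in-order subsequence of the stream, which lets row order be preserved downstream.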

      RETURN_IF_NOT_OK(callback_manager_.StepEnd(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
    }

    // Check whether this is the end of a real epoch (not every eoe signals the end of an epoch)
    if ((op_current_repeats_ + 1) % GetOpNumRepeatsPerEpoch() == 0) {
      RETURN_IF_NOT_OK(callback_manager_.EpochEnd(CallbackParam(op_current_epochs_ + 1, ep_step, total_step)));

      ep_step = 0;
    }
    // Propagate the eoe row to a worker
    std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
    RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));
    UpdateRepeatAndEpochCounter();
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
  }
  // Handle eof logic. End() is not called here because the eof may never arrive when EpochCtrl is -1,
  // so this code might never be reached in that case.
  std::unique_ptr<MapWorkerJob> worker_job = std::make_unique<MapWorkerJob>(std::move(new_row));
  RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(worker_job)));

  // Quit all workers; this code might never be reached if EpochCtrl is -1.
  for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
    TensorRow quit_flag(TensorRow::kFlagQuit);
    auto quit = std::make_unique<MapWorkerJob>(quit_flag);
    RETURN_IF_NOT_OK(local_queues_[num_rows++ % num_workers_]->Add(std::move(quit)));
  }

  return Status::OK();
}

// Private function for worker/thread to loop continuously. It comprises the main
// logic of MapOp: getting the data from the previous Op, validating user specified column names,
// applying a list of TensorOps to each row, processing the results, and then
// pushing them back to MapOp's output Connector to be fetched by the next Op.
Status MapOp::WorkerEntry(int32_t worker_id) {
  // Handshake with TaskManager that thread creation is successful.
  TaskManager::FindMe()->Post();

  TensorRow in_row;
  std::vector<std::shared_ptr<MapJob>> job_list;
  // Fetch next data row and map job list
  RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));

  // Now that init work is done, drop into the main fetching loop.
  // Map op does not use the child iterator, and it needs to handle eoe and eof rows itself
  // rather than use the base-class defaults.
  while (true) {
    // Handle special logic where the row carries a ctrl flag.
    if (in_row.Flags() != TensorRow::kFlagNone) {
      if (in_row.wait()) {
        // When a worker receives the signal from the master thread, it increments an atomic counter.
        // The last worker to increment the counter wakes up the master thread.
        if (++num_workers_paused_ == num_workers_) {
          wait_for_workers_post_.Set();
        }
        // The worker stays blocked in the next FetchNextWork call until the master thread hands it new work.
      } else if (in_row.eoe()) {
        // Forward the eoe row to the output connector.
        RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
      } else if (in_row.eof()) {
        // Forward the eof row to the output connector.
        RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
      } else if (in_row.quit()) {
        break;
      }
      RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));
      continue;
    }
    CHECK_FAIL_RETURN_UNEXPECTED(in_row.size() != 0, "MapOp got an empty TensorRow.");
    TensorRow out_row;
    // Perform the compute function of the TensorOp(s) and store the result in out_row.
    RETURN_IF_NOT_OK(WorkerCompute(in_row, &out_row, job_list));
    // Push the row onto the connector for the next operator to consume.
    RETURN_IF_NOT_OK(out_connector_->Add(std::move(out_row), static_cast<int>(worker_id)));
    // Fetch next data row and map job list
    RETURN_IF_NOT_OK(FetchNextWork(worker_id, &in_row, &job_list));
  }
  return Status::OK();
}

Status MapOp::WorkerCompute(const TensorRow &in_row, TensorRow *out_row,
                            const std::vector<std::shared_ptr<MapJob>> &job_list) {
  int32_t num_cols = in_row.size();

  std::vector<TensorRow> job_input_table;
  std::vector<TensorRow> original_table;
  TensorRow to_process;
  // Prepare the data that we need from in_row.
  // to_process : a vector of Tensors holding only the cols in input_columns.

  // From the current row, select the Tensors that need to be passed to the TensorOps
  (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process),
                       [&in_row](const auto &it) { return std::move(in_row[it]); });
  to_process.setId(in_row.getId());
  std::vector<std::string> cur_row_path = in_row.getPath();
  if (cur_row_path.size() > 0) {
    std::vector<std::string> to_process_path;
    (void)std::transform(to_process_indices_.begin(), to_process_indices_.end(), std::back_inserter(to_process_path),
                         [&cur_row_path](const auto &it) { return cur_row_path[it]; });
    to_process.setPath(to_process_path);
  }
  job_input_table.push_back(std::move(to_process));
  original_table.push_back(std::move(in_row));

  // Variable to keep the result after executing the job.
  std::vector<TensorRow> result_table;
  // Executing the list of jobs.
  for (size_t i = 0; i < job_list.size(); i++) {
    RETURN_IF_INTERRUPTED();
    // Execute MapWorkerJob.
    RETURN_IF_NOT_OK(job_list[i]->Run(job_input_table, &result_table));
    // Assign the processed data as the input for the next job, except after the last job in the list.
    if (i + 1 < job_list.size()) {
      job_input_table = std::move(result_table);
    }
  }

  // Sanity check a row in result_table
  if (!result_table.empty() && out_columns_.size() != result_table[0].size()) {
    RETURN_STATUS_UNEXPECTED("Result of a tensorOp doesn't match output column names");
  }

  // Merge the data processed by the jobs (result_table) with the columns that were not used.
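  // Illustrative example: with dataset columns {"image", "label"} and in_columns_ = {"image"},
  // matching out_columns_ = {"image"} puts the processed tensor back at the original "image" index;
  // if out_columns_ were {"feat1", "feat2"}, the two new tensors come first and the untouched
  // "label" column is appended after them.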
  if (in_columns_.size() == out_columns_.size()) {
    // Place the processed tensor back into the original index of the input tensor
    for (size_t i = 0; i < result_table[0].size(); i++) {
      original_table[0][to_process_indices_[i]] = std::move(result_table[0][i]);
    }
    *out_row = std::move(original_table[0]);
  } else {
    // Append the data in the original table that we did not use to the end of each row in result_table.
    for (int32_t i = 0; i < num_cols; i++) {
      if (keep_input_columns_[i]) {
        result_table[0].push_back(std::move(original_table[0][i]));
      }
    }
    *out_row = std::move(result_table[0]);
  }

  return Status::OK();
}

Status MapOp::ComputeColMap() {
  // If the map has not been set up yet in the base class, then set it up
  if (column_name_id_map_.empty()) {
    std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
    // Initialize private variables
    RETURN_IF_NOT_OK(InitPrivateVariable(&current_name_id_map));
    // Create the final column name to index mapping in the base class field
    CreateFinalColMap(&current_name_id_map);
    MS_LOG(DEBUG) << "Column name map for map op is: " << this->ColumnNameMapAsString();
  } else {
    MS_LOG(WARNING) << "Column name map is already set!";
  }
  return Status::OK();
}

// Validating if each of the input_columns exists in the col_name_id_map.
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
  for (const auto &inCol : in_columns_) {
    bool found = col_name_id_map.find(inCol) != col_name_id_map.end();
    if (!found) {
      std::string err_msg = "Invalid parameter, input column name: " + inCol + " doesn't exist in the dataset columns.";
      RETURN_STATUS_UNEXPECTED(err_msg);
    }
  }
  return Status::OK();
}

Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) {
  // If input_columns is empty, the column at index 0 will be picked.
  if (in_columns_.empty()) {
    auto itr =
      std::find_if(col_name_id_map->begin(), col_name_id_map->end(), [](const auto &it) { return it.second == 0; });
    CHECK_FAIL_RETURN_UNEXPECTED(itr != col_name_id_map->end(), "Column name id map doesn't have id 0");
    MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
    in_columns_.push_back(itr->first);

    // If the caller didn't specify out_col_names, assume they are the same as the input columns.
    // This was done in the constructor, but if in_columns_ was empty to start, we have to redo it here.
    if (out_columns_.empty() || out_columns_[0].empty()) {
      out_columns_ = in_columns_;
    }
  }

  // Before we continue, issue a sanity check to make sure the input columns from the user and the incoming
  // columns from the child are correct
  RETURN_IF_NOT_OK(this->ValidateInColumns(*col_name_id_map));

  // Initialize keep_input_columns_, true means to keep the column.
  keep_input_columns_.resize(col_name_id_map->size(), true);
  for (const auto &col_name : in_columns_) {
    int32_t missed = (*col_name_id_map)[col_name];
    keep_input_columns_[missed] = false;
  }

  // Initialize to_process_indices_.
  for (const auto &col_name : in_columns_) {
    to_process_indices_.push_back((*col_name_id_map)[col_name]);
  }
  return Status::OK();
}

// Create the final column name to index mapping and get the indices of the columns this MapOp does not use.
void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map) {
  std::unordered_map<std::string, int32_t> final_col_name_id_map;
  size_t num_cols = col_name_id_map->size();
  std::vector<int32_t> new_ids(num_cols);
  if (in_columns_.size() == out_columns_.size()) {
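    // Input and output columns line up one-to-one: each input column is simply renamed to the
    // corresponding output column while keeping its original index in the map.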
    for (size_t i = 0; i < in_columns_.size(); i++) {
      int32_t loc = (*col_name_id_map)[in_columns_[i]];
      (void)col_name_id_map->erase(in_columns_[i]);
      (*col_name_id_map)[out_columns_[i]] = loc;
    }

    // Set the base class final column id map result
    column_name_id_map_ = *col_name_id_map;
  } else {
    int32_t fill_idx = 0;
    // The first columns of the table are occupied by the output columns from the TensorOps.
    for (const auto &col_name : out_columns_) {
      final_col_name_id_map[col_name] = fill_idx++;
    }

    // Creating new_ids mapping for the columns we keep.
    for (size_t i = 0; i < num_cols; i++) {
      if (keep_input_columns_[i]) {
        new_ids[i] = fill_idx++;
      }
    }

    // Iterating through the old mapping to update the final mapping for the columns we kept.
    std::string name;
    for (const auto &pair : *col_name_id_map) {
      name = pair.first;
      int32_t old_id = pair.second;
      if (keep_input_columns_[old_id]) {
        final_col_name_id_map[name] = new_ids[old_id];
      }
    }

    // Set the base class final column id map result
    column_name_id_map_ = final_col_name_id_map;
  }
}

Status MapOp::WaitForWorkers() {
  // Reset the number of paused workers to 0
  num_workers_paused_ = 0;
  for (int32_t wkr_id = 0; wkr_id < num_workers_; wkr_id++) {
    // An empty row carrying the kFlagWait flag signals that the worker needs to pause.
    TensorRow waitRow(TensorRow::kFlagWait);
    RETURN_IF_NOT_OK(local_queues_[wkr_id]->Add(std::make_unique<MapWorkerJob>(waitRow)));
  }
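  // The paired logic lives in WorkerEntry(): each worker that pops a wait row increments
  // num_workers_paused_, and the last one sets wait_for_workers_post_ to release the wait below.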
  // Wait until all workers are done processing their work in local_queues_
  RETURN_IF_NOT_OK(wait_for_workers_post_.Wait());
  // Clear the WaitPost for the next Wait()
  wait_for_workers_post_.Clear();
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore