• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_
18 
19 #include <atomic>
20 #include <memory>
21 #include <string>
22 #include <unordered_map>
23 #include <utility>
24 #include <vector>
25 
26 #include "minddata/dataset/callback/ds_callback.h"
27 #include "minddata/dataset/engine/dataset_iterator.h"
28 #include "minddata/dataset/engine/datasetops/map_op/map_job.h"
29 #include "minddata/dataset/engine/datasetops/parallel_op.h"
30 #include "minddata/dataset/kernels/tensor_op.h"
31 #include "minddata/dataset/util/queue.h"
32 #include "minddata/dataset/util/wait_post.h"
33 
34 namespace mindspore {
35 namespace dataset {
36 // Forward declare
37 class ExecutionTree;
38 
// MapOp class implements the Map operator. It applies a list of operations to the columns of each row that are
// specified by column names.
// The column order behavior after MapOp is as follows.
// [Case 1] If the number of Input Columns == the number of Output Columns, column ordering after MapOp
// is the same as the original column order: the Remainder Columns stay in their original positions,
// and the Output Columns are placed in the positions previously occupied by the Input Columns.
// For example, if the dataset initially has column order |A, B, C, D, E|,
// and we apply MapOp() with Input Columns {B, C} and Output Columns {X, Y},
// the column order after applying MapOp will be |A, X, Y, D, E|.
// Note that in this case |X, Y| are the Output Columns and |A, D, E| are the Remainder Columns, which stay in
// their original positions; column B is replaced by column X and column C is replaced by column Y.
// [Case 2] If the number of Input Columns != the number of Output Columns, column ordering after MapOp
// is the Output Columns followed by the Remainder Columns.
// For example, if the dataset initially has column order |A, B, C, D, E|,
// and we apply MapOp() with Input Columns {B, C, A} and Output Columns {X, Y},
// the column order after applying MapOp will be |X, Y, D, E|.
// Note that in this case |X, Y| are the Output Columns and |D, E| are the Remainder Columns,
// and the Input Columns are removed and replaced by the Output Columns.

// Keywords:
// Input Columns : a vector of column names (string) passed to MapOp specifying the columns from which
//     Tensors are taken and passed to the TensorOp Compute().
// Output Columns : a vector of column names (string) passed to MapOp specifying the column names
//     for the Tensors produced by TensorOp Compute().
// Remainder Columns : columns that exist in the dataset but are not mentioned in Input Columns.
//     These columns are not passed to TensorOp Compute(). They keep their original positions in Case 1,
//     and are placed after the Output Columns in Case 2.
64 class MapOp : public ParallelOp {
65  public:
66   // Constructor of MapOp
67   // @note The builder class should be used to call it.
68   // @param in_col_names A list of input column names (should match the input/output \p tensorFuncs).
69   // @param out_col_names A list of output column names (should match the input/output \p tensorFuncs).
70   // @param tensor_funcs A list of TensorOp pointers for MapOp to apply to each data.
71   // @param num_workers The number of worker threads.
72   // @param op_connector_size The size of each queue in the connector.
73   MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names,
74         std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size);
75 
76   // Destructor
77   ~MapOp() = default;
78 
79   // A print method typically used for debugging
80   // @param out The output stream to write output to
81   // @param show_all A bool to control if you want to show all info or just a summary
82   void Print(std::ostream &out, bool show_all) const override;
83 
84   // << Stream output operator overload
85   // @notes This allows you to write the debug print info using stream operators
86   // @param out reference to the output stream being overloaded
87   // @param mo reference to the MapOp to display
88   // @return the output stream must be returned
89   friend std::ostream &operator<<(std::ostream &out, const MapOp &mo) {
90     mo.Print(out, false);
91     return out;
92   }
93 
94   // Class functor operator () override.
95   // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
96   // provide the master loop that drives the logic for performing the work
97   // This main thread creates local queues, pulls TensorRow from the previous
98   // op's Connector and distributes them to the local queues. Workers pull from the local queues.
99   // @return Status The status code returned
100   Status operator()() override;
101 
102   // Getter
103   // @return the number of threads consuming data from previous op's output Connector.
104   int32_t NumConsumers() const override;
105 
106   // Op name getter
107   // @return Name of the current Op
Name()108   std::string Name() const override { return kMapOp; }
109 
110   // List of tensor ops getter/setter
111   // @Return the vector of tensor ops by non-const reference
112 
TFuncs()113   auto &TFuncs() { return tfuncs_; }
114 
TFuncs()115   const auto &TFuncs() const { return tfuncs_; }
116 
117  private:
118   // A unit of job for map worker thread.
119   // MapWorkerJob holds a list of MapJob where each MapJob can be a CpuMapJob, GpuMapJob or DvppMapJob.
120   struct MapWorkerJob {
MapWorkerJobMapWorkerJob121     explicit MapWorkerJob(TensorRow tr) : tensor_row(std::move(tr)) {}
122     std::vector<std::shared_ptr<MapJob>> jobs;
123     TensorRow tensor_row;
124   };
125 
126   // A helper function to create jobs for workers.
127   Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job);
128 
129   // A helper function that fetch worker map job from local queues and extract the data and map job list
130   Status FetchNextWork(uint32_t worker_id, TensorRow *row, std::vector<std::shared_ptr<MapJob>> *job_list);
131 
132   // Local queues where worker threads get a job from
133   QueueList<std::unique_ptr<MapWorkerJob>> local_queues_;
134 
135   //  Tensorops to be read and applied by worker threads
136   std::vector<std::shared_ptr<TensorOp>> tfuncs_;
137 
138   // Variable to store the column name that the tensorOps are consuming
139   std::vector<std::string> in_columns_;
140 
141   // Variable to store the column name that the tensorOps are producing
142   std::vector<std::string> out_columns_;
143 
144   // Boolean mapping, true means to keep the column.
145   std::vector<bool> keep_input_columns_;
146 
147   // Indices of the columns to process.
148   std::vector<size_t> to_process_indices_;
149 
150   std::unique_ptr<ChildIterator> child_iterator_;  // An iterator for fetching.
151 
152   // Private function for worker/thread to loop continuously. It comprises the main
153   // logic of MapOp: getting the data from previous Op, validating user specified column names,
154   // applying a list of TensorOps to each of the data, process the results and then
155   // pushing them back to MapOp's output Connector to be fetched by the next Op.
156   // @param worker_id The id assigned to this thread/worker upon creation.
157   // @return Status The status code returned
158   Status WorkerEntry(int32_t worker_id) override;  //  In: workerId assigned by tree_
159 
160   // Private function for worker thread to perform TensorOp's compute function and get the result.
161   // @param in_row Input TensorRow
162   // @param[out] out_row Generated TensorRow
163   Status WorkerCompute(const TensorRow &in_row, TensorRow *out_row,
164                        const std::vector<std::shared_ptr<MapJob>> &job_list);
165 
166   // Private function that create the final column name to index mapping and
167   // get indices of the columns this mapop does not use.
168   // @param col_name_id_map The column name to index mapping obtained from child operator
169   void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map);
170 
171   // Validating if each of the input_columns exists in col_name_id_map.
172   // @param - the column map to check
173   // @return - status return code
174   Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map);
175 
176   // Private function for computing the assignment of the column name map.
177   // @return - Status
178   Status ComputeColMap() override;
179 
180   // Private function for initializing private variables such as in_columns_, out_columns_.
181   // @return - Status
182   Status InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map);
183 
184   // This function should only be called from master thread. It intends to suspend the operation of all workers and
185   // have them wait on the QueueList. Master thread would send a token to each worker then wait on a WaitPost.
186   // Workers upon receiving the suspension token from master thread, increment an atomic count, the last worker
187   // who does the increment wakes up the master.
188   // @return - Status
189   Status WaitForWorkers() override;
190 };
191 }  // namespace dataset
192 }  // namespace mindspore
193 
194 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_
195