1 /** 2 * Copyright 2019-2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_ 18 19 #include <atomic> 20 #include <memory> 21 #include <string> 22 #include <unordered_map> 23 #include <utility> 24 #include <vector> 25 26 #include "minddata/dataset/callback/ds_callback.h" 27 #include "minddata/dataset/engine/dataset_iterator.h" 28 #include "minddata/dataset/engine/datasetops/map_op/map_job.h" 29 #include "minddata/dataset/engine/datasetops/parallel_op.h" 30 #include "minddata/dataset/kernels/tensor_op.h" 31 #include "minddata/dataset/util/queue.h" 32 #include "minddata/dataset/util/wait_post.h" 33 34 namespace mindspore { 35 namespace dataset { 36 // Forward declare 37 class ExecutionTree; 38 39 // MapOp class implements the Map operator. It will apply a list of operations to each record specified by column names. 40 // The column order behavior after MapOp is as follows. 41 // [Case 1] If the number of Input Columns == the number of Output Column, column ordering after MapOp 42 // is the same as the original column order where the Remainder Columns stay in the same position, 43 // and the Output Columns are placed the same position of the Input Columns. 44 // For example, initially if the dataset has column order |A, B, C, D, E|, 45 // and we apply MapOp() with Input Columns {B, C} and Output Columns {X, Y}. 46 // The column order after applying MapOp will be |A, X, Y, D, E|. 47 // Note that in this case, |X, Y| is the Output Columns and |A, D, E| which is the Remainder Columns stay in 48 // their original position, and column B is replaced by column X and column C is replace by column Y. 49 // [Case 2] If the number of Input Columns != the number of Output Column, column ordering after MapOp 50 // is Output Columns followed by Remainder Columns. 51 // For example, initially if the dataset has column order |A, B, C, D, E|, 52 // and we apply MapOp() with Input Columns {B, C, A} and Output Columns {X, Y}. 53 // The column order after applying MapOp will be |X, Y, D, E|. 54 // Note that in this case, |X, Y| is the Output Columns and |D, E| is the Remainder Columns, 55 // and the Input Columns are gone and replaced by the Output Columns. 56 57 // Keywords: 58 // Input Columns : a vector of column names (string) passed to MapOp specifying the column names from which 59 // Tensors are taken and passed to the TensorOp Compute(). 60 // Output Columns : a vector of column names (string) passed to MapOp specifying what are the column names 61 // for the Tensors produced by TensorOp Compute(). 62 // Remainder Columns : columns that exist in the dataset but are not mentioned in Input Columns. 63 // These columns will not be passed to TensorOp Compute(), but will be appended to the end of the Output Columns. 64 class MapOp : public ParallelOp { 65 public: 66 // Constructor of MapOp 67 // @note The builder class should be used to call it. 68 // @param in_col_names A list of input column names (should match the input/output \p tensorFuncs). 69 // @param out_col_names A list of output column names (should match the input/output \p tensorFuncs). 70 // @param tensor_funcs A list of TensorOp pointers for MapOp to apply to each data. 71 // @param num_workers The number of worker threads. 72 // @param op_connector_size The size of each queue in the connector. 73 MapOp(const std::vector<std::string> &in_col_names, const std::vector<std::string> &out_col_names, 74 std::vector<std::shared_ptr<TensorOp>> tensor_funcs, int32_t num_workers, int32_t op_connector_size); 75 76 // Destructor 77 ~MapOp() = default; 78 79 // A print method typically used for debugging 80 // @param out The output stream to write output to 81 // @param show_all A bool to control if you want to show all info or just a summary 82 void Print(std::ostream &out, bool show_all) const override; 83 84 // << Stream output operator overload 85 // @notes This allows you to write the debug print info using stream operators 86 // @param out reference to the output stream being overloaded 87 // @param mo reference to the MapOp to display 88 // @return the output stream must be returned 89 friend std::ostream &operator<<(std::ostream &out, const MapOp &mo) { 90 mo.Print(out, false); 91 return out; 92 } 93 94 // Class functor operator () override. 95 // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will 96 // provide the master loop that drives the logic for performing the work 97 // This main thread creates local queues, pulls TensorRow from the previous 98 // op's Connector and distributes them to the local queues. Workers pull from the local queues. 99 // @return Status The status code returned 100 Status operator()() override; 101 102 // Getter 103 // @return the number of threads consuming data from previous op's output Connector. 104 int32_t NumConsumers() const override; 105 106 // Op name getter 107 // @return Name of the current Op Name()108 std::string Name() const override { return kMapOp; } 109 110 // List of tensor ops getter/setter 111 // @Return the vector of tensor ops by non-const reference 112 TFuncs()113 auto &TFuncs() { return tfuncs_; } 114 TFuncs()115 const auto &TFuncs() const { return tfuncs_; } 116 117 private: 118 // A unit of job for map worker thread. 119 // MapWorkerJob holds a list of MapJob where each MapJob can be a CpuMapJob, GpuMapJob or DvppMapJob. 120 struct MapWorkerJob { MapWorkerJobMapWorkerJob121 explicit MapWorkerJob(TensorRow tr) : tensor_row(std::move(tr)) {} 122 std::vector<std::shared_ptr<MapJob>> jobs; 123 TensorRow tensor_row; 124 }; 125 126 // A helper function to create jobs for workers. 127 Status GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job); 128 129 // A helper function that fetch worker map job from local queues and extract the data and map job list 130 Status FetchNextWork(uint32_t worker_id, TensorRow *row, std::vector<std::shared_ptr<MapJob>> *job_list); 131 132 // Local queues where worker threads get a job from 133 QueueList<std::unique_ptr<MapWorkerJob>> local_queues_; 134 135 // Tensorops to be read and applied by worker threads 136 std::vector<std::shared_ptr<TensorOp>> tfuncs_; 137 138 // Variable to store the column name that the tensorOps are consuming 139 std::vector<std::string> in_columns_; 140 141 // Variable to store the column name that the tensorOps are producing 142 std::vector<std::string> out_columns_; 143 144 // Boolean mapping, true means to keep the column. 145 std::vector<bool> keep_input_columns_; 146 147 // Indices of the columns to process. 148 std::vector<size_t> to_process_indices_; 149 150 std::unique_ptr<ChildIterator> child_iterator_; // An iterator for fetching. 151 152 // Private function for worker/thread to loop continuously. It comprises the main 153 // logic of MapOp: getting the data from previous Op, validating user specified column names, 154 // applying a list of TensorOps to each of the data, process the results and then 155 // pushing them back to MapOp's output Connector to be fetched by the next Op. 156 // @param worker_id The id assigned to this thread/worker upon creation. 157 // @return Status The status code returned 158 Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_ 159 160 // Private function for worker thread to perform TensorOp's compute function and get the result. 161 // @param in_row Input TensorRow 162 // @param[out] out_row Generated TensorRow 163 Status WorkerCompute(const TensorRow &in_row, TensorRow *out_row, 164 const std::vector<std::shared_ptr<MapJob>> &job_list); 165 166 // Private function that create the final column name to index mapping and 167 // get indices of the columns this mapop does not use. 168 // @param col_name_id_map The column name to index mapping obtained from child operator 169 void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map); 170 171 // Validating if each of the input_columns exists in col_name_id_map. 172 // @param - the column map to check 173 // @return - status return code 174 Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map); 175 176 // Private function for computing the assignment of the column name map. 177 // @return - Status 178 Status ComputeColMap() override; 179 180 // Private function for initializing private variables such as in_columns_, out_columns_. 181 // @return - Status 182 Status InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map); 183 184 // This function should only be called from master thread. It intends to suspend the operation of all workers and 185 // have them wait on the QueueList. Master thread would send a token to each worker then wait on a WaitPost. 186 // Workers upon receiving the suspension token from master thread, increment an atomic count, the last worker 187 // who does the increment wakes up the master. 188 // @return - Status 189 Status WaitForWorkers() override; 190 }; 191 } // namespace dataset 192 } // namespace mindspore 193 194 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_MAP_OP_H_ 195