/**
 * Copyright 2020-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_

#include <memory>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/auto_index.h"
#include "minddata/dataset/engine/data_schema.h"
#include "minddata/dataset/engine/datasetops/parallel_op.h"
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/wait_post.h"
#include "minddata/dataset/engine/jagged_connector.h"

namespace mindspore {
namespace dataset {
using StringIndex = AutoIndexObj<std::string>;

class TextFileOp : public NonMappableLeafOp {
 public:
  // Constructor of TextFileOp
  // @note The builder class should be used to call this constructor.
  // @param num_workers - number of worker threads reading data from the text files.
  // @param total_rows - number of rows to read.
  // @param worker_connector_size - size of each internal worker queue.
  // @param data_schema - the data schema object.
  // @param text_files_list - list of file paths for the dataset files.
  // @param op_connector_size - size of each queue in the connector that the child operator pulls from.
  // @param shuffle_files - whether or not to shuffle the files before reading data.
  // @param num_devices - number of devices (shards) the data is distributed across.
  // @param device_id - id of the device (shard) served by this op.
  TextFileOp(int32_t num_workers, int64_t total_rows, int32_t worker_connector_size, std::unique_ptr<DataSchema>,
             std::vector<std::string> text_files_list, int32_t op_connector_size, bool shuffle_files,
             int32_t num_devices, int32_t device_id);

  // Default destructor
  ~TextFileOp() = default;

  // A print method typically used for debugging
  // @param out - The output stream to write output to
  // @param show_all - A bool to control if you want to show all info or just a summary
  void Print(std::ostream &out, bool show_all) const override;

  // Instantiates the internal queues and connectors
  // @return Status - the error code returned
  Status Init() override;

  // Get total rows in files.
  // @param files - all text files.
  // @param count - number of rows.
  // @return Status - the error code returned.
  static Status CountAllFileRows(const std::vector<std::string> &files, int64_t *count);

  // Op name getter
  // @return Name of the current Op
  std::string Name() const override { return "TextFileOp"; }

  // DatasetName name getter
  // @return DatasetName of the current Op
  virtual std::string DatasetName(bool upper = false) const { return upper ? "TextFile" : "text file"; }

  // File names getter
  // @return Vector of the input file names
  std::vector<std::string> FileNames() { return text_files_list_; }

 protected:
  // Parses a single row and puts the data into a tensor row.
  // @param line - the content of the row.
  // @param out_row - the tensor row to put the parsed data in.
  // @return Status - the error code returned.
  Status LoadTensor(const std::string &line, TensorRow *out_row) const;

  // Reads a text file and loads the data into multiple TensorRows.
  // @param file - the file to read.
  // @param start_offset - the start offset of file.
  // @param end_offset - the end offset of file.
  // @param worker_id - the id of the worker that is executing this function.
  // @return Status - the error code returned.
  Status LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) override;

  // Calculate number of rows in each shard.
  // @return Status - the error code returned.
  Status CalculateNumRowsPerShard() override;

  // Fill the IOBlockQueue.
  // @param i_keys - keys of file to fill to the IOBlockQueue.
  // @return Status - the error code returned.
  Status FillIOBlockQueue(const std::vector<int64_t> &i_keys) override;

  // Private function for computing the assignment of the column name map.
  // @return - Status
  Status ComputeColMap() override;

  // Count number of rows in each file.
  // @param file - txt file name.
  // @return int64_t - the total number of rows in file.
  virtual int64_t CountTotalRows(const std::string &file);

  std::vector<std::string> text_files_list_;
  std::unique_ptr<DataSchema> data_schema_;
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_