• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_
18 
19 #include <memory>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "minddata/dataset/util/status.h"
27 #include "minddata/dataset/util/auto_index.h"
28 #include "minddata/dataset/engine/data_schema.h"
29 #include "minddata/dataset/engine/datasetops/parallel_op.h"
30 #include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"
31 #include "minddata/dataset/util/queue.h"
32 #include "minddata/dataset/util/wait_post.h"
33 #include "minddata/dataset/engine/jagged_connector.h"
34 
35 namespace mindspore {
36 namespace dataset {
37 using StringIndex = AutoIndexObj<std::string>;  // Shorthand for AutoIndexObj instantiated with std::string.
38 
39 class TextFileOp : public NonMappableLeafOp {
40  public:
41   // Constructor of TextFileOp
42   // @note The builder class should be used to call this constructor.
43   // @param num_workers - number of worker threads reading data from tf_file files.
44   // @param total_num_rows - number of rows to read
45   // @param dataset_files_list - list of filepaths for the dataset files.
46   // @param data_schema - the data schema object.
47   // @param op_connector_size - size of each queue in the connector that the child operator pulls from.
48   // @param columns_to_load - the names of the columns to load data from.
49   // @param shuffle_files - whether or not to shuffle the files before reading data.
50   // @param equal_rows_per_shard - whether or not to get equal rows for each process.
51   TextFileOp(int32_t num_workers, int64_t total_rows, int32_t worker_connector_size, std::unique_ptr<DataSchema>,
52              std::vector<std::string> text_files_list, int32_t op_connector_size, bool shuffle_files,
53              int32_t num_devices, int32_t device_id);
54 
55   // Default destructor
56   ~TextFileOp() = default;
57 
58   // A print method typically used for debugging
59   // @param out - The output stream to write output to
60   // @param show_all - A bool to control if you want to show all info or just a summary
61   void Print(std::ostream &out, bool show_all) const override;
62 
63   // Instantiates the internal queues and connectors
64   // @return Status - the error code returned
65   Status Init() override;
66 
67   // Get total rows in files.
68   // @param files - all text files.
69   // @param count - number of rows.
70   // @return Status - the error coed returned.
71   static Status CountAllFileRows(const std::vector<std::string> &files, int64_t *count);
72 
73   // Op name getter
74   // @return Name of the current Op
Name()75   std::string Name() const override { return "TextFileOp"; }
76 
77   // DatasetName name getter
78   // \return DatasetName of the current Op
79   virtual std::string DatasetName(bool upper = false) const { return upper ? "TextFile" : "text file"; }
80 
81   // File names getter
82   // @return Vector of the input file names
FileNames()83   std::vector<std::string> FileNames() { return text_files_list_; }
84 
85  protected:
86   // Parses a single row and puts the data into a tensor table.
87   // @param line - the content of the row.
88   // @param tensor_table - the tensor table to put the parsed data in.
89   // @param row - the id of the row filled in the tensor table.
90   // @return Status - the error code returned.
91   Status LoadTensor(const std::string &line, TensorRow *out_row) const;
92 
93   // Reads a text file and loads the data into multiple TensorRows.
94   // @param file - the file to read.
95   // @param start_offset - the start offset of file.
96   // @param end_offset - the end offset of file.
97   // @param worker_id - the id of the worker that is executing this function.
98   // @return Status - the error code returned.
99   Status LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) override;
100 
101   // Calculate number of rows in each shard.
102   // @return Status - the error code returned.
103   Status CalculateNumRowsPerShard() override;
104 
105   // Fill the IOBlockQueue.
106   // @para i_keys - keys of file to fill to the IOBlockQueue
107   // @return Status - the error code returned.
108   Status FillIOBlockQueue(const std::vector<int64_t> &i_keys) override;
109 
110   // Private function for computing the assignment of the column name map.
111   // @return - Status
112   Status ComputeColMap() override;
113 
114   // Count number of rows in each file.
115   // @param file - txt file name.
116   // @return int64_t - the total number of rows in file.
117   virtual int64_t CountTotalRows(const std::string &file);
118 
119   std::vector<std::string> text_files_list_;
120   std::unique_ptr<DataSchema> data_schema_;
121 };
122 }  // namespace dataset
123 }  // namespace mindspore
124 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_SOURCE_TEXT_FILE_OP_H_
125