/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"

#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/wait_post.h"

namespace mindspore {
namespace dataset {
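// Constructor: records the sharding (num_devices / device_id), shuffling, and row-count settings shared by
// all non-mappable leaf ops; worker_connector_size is stored for when the worker connector is created later.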
NonMappableLeafOp::NonMappableLeafOp(int32_t num_workers, int32_t worker_connector_size, int64_t total_num_rows,
                                     int32_t op_connector_size, bool shuffle_files, int32_t num_devices,
                                     int32_t device_id)
    : ParallelOp(num_workers, op_connector_size),
      device_id_(device_id),
      num_devices_(num_devices),
      load_jagged_connector_(true),
      filename_index_(std::make_unique<StringIndex>()),
      finished_reading_dataset_(false),
      total_rows_(total_num_rows),
      load_io_block_queue_(true),
      shuffle_files_(shuffle_files),
      num_rows_per_shard_(0),
      num_rows_(0) {
  worker_connector_size_ = worker_connector_size;
}

// Class functor operator () override.
// All dataset operators operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work.
Status NonMappableLeafOp::operator()() {
  RETURN_IF_NOT_OK(CalculateNumRowsPerShard());

  // Put here to avoid a registration failure when the WorkerEntry thread exits unexpectedly
  RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));

  // launch one thread, responsible for filling the IOBlockQueue
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&NonMappableLeafOp::WaitToFillIOBlockQueue, this), "", id()));

  // launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
  // data from disk into TensorRows
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(
    num_workers_, std::bind(&NonMappableLeafOp::WorkerEntry, this, std::placeholders::_1), "", id()));

  // must be called after launching workers. Workers can't be spawned after this post,
  // so workers have to be kept alive until the end of the program
  TaskManager::FindMe()->Post();

  NotifyToFillIOBlockQueue();
  while (!finished_reading_dataset_) {
    int32_t workers_done = 0;
    int64_t rows_read = 0;
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = true;
    }

    while (workers_done < num_workers_) {
      TensorRow fetched_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
      if (fetched_row.eoe()) {
        workers_done++;
      } else if (total_rows_ == 0 || rows_read < total_rows_) {
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row), 0));
        rows_read++;
      } else {
        // IOBlockQueue thread needs to:
        // -stop pushing stuff to IOBlockQueue
        // -call PostEndOfEpoch (will send EOE)
        // -wait for reset
        //
        // Worker threads need to:
        // -stop reading the file they are currently reading and throw it away
        // -keep pulling, but don't read other files (eventually skips all IOBlocks and will get EOE)
        //
        // Master thread needs to:
        // -tell IOBlockQueue thread to stop pushing
        // -tell worker threads to stop reading the file they are currently reading
        // -keep pulling until EOE

        // don't think we need a lock for now
        load_jagged_connector_ = false;

        std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
        load_io_block_queue_ = false;
      }
    }

    // all workers finished reading for this epoch, and we have read all the data from all workers
    RETURN_IF_NOT_OK(out_connector_->SendEOE());

    if (IsLastIteration()) {
      finished_reading_dataset_ = true;
      NotifyToFillIOBlockQueue();
    } else {
      jagged_rows_connector_->DoReset();
      // Self-reset to start a new iteration
      RETURN_IF_NOT_OK(Reset());
    }
    UpdateRepeatAndEpochCounter();
  }

  RETURN_IF_NOT_OK(out_connector_->SendEOF());

  RETURN_IF_NOT_OK(PostEndOfData());

  return Status::OK();
}

// The entry point for when workers are launched.
Status NonMappableLeafOp::WorkerEntry(int32_t worker_id) {
  // must be called first if called by worker spawned by taskgroup
  TaskManager::FindMe()->Post();

  std::unique_ptr<FilenameBlock> io_block;
  RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));

  while (!io_block->eof()) {
    if (!io_block->eoe()) {
      if (load_jagged_connector_) {
        std::string filename;
        RETURN_IF_NOT_OK(io_block->GetFilename(&filename, *filename_index_));
        int64_t start_offset = io_block->GetStartOffset();
        int64_t end_offset = io_block->GetEndOffset();
        RETURN_IF_NOT_OK(LoadFile(filename, start_offset, end_offset, worker_id));
        MS_LOG(DEBUG) << Name() << " operator worker " << worker_id << " loaded file " << filename << ".";
      }
    } else {
      TensorRow eoe = TensorRow(TensorRow::kFlagEOE);
      RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(eoe)));
    }

    RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume.
// When the worker pops this control indicator, it will shut itself down gracefully.
Status NonMappableLeafOp::PostEndOfData() {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eof = std::make_unique<FilenameBlock>(IOBlock::kDeIoBlockFlagEof);
    RETURN_IF_NOT_OK(PushIoBlockQueue(i, std::move(eof)));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume. When the worker
// pops this control indicator, it will wait until the next epoch starts and then resume execution.
Status NonMappableLeafOp::PostEndOfEpoch(int32_t queue_index) {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eoe = std::make_unique<FilenameBlock>(IOBlock::kDeIoBlockFlagEoe);
    RETURN_IF_NOT_OK(PushIoBlockQueue((queue_index + i) % num_workers_, std::move(eoe)));
  }

  return Status::OK();
}

// Notifies the thread which called WaitToFillIOBlockQueue to resume execution.
void NonMappableLeafOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); }

// Pops an element from a queue in io_block_queues_
Status NonMappableLeafOp::PopIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> *out_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->PopFront(out_block));
  return Status::OK();
}

// Pushes an element to a queue in io_block_queues_
Status NonMappableLeafOp::PushIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> &&io_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->Add(std::move(io_block)));
  return Status::OK();
}

// Overrides base class reset method. Cleans up any state info from its previous execution and
// reinitializes itself so that it can be executed again, as if it was just created.
Status NonMappableLeafOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  // start workers first, otherwise IOBlocks will fall through if workers see them before this is set to true
  load_jagged_connector_ = true;

  {
    std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
    load_io_block_queue_ = true;
  }

  RETURN_IF_NOT_OK(ParallelOp::Reset());
  NotifyToFillIOBlockQueue();

  return Status::OK();
}

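// Determines whether (part of) the given file falls inside this device's shard and, if so, computes the
// half-open row range [start_offset, end_offset) to read from it. As an illustrative walk-through of the
// logic below (values are hypothetical): with num_rows_per_shard_ = 100 and device_id_ = 1, this shard
// covers global rows [100, 200); a file preceded by pre_count = 80 rows and containing 50 rows spans
// global rows [80, 130), so it is pushed with start_offset = 20 and end_offset = 50.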
bool NonMappableLeafOp::NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset,
                                                 int64_t *end_offset, const int64_t &pre_count) {
  *start_offset = 0;
  *end_offset = 0;
  bool push = false;
  int64_t start_index = device_id_ * num_rows_per_shard_;
  if (device_id_ + 1 < 0) {
    MS_LOG(ERROR) << "Device id is invalid, got " + std::to_string(device_id_);
    return false;
  }

  int64_t end_index = (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_;
  if (pre_count <= start_index && pre_count + filename_numrows_[file_name] > start_index) {
    *start_offset = start_index - pre_count;
    push = true;
    if (pre_count < end_index && pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  if (pre_count >= start_index && pre_count < end_index) {
    *start_offset = 0;
    push = true;
    if (pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  return push;
}

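// Shuffles the file-index keys in place with a Mersenne Twister engine seeded by the caller, so the file
// read order changes between epochs while staying reproducible for a given seed.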
void NonMappableLeafOp::ShuffleKeys(std::vector<int64_t> *i_keys, uint32_t seed) {
  std::mt19937 rng(seed);
  std::shuffle(i_keys->begin(), i_keys->end(), rng);
}

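// Runs on the dedicated queue-filler thread: blocks until the master thread signals via
// NotifyToFillIOBlockQueue(), then fills the IOBlockQueue for the next epoch, reshuffling the file
// order first when shuffle_files_ is set.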
Status NonMappableLeafOp::WaitToFillIOBlockQueue() {
  // must be called first if called by worker spawned by taskgroup
  TaskManager::FindMe()->Post();

  std::vector<int64_t> i_keys;
  if (shuffle_files_) {
    for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
      i_keys.push_back(it.key());
    }
  }
  uint32_t seed = 0;
  while (true) {
    RETURN_IF_NOT_OK(io_block_queue_wait_post_.Wait());
    io_block_queue_wait_post_.Clear();

    if (finished_reading_dataset_) {
      break;
    }

    if (shuffle_files_) {
      // with multiple devices, every shard shuffles with the same deterministic per-epoch seed
      // so that all devices agree on the file order
      ShuffleKeys(&i_keys, num_devices_ == 1 ? GetSeed() : ++seed);
    }
    RETURN_IF_NOT_OK(FillIOBlockQueue(i_keys));
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore