/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"

#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/wait_post.h"

namespace mindspore {
namespace dataset {
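// Constructor. Records the sharding (num_devices/device_id), shuffling and compression settings,
// and forwards the parallelism and connector sizes to the ParallelOp base class.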
NonMappableLeafOp::NonMappableLeafOp(int32_t num_workers, int32_t worker_connector_size, int64_t total_num_rows,
                                     int32_t op_connector_size, bool shuffle_files, int32_t num_devices,
                                     int32_t device_id, const CompressionType &compression_type)
    : ParallelOp(num_workers, op_connector_size),
      device_id_(device_id),
      num_devices_(num_devices),
      load_jagged_connector_(true),
      filename_index_(std::make_unique<StringIndex>()),
      finished_reading_dataset_(false),
      total_rows_(total_num_rows),
      load_io_block_queue_(true),
      shuffle_files_(shuffle_files),
      num_rows_per_shard_(0),
      compression_type_(compression_type),
      num_rows_(0),
      shuffled_keys_({}),
      prepared_data_{false},
      curr_row_{0},
      workers_done_{0},
      seed_(0) {
  worker_connector_size_ = worker_connector_size;
}

// Class functor operator () override.
// All dataset operators operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work.
Status NonMappableLeafOp::operator()() {
  RETURN_IF_NOT_OK(PrepareData());
  while (!finished_reading_dataset_) {
    int32_t workers_done = 0;
    int64_t rows_read = 0;
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = true;
    }

    while (workers_done < num_workers_) {
      TensorRow fetched_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
      if (fetched_row.eoe()) {
        workers_done++;
      } else if ((compression_type_ == CompressionType::NONE || compression_type_ == CompressionType::GZIP_WITH_COUNT ||
                  compression_type_ == CompressionType::ZLIB_WITH_COUNT) &&
                 (total_rows_ == 0 || rows_read < total_rows_)) {
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
        rows_read++;
      } else if ((compression_type_ == CompressionType::GZIP || compression_type_ == CompressionType::ZLIB) &&
                 (rows_read < total_rows_ * num_devices_)) {
        // for compressed version, total_rows_ is total rows that will be read per shard
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
        rows_read++;
      } else {
        // IOBlockQueue thread needs to:
        // -stop pushing stuff to IOBlockQueue
        // -call PostEndOfEpoch (will send EOE)
        // -wait for reset
        //
        // Worker threads need to:
        // -stop reading the file they are currently reading and throw it away
        // -keep pulling, but don't read other files (eventually skips all IOBlocks and will get EOE)
        //
        // Master thread needs to:
        // -tell IOBlockQueue thread to stop pushing
        // -tell worker threads to stop reading the file they are currently reading
        // -keep pulling until EOE

        // flip both flags under their mutexes so the worker and IOBlockQueue threads observe the change
        {
          std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
          load_jagged_connector_ = false;
        }
        {
          std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
          load_io_block_queue_ = false;
        }
      }
    }

    // all workers finished reading for this epoch, and we have read all the data from all workers
    RETURN_IF_NOT_OK(out_connector_->SendEOE());

    RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
  }

  RETURN_IF_NOT_OK(out_connector_->SendEOF());

  RETURN_IF_NOT_OK(PostEndOfData());

  return Status::OK();
}

// The entry point for when workers are launched.
Status NonMappableLeafOp::WorkerEntry(int32_t worker_id) {
  // must be called first if called by worker spawned by taskgroup
  TaskManager::FindMe()->Post();

  std::unique_ptr<FilenameBlock> io_block;
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
  RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));

  while (!io_block->eof()) {
    if (!io_block->eoe()) {
      if (GetLoadJaggedConnector()) {
        std::string filename;
        RETURN_IF_NOT_OK(io_block->GetFilename(&filename, *filename_index_));
        int64_t start_offset = io_block->GetStartOffset();
        int64_t end_offset = io_block->GetEndOffset();
        RETURN_IF_NOT_OK(LoadFile(filename, start_offset, end_offset, worker_id));
        RETURN_IF_NOT_OK(
          CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
        MS_LOG(DEBUG) << Name() << " operator worker " << worker_id << " loaded file " << filename << ".";
      }
    } else {
      TensorRow eoe = TensorRow(TensorRow::kFlagEOE);
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(eoe)));
    }
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
    RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
    RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
  }
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume.
// When the worker pops this control indicator, it will shut itself down gracefully.
Status NonMappableLeafOp::PostEndOfData() {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eof = std::make_unique<FilenameBlock>(IOBlock::kFlagEOF);
    RETURN_IF_NOT_OK(PushIoBlockQueue(i, std::move(eof)));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume. When the worker
// pops this control indicator, it will wait until the next epoch starts and then resume execution.
Status NonMappableLeafOp::PostEndOfEpoch(int32_t queue_index) {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eoe = std::make_unique<FilenameBlock>(IOBlock::kFlagEOE);
    RETURN_IF_NOT_OK(PushIoBlockQueue((queue_index + i) % num_workers_, std::move(eoe)));
  }

  return Status::OK();
}

// Notifies the thread which called WaitToFillIOBlockQueue to resume execution.
void NonMappableLeafOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); }

// Pops an element from a queue in io_block_queues
Status NonMappableLeafOp::PopIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> *out_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->PopFront(out_block));
  return Status::OK();
}

// Pushes an element to a queue in io_block_queues
Status NonMappableLeafOp::PushIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> &&io_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->Add(std::move(io_block)));
  return Status::OK();
}

// Overrides base class reset method. Cleans up any state info from its previous execution and
// reinitializes itself so that it can be executed again, as if it was just created.
Status NonMappableLeafOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  curr_row_ = 0;
  workers_done_ = 0;
  // set the flags back to true before notifying the IOBlockQueue thread; otherwise workers that still see
  // load_jagged_connector_ == false would let the new IOBlocks fall through
  {
    std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
    load_jagged_connector_ = true;
  }

  {
    std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
    load_io_block_queue_ = true;
  }

  NotifyToFillIOBlockQueue();

  return Status::OK();
}

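// Decides whether the rows of file_name overlap this shard's row range
// [device_id_ * num_rows_per_shard_, (device_id_ + 1) * num_rows_per_shard_). pre_count is the number of rows
// in all files that precede this one; on a true return, *start_offset and *end_offset give the row range
// within the file that this shard should read.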
bool NonMappableLeafOp::NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset,
                                                 int64_t *end_offset, const int64_t &pre_count) {
  *start_offset = 0;
  *end_offset = 0;
  bool push = false;
  int64_t start_index = device_id_ * num_rows_per_shard_;
  if (device_id_ + 1 < 0) {
    MS_LOG(ERROR) << "Invalid device id, device id should be greater than or equal to 0, but got "
                  << std::to_string(device_id_);
    return false;
  }

  int64_t end_index = (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_;
  if (pre_count <= start_index && pre_count + filename_numrows_[file_name] > start_index) {
    *start_offset = start_index - pre_count;
    push = true;
    if (pre_count < end_index && pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  if (pre_count >= start_index && pre_count < end_index) {
    *start_offset = 0;
    push = true;
    if (pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  return push;
}

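// Re-shuffles the list of file keys for the next epoch. With a single device the configured seed is used;
// with multiple devices an incrementing seed (starting from the same value on every shard) is used so that
// all shards shuffle the file list into the same order.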
void NonMappableLeafOp::ShuffleKeys() {
  std::mt19937 rng(num_devices_ == 1 ? GetSeed() : ++seed_);
  std::shuffle(shuffled_keys_.begin(), shuffled_keys_.end(), rng);
}

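// Entry point of the IOBlockQueue-filling thread. It blocks on io_block_queue_wait_post_ and, each time it is
// signalled, either exits (when the dataset is finished) or refills the IOBlockQueue, shuffling the file keys
// first if shuffle_files_ is enabled.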
Status NonMappableLeafOp::WaitToFillIOBlockQueue() {
  // must be called first if called by worker spawned by taskgroup
  TaskManager::FindMe()->Post();

  while (true) {
    RETURN_IF_NOT_OK(io_block_queue_wait_post_.Wait());
    io_block_queue_wait_post_.Clear();

    if (finished_reading_dataset_) {
      break;
    }

    if (shuffle_files_) {
      ShuffleKeys();
    }
    RETURN_IF_NOT_OK(FillIOBlockQueue(shuffled_keys_));
  }
  return Status::OK();
}

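// Preparation shared by the push-based and pull-based paths: collects the file keys used for shuffling and,
// when fast recovery resets to a later repeat, replays the shuffles of the repeats that already completed so
// the key order matches the epoch being recovered.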
Status NonMappableLeafOp::PrepareOperatorImplementation() {
  if (shuffle_files_) {
    for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
      shuffled_keys_.push_back(it.key());
    }
    // Please note that this code is added for future use. Resetting the dataset is only used in sink mode,
    // and pull mode does not support sink mode.
    if (GlobalContext::config_manager()->fast_recovery() && op_current_repeats_ > 0) {
      // in reset mode, shuffled_keys_ needs to match the order it would have in the epoch being recovered,
      // so replay the shuffles of the completed repeats
      for (auto i = 0; i < op_current_repeats_; i++) {
        ShuffleKeys();
      }
    }
  }
  return Status::OK();
}

Status NonMappableLeafOp::PrepareOperator() {
  // Run any common code from super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  return PrepareOperatorImplementation();
}

Status NonMappableLeafOp::PrepareOperatorPullBased() {
  // Run any common code from super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperatorPullBased());
  return PrepareOperatorImplementation();
}

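// Computes the number of rows each shard should read, then launches the single IOBlockQueue-filling thread and
// the num_workers_ reader threads, and finally signals the filling thread to start producing IOBlocks.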
Status NonMappableLeafOp::PrepareData() {
  RETURN_IF_NOT_OK(CalculateNumRowsPerShard());

  // Register here to avoid a registration failure when the WorkerEntry thread exits unexpectedly
  RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));

  // launch one thread, responsible for filling IOBlockQueue
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&NonMappableLeafOp::WaitToFillIOBlockQueue, this),
                                        Name() + "::WaitToFillIOBlockQueue", id()));

  // launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
  // data from disk into TensorRows
  RETURN_IF_NOT_OK(RegisterAndLaunchThreads());

  // must be called after launching workers. workers can't be spawned after this post,
  // so workers have to be kept alive until the end of the program
  TaskManager::FindMe()->Post();

  NotifyToFillIOBlockQueue();

  return Status::OK();
}

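// Returns the next TensorRow when the pipeline runs in pull-based mode. The first call lazily prepares the op
// (launches the IOBlockQueue and worker threads); subsequent calls pop rows from jagged_rows_connector_.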
Status NonMappableLeafOp::GetNextRowPullMode(TensorRow *const row) {
  RETURN_UNEXPECTED_IF_NULL(row);
  row->reset();

  // IOBlockQueue threads keep filling IOBlockQueue, and worker threads keep pulling files from IOBlockQueue, reading
  // and then pushing tensors into the jagged_rows_connector queue. This preparation process is done asynchronously.
  // Please note that even when num_parallel_workers is set to 1, there will still be 3 async threads alive in the
  // source op: 1 main thread, 1 worker thread and 1 IOBlockQueue thread.
  if (!prepared_data_) {
    RETURN_IF_NOT_OK(PrepareData());
    prepared_data_ = true;
  }
  if (finished_reading_dataset_) {
    *row = TensorRow(TensorRow::kFlagEOF);
    return Status::OK();
  }
  TensorRow new_row;
  RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &new_row));
  // Pull a tensor from the jagged_rows_connector queue. There are 4 cases:
  // 1) If an eoe signal arrives and all workers have finished reading, propagate eoe to the next op and do a
  //    self-reset.
  // 2) If an eoe signal arrives but not all workers have finished reading, consume the eoe and pull the next
  //    non-eoe tensor from the jagged_rows_connector queue.
  // 3) If the maximum number of rows to read has not been reached, return the tensor data and increment curr_row_.
  // 4) If the maximum number of rows to read has been reached, notify the IOBlockQueue thread and the worker threads
  //    by setting load_jagged_connector_ and load_io_block_queue_ to false. Then drain data from the
  //    jagged_rows_connector queue until eoe is hit so that no data remains in any queue and they can be reset
  //    properly for the new iteration.
  while (new_row.eoe()) {
    workers_done_++;
    if (static_cast<int32_t>(workers_done_) == num_workers_) {
      RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
      *row = TensorRow(TensorRow::kFlagEOE);
      return Status::OK();
    } else {
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &new_row));
    }
  }

  if (((compression_type_ == CompressionType::NONE || compression_type_ == CompressionType::GZIP_WITH_COUNT ||
        compression_type_ == CompressionType::ZLIB_WITH_COUNT) &&
       (total_rows_ == 0 || curr_row_ < total_rows_)) ||
      ((compression_type_ == CompressionType::GZIP || compression_type_ == CompressionType::ZLIB) &&
       (curr_row_ < total_rows_ * num_devices_))) {
    curr_row_++;
  } else {
    {
      std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
      load_jagged_connector_ = false;
    }
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = false;
    }
    // drain data in jagged_rows_connector_ until eoe is hit.
    while (static_cast<int32_t>(workers_done_) < num_workers_) {
      TensorRow next_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &next_row));
      if (next_row.eoe()) {
        workers_done_++;
      }
    }
    RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
    new_row = TensorRow(TensorRow::kFlagEOE);
  }
  *row = std::move(new_row);
  return Status::OK();
}

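// Epoch-boundary handling: on the last repeat, mark the dataset as finished and wake the IOBlockQueue thread so it
// can exit; otherwise reset the jagged connector and the op for the next repeat. The repeat and epoch counters are
// updated in both cases.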
Status NonMappableLeafOp::ResetAndUpdateRepeat() {
  if (IsLastIteration()) {
    finished_reading_dataset_ = true;
    NotifyToFillIOBlockQueue();
  } else {
    jagged_rows_connector_->DoReset();
    // Self-reset to start a new iteration
    RETURN_IF_NOT_OK(Reset());
  }
  UpdateRepeatAndEpochCounter();
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore