/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"

#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/wait_post.h"

namespace mindspore {
namespace dataset {
NonMappableLeafOp::NonMappableLeafOp(int32_t num_workers, int32_t worker_connector_size, int64_t total_num_rows,
                                     int32_t op_connector_size, bool shuffle_files, int32_t num_devices,
                                     int32_t device_id, const CompressionType &compression_type)
    : ParallelOp(num_workers, op_connector_size),
      device_id_(device_id),
      num_devices_(num_devices),
      load_jagged_connector_(true),
      filename_index_(std::make_unique<StringIndex>()),
      finished_reading_dataset_(false),
      total_rows_(total_num_rows),
      load_io_block_queue_(true),
      shuffle_files_(shuffle_files),
      num_rows_per_shard_(0),
      compression_type_(compression_type),
      num_rows_(0),
      shuffled_keys_({}),
      prepared_data_{false},
      curr_row_{0},
      workers_done_{0},
      seed_(0) {
  worker_connector_size_ = worker_connector_size;
}

// Class functor operator () override.
// All dataset operators operate by launching a thread (see ExecutionTree). This class functor will
// provide the master loop that drives the logic for performing the work.
Status NonMappableLeafOp::operator()() {
  RETURN_IF_NOT_OK(PrepareData());
  while (!finished_reading_dataset_) {
    int32_t workers_done = 0;
    int64_t rows_read = 0;
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = true;
    }

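    // Row accounting: for NONE, GZIP_WITH_COUNT and ZLIB_WITH_COUNT data, total_rows_ == 0 means no
    // limit, otherwise it caps the rows pushed downstream. For GZIP and ZLIB, total_rows_ is a
    // per-shard count (see the comment in that branch), so the cap is total_rows_ * num_devices_.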
    while (workers_done < num_workers_) {
      TensorRow fetched_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
      if (fetched_row.eoe()) {
        workers_done++;
      } else if ((compression_type_ == CompressionType::NONE || compression_type_ == CompressionType::GZIP_WITH_COUNT ||
                  compression_type_ == CompressionType::ZLIB_WITH_COUNT) &&
                 (total_rows_ == 0 || rows_read < total_rows_)) {
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
        rows_read++;
      } else if ((compression_type_ == CompressionType::GZIP || compression_type_ == CompressionType::ZLIB) &&
                 (rows_read < total_rows_ * num_devices_)) {
        // for the compressed version, total_rows_ is the number of rows that will be read per shard
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row)));
        rows_read++;
      } else {
        // IOBlockQueue thread needs to:
        // -stop pushing stuff to IOBlockQueue
        // -call PostEndOfEpoch (will send EOE)
        // -wait for reset
        //
        // Worker threads need to:
        // -stop reading the file they are currently reading and throw it away
        // -keep pulling, but don't read other files (eventually skips all IOBlocks and will get EOE)
        //
        // Master thread needs to:
        // -tell IOBlockQueue thread to stop pushing
        // -tell worker threads to stop reading the file they are currently reading
        // -keep pulling until EOE

        // the flags are guarded by their mutexes since other threads read them concurrently
        {
          std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
          load_jagged_connector_ = false;
        }
        {
          std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
          load_io_block_queue_ = false;
        }
      }
    }

    // all workers finished reading for this epoch, and we have read all the data from all workers
    RETURN_IF_NOT_OK(out_connector_->SendEOE());

    RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
  }

  RETURN_IF_NOT_OK(out_connector_->SendEOF());

  RETURN_IF_NOT_OK(PostEndOfData());

  return Status::OK();
}

// The entry point for when workers are launched.
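// Each worker repeatedly pops a FilenameBlock from its IOBlockQueue: a regular block names a file
// (plus start/end row offsets) to load into the jagged connector, an EOE block is forwarded as an
// EOE row, and an EOF block ends the loop. When load_jagged_connector_ is false the worker keeps
// popping blocks but skips loading, so it drains to the next EOE/EOF without producing rows.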
Status NonMappableLeafOp::WorkerEntry(int32_t worker_id) {
  // must be called first if this worker is spawned by the task group
  TaskManager::FindMe()->Post();

  std::unique_ptr<FilenameBlock> io_block;
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
  RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
  RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));

  while (!io_block->eof()) {
    if (!io_block->eoe()) {
      if (GetLoadJaggedConnector()) {
        std::string filename;
        RETURN_IF_NOT_OK(io_block->GetFilename(&filename, *filename_index_));
        int64_t start_offset = io_block->GetStartOffset();
        int64_t end_offset = io_block->GetEndOffset();
        RETURN_IF_NOT_OK(LoadFile(filename, start_offset, end_offset, worker_id));
        RETURN_IF_NOT_OK(
          CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
        MS_LOG(DEBUG) << Name() << " operator worker " << worker_id << " loaded file " << filename << ".";
      }
    } else {
      TensorRow eoe = TensorRow(TensorRow::kFlagEOE);
      RETURN_IF_NOT_OK(
        CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
      RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(eoe)));
    }
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerGet"));
    RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
    RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerGet", {{"TensorRowFlags", io_block->FlagName()}}));
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WorkerProcess"));
  }
  RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WorkerProcess", {{"TensorRowFlags", io_block->FlagName()}}));
  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume.
// When the worker pops this control indicator, it will shut itself down gracefully.
Status NonMappableLeafOp::PostEndOfData() {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eof = std::make_unique<FilenameBlock>(IOBlock::kFlagEOF);
    RETURN_IF_NOT_OK(PushIoBlockQueue(i, std::move(eof)));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume. When the worker
// pops this control indicator, it will wait until the next epoch starts and then resume execution.
Status NonMappableLeafOp::PostEndOfEpoch(int32_t queue_index) {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eoe = std::make_unique<FilenameBlock>(IOBlock::kFlagEOE);
    RETURN_IF_NOT_OK(PushIoBlockQueue((queue_index + i) % num_workers_, std::move(eoe)));
  }

  return Status::OK();
}

// Notifies the thread which called WaitToFillIOBlockQueue to resume execution.
void NonMappableLeafOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); }

// Pops an element from a queue in io_block_queues
Status NonMappableLeafOp::PopIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> *out_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->PopFront(out_block));
  return Status::OK();
}

// Pushes an element to a queue in io_block_queues
Status NonMappableLeafOp::PushIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> &&io_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->Add(std::move(io_block)));
  return Status::OK();
}

// Overrides the base class reset method. Cleans up any state info from its previous execution and
// reinitializes itself so that it can be executed again, as if it was just created.
Status NonMappableLeafOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  curr_row_ = 0;
  workers_done_ = 0;
  // start workers first, otherwise IOBlocks will fall through if workers see them before this is set to true
  {
    std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
    load_jagged_connector_ = true;
  }

  {
    std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
    load_io_block_queue_ = true;
  }

  NotifyToFillIOBlockQueue();

  return Status::OK();
}

bool NonMappableLeafOp::NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset,
                                                 int64_t *end_offset, const int64_t &pre_count) {
  *start_offset = 0;
  *end_offset = 0;
  bool push = false;
  int64_t start_index = device_id_ * num_rows_per_shard_;
  if (device_id_ + 1 < 0) {
    MS_LOG(ERROR) << "Invalid device id, device id should be greater than or equal to 0, but got "
                  << std::to_string(device_id_);
    return false;
  }

  int64_t end_index = (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_;
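  // Worked example (illustrative numbers, not from any real dataset): with num_rows_per_shard_ = 100 and
  // device_id_ = 1, this shard owns global rows [100, 200). A file whose rows start at pre_count = 80 and
  // which has filename_numrows_[file_name] = 70 rows overlaps that range, so push becomes true with
  // *start_offset = 20 and *end_offset = 70, i.e. the worker reads rows [20, 70) of that file.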
  if (pre_count <= start_index && pre_count + filename_numrows_[file_name] > start_index) {
    *start_offset = start_index - pre_count;
    push = true;
    if (pre_count < end_index && pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  if (pre_count >= start_index && pre_count < end_index) {
    *start_offset = 0;
    push = true;
    if (pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  return push;
}

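// Note on seeding (an inference, not documented here): with multiple devices every shard presumably has to
// shuffle the file list into the same deterministic order each epoch so that row-range sharding stays
// consistent across devices, hence the incrementing ++seed_; with a single device the globally configured
// seed from GetSeed() is used instead.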
void NonMappableLeafOp::ShuffleKeys() {
  std::mt19937 rng(num_devices_ == 1 ? GetSeed() : ++seed_);
  std::shuffle(shuffled_keys_.begin(), shuffled_keys_.end(), rng);
}

Status NonMappableLeafOp::WaitToFillIOBlockQueue() {
  // must be called first if this thread is spawned by the task group
  TaskManager::FindMe()->Post();

  while (true) {
    RETURN_IF_NOT_OK(io_block_queue_wait_post_.Wait());
    io_block_queue_wait_post_.Clear();

    if (finished_reading_dataset_) {
      break;
    }

    if (shuffle_files_) {
      ShuffleKeys();
    }
    RETURN_IF_NOT_OK(FillIOBlockQueue(shuffled_keys_));
  }
  return Status::OK();
}

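// PrepareOperatorImplementation is shared by both preparation paths: the regular (push-based)
// PrepareOperator and the pull-based PrepareOperatorPullBased. It collects the file keys to shuffle and,
// when fast recovery resumes after some repeats, replays ShuffleKeys so that the shuffle order matches
// the epoch being resumed.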
Status NonMappableLeafOp::PrepareOperatorImplementation() {
  if (shuffle_files_) {
    for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
      shuffled_keys_.push_back(it.key());
    }
    // Please note that this code is added for future use. Resetting the dataset is only used in sink mode,
    // and pull mode does not support sink mode.
    if (GlobalContext::config_manager()->fast_recovery() && op_current_repeats_ > 0) {
      // in reset mode, shuffled_keys_ needs to be in the same order as in the epoch being reset to
      for (auto i = 0; i < op_current_repeats_; i++) {
        ShuffleKeys();
      }
    }
  }
  return Status::OK();
}

Status NonMappableLeafOp::PrepareOperator() {
  // Run any common code from the super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperator());
  return PrepareOperatorImplementation();
}

Status NonMappableLeafOp::PrepareOperatorPullBased() {
  // Run any common code from the super class first before adding our own
  RETURN_IF_NOT_OK(DatasetOp::PrepareOperatorPullBased());
  return PrepareOperatorImplementation();
}

Status NonMappableLeafOp::PrepareData() {
  RETURN_IF_NOT_OK(CalculateNumRowsPerShard());

  // Register here to avoid a failed registration when the WorkerEntry thread exits unexpectedly
  RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));

  // launch one thread, responsible for filling the IOBlockQueue
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&NonMappableLeafOp::WaitToFillIOBlockQueue, this),
                                        Name() + "::WaitToFillIOBlockQueue", id()));

  // launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
  // data from disk into TensorRows
  RETURN_IF_NOT_OK(RegisterAndLaunchThreads());

  // must be called after launching workers. Workers can't be spawned after this post,
  // so workers have to be kept alive until the end of the program
  TaskManager::FindMe()->Post();

  NotifyToFillIOBlockQueue();

  return Status::OK();
}

Status NonMappableLeafOp::GetNextRowPullMode(TensorRow *const row) {
  RETURN_UNEXPECTED_IF_NULL(row);
  row->reset();

  // The IOBlockQueue thread keeps filling the IOBlockQueue, and worker threads keep pulling files from the
  // IOBlockQueue, reading them and pushing tensors into the jagged_rows_connector queue. This preparation is
  // done asynchronously. Note that even when num_parallel_workers is set to 1, there are still 3 async threads
  // alive in the source op: 1 main thread, 1 worker thread and 1 IOBlockQueue thread.
  if (!prepared_data_) {
    RETURN_IF_NOT_OK(PrepareData());
    prepared_data_ = true;
  }
  if (finished_reading_dataset_) {
    *row = TensorRow(TensorRow::kFlagEOF);
    return Status::OK();
  }
  TensorRow new_row;
  RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &new_row));
  // Pull a tensor row from the jagged_rows_connector queue. There are 4 cases:
  // 1) If an eoe signal arrives and all workers have finished reading, propagate eoe to the next op and do a
  //    self-reset.
  // 2) If an eoe signal arrives but not all workers have finished reading, consume the eoe and pull the next
  //    non-eoe row from the jagged_rows_connector queue.
  // 3) If the maximum number of rows to read has not been reached, return the tensor data and increment curr_row_.
  // 4) If the maximum number of rows to read has been reached, notify the IOBlockQueue thread and worker threads
  //    by setting load_jagged_connector_ and load_io_block_queue_ to false. Then, drain data from the
  //    jagged_rows_connector queue until eoe is hit so that no data remains in any queue and they can be reset
  //    properly for the new iteration.
  while (new_row.eoe()) {
    workers_done_++;
    if (static_cast<int32_t>(workers_done_) == num_workers_) {
      RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
      *row = TensorRow(TensorRow::kFlagEOE);
      return Status::OK();
    } else {
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &new_row));
    }
  }

  if (((compression_type_ == CompressionType::NONE || compression_type_ == CompressionType::GZIP_WITH_COUNT ||
        compression_type_ == CompressionType::ZLIB_WITH_COUNT) &&
       (total_rows_ == 0 || curr_row_ < total_rows_)) ||
      ((compression_type_ == CompressionType::GZIP || compression_type_ == CompressionType::ZLIB) &&
       (curr_row_ < total_rows_ * num_devices_))) {
    curr_row_++;
  } else {
    {
      std::unique_lock<std::mutex> lock(load_jagged_connector_mutex_);
      load_jagged_connector_ = false;
    }
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = false;
    }
    // drain data in jagged_rows_connector_ until eoe is hit.
    while (static_cast<int32_t>(workers_done_) < num_workers_) {
      TensorRow next_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &next_row));
      if (next_row.eoe()) {
        workers_done_++;
      }
    }
    RETURN_IF_NOT_OK(ResetAndUpdateRepeat());
    new_row = TensorRow(TensorRow::kFlagEOE);
  }
  *row = std::move(new_row);
  return Status::OK();
}

Status NonMappableLeafOp::ResetAndUpdateRepeat() {
  if (IsLastIteration()) {
    finished_reading_dataset_ = true;
    NotifyToFillIOBlockQueue();
  } else {
    jagged_rows_connector_->DoReset();
    // Self-reset to start a new iteration
    RETURN_IF_NOT_OK(Reset());
  }
  UpdateRepeatAndEpochCounter();
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore