/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"

#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/datasetops/source/io_block.h"
#include "minddata/dataset/engine/db_connector.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/engine/jagged_connector.h"
#include "minddata/dataset/util/random.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/wait_post.h"

namespace mindspore {
namespace dataset {
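// Constructor. Sets up the parallel-op state plus the bookkeeping used to shard the files
// across devices and optionally shuffle them; the row counters start at zero and are
// computed later by CalculateNumRowsPerShard().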
NonMappableLeafOp::NonMappableLeafOp(int32_t num_workers, int32_t worker_connector_size, int64_t total_num_rows,
                                     int32_t op_connector_size, bool shuffle_files, int32_t num_devices,
                                     int32_t device_id)
    : ParallelOp(num_workers, op_connector_size),
      device_id_(device_id),
      num_devices_(num_devices),
      load_jagged_connector_(true),
      filename_index_(std::make_unique<StringIndex>()),
      finished_reading_dataset_(false),
      total_rows_(total_num_rows),
      load_io_block_queue_(true),
      shuffle_files_(shuffle_files),
      num_rows_per_shard_(0),
      num_rows_(0) {
  worker_connector_size_ = worker_connector_size;
}

// Class functor operator () override.
// All dataset operators operate by launching a thread (see ExecutionTree). This class functor
// provides the master loop that drives the logic for performing the work.
Status NonMappableLeafOp::operator()() {
  RETURN_IF_NOT_OK(CalculateNumRowsPerShard());

  // Registered here to avoid a registration failure when a WorkerEntry thread exits unexpectedly.
  RETURN_IF_NOT_OK(io_block_queue_wait_post_.Register(tree_->AllTasks()));

  // launch one thread, responsible for filling the IOBlockQueue
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(1, std::bind(&NonMappableLeafOp::WaitToFillIOBlockQueue, this), "", id()));

  // launch num_workers_ worker threads, responsible for pulling from the IOBlockQueue and reading
  // data from disk into TensorRows
  RETURN_IF_NOT_OK(tree_->LaunchWorkers(
    num_workers_, std::bind(&NonMappableLeafOp::WorkerEntry, this, std::placeholders::_1), "", id()));

  // must be called after launching workers. workers can't be spawned after this post,
  // so workers have to be kept alive until the end of the program
  TaskManager::FindMe()->Post();

  NotifyToFillIOBlockQueue();
  while (!finished_reading_dataset_) {
    int32_t workers_done = 0;
    int64_t rows_read = 0;
    {
      std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
      load_io_block_queue_ = true;
    }

    while (workers_done < num_workers_) {
      TensorRow fetched_row;
      RETURN_IF_NOT_OK(jagged_rows_connector_->Pop(0, &fetched_row));
      if (fetched_row.eoe()) {
        workers_done++;
      } else if (total_rows_ == 0 || rows_read < total_rows_) {
        // we need to push a row
        RETURN_IF_NOT_OK(out_connector_->Add(std::move(fetched_row), 0));
        rows_read++;
      } else {
        // The IOBlockQueue thread needs to:
        // - stop pushing blocks to the IOBlockQueue
        // - call PostEndOfEpoch (will send EOE)
        // - wait for reset
        //
        // Worker threads need to:
        // - stop reading the file they are currently reading and throw it away
        // - keep pulling, but don't read other files (eventually skipping all IOBlocks until the EOE arrives)
        //
        // The master thread needs to:
        // - tell the IOBlockQueue thread to stop pushing
        // - tell worker threads to stop reading the file they are currently reading
        // - keep pulling until EOE

        // don't think we need a lock for now
        load_jagged_connector_ = false;

        std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
        load_io_block_queue_ = false;
      }
    }

    // all workers finished reading for this epoch, and we have read all the data from all workers
    RETURN_IF_NOT_OK(out_connector_->SendEOE());

    if (IsLastIteration()) {
      finished_reading_dataset_ = true;
      NotifyToFillIOBlockQueue();
    } else {
      jagged_rows_connector_->DoReset();
      // Self-reset to start a new iteration
      RETURN_IF_NOT_OK(Reset());
    }
    UpdateRepeatAndEpochCounter();
  }

  RETURN_IF_NOT_OK(out_connector_->SendEOF());

  RETURN_IF_NOT_OK(PostEndOfData());

  return Status::OK();
}

// The entry point for when workers are launched.
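// Each worker loops on the IOBlockQueue: a filename block triggers LoadFile() for its
// [start_offset, end_offset) slice (skipped when load_jagged_connector_ has been cleared by the
// master), an EOE block is forwarded into the jagged connector so the master can count finished
// workers, and an EOF block ends the worker.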
Status NonMappableLeafOp::WorkerEntry(int32_t worker_id) {
  // must be called first if called by a worker spawned by a taskgroup
  TaskManager::FindMe()->Post();

  std::unique_ptr<FilenameBlock> io_block;
  RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));

  while (!io_block->eof()) {
    if (!io_block->eoe()) {
      if (load_jagged_connector_) {
        std::string filename;
        RETURN_IF_NOT_OK(io_block->GetFilename(&filename, *filename_index_));
        int64_t start_offset = io_block->GetStartOffset();
        int64_t end_offset = io_block->GetEndOffset();
        RETURN_IF_NOT_OK(LoadFile(filename, start_offset, end_offset, worker_id));
        MS_LOG(DEBUG) << Name() << " operator worker " << worker_id << " loaded file " << filename << ".";
      }
    } else {
      TensorRow eoe = TensorRow(TensorRow::kFlagEOE);
      RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(eoe)));
    }

    RETURN_IF_NOT_OK(PopIoBlockQueue(worker_id, &io_block));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume.
// When the worker pops this control indicator, it will shut itself down gracefully.
Status NonMappableLeafOp::PostEndOfData() {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eof = std::make_unique<FilenameBlock>(IOBlock::kDeIoBlockFlagEof);
    RETURN_IF_NOT_OK(PushIoBlockQueue(i, std::move(eof)));
  }

  return Status::OK();
}

// Pushes a control indicator onto the IOBlockQueue for each worker to consume. When the worker
// pops this control indicator, it will wait until the next epoch starts and then resume execution.
Status NonMappableLeafOp::PostEndOfEpoch(int32_t queue_index) {
  for (int i = 0; i < num_workers_; ++i) {
    std::unique_ptr<FilenameBlock> eoe = std::make_unique<FilenameBlock>(IOBlock::kDeIoBlockFlagEoe);
    RETURN_IF_NOT_OK(PushIoBlockQueue((queue_index + i) % num_workers_, std::move(eoe)));
  }

  return Status::OK();
}

// Notifies the thread which called WaitToFillIOBlockQueue to resume execution.
void NonMappableLeafOp::NotifyToFillIOBlockQueue() { io_block_queue_wait_post_.Set(); }

// Pops an element from a queue in io_block_queues
Status NonMappableLeafOp::PopIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> *out_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->PopFront(out_block));
  return Status::OK();
}

// Pushes an element to a queue in io_block_queues
Status NonMappableLeafOp::PushIoBlockQueue(int32_t index, std::unique_ptr<FilenameBlock> &&io_block) {
  RETURN_IF_NOT_OK(io_block_queues_[index]->Add(std::move(io_block)));
  return Status::OK();
}

// Overrides the base class reset method. Cleans up any state info from its previous execution and
// reinitializes itself so that it can be executed again, as if it were just created.
Status NonMappableLeafOp::Reset() {
  MS_LOG(DEBUG) << Name() << " performing a self-reset.";
  // set this before restarting the workers; otherwise IOBlocks will fall through if workers see
  // them before the flag is set back to true
  load_jagged_connector_ = true;

  {
    std::unique_lock<std::mutex> lock(load_io_block_queue_mutex_);
    load_io_block_queue_ = true;
  }

  RETURN_IF_NOT_OK(ParallelOp::Reset());
  NotifyToFillIOBlockQueue();

  return Status::OK();
}

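// Determines whether file_name contributes rows to this device's shard. The shard owns the global
// row range [device_id_ * num_rows_per_shard_, (device_id_ + 1) * num_rows_per_shard_), and
// pre_count is the number of rows that precede this file; when the file overlaps the shard window,
// the overlap is returned as [*start_offset, *end_offset) within the file. For example, with
// num_rows_per_shard_ = 100 and device_id_ = 1 the shard owns rows [100, 200), so a 50-row file
// with pre_count = 80 is pushed with offsets [20, 50).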
bool NonMappableLeafOp::NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset,
                                                 int64_t *end_offset, const int64_t &pre_count) {
  *start_offset = 0;
  *end_offset = 0;
  bool push = false;
  int64_t start_index = device_id_ * num_rows_per_shard_;
  if (device_id_ + 1 < 0) {
    MS_LOG(ERROR) << "Device id is invalid, got " + std::to_string(device_id_);
    return false;
  }

  int64_t end_index = (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_;
  if (pre_count <= start_index && pre_count + filename_numrows_[file_name] > start_index) {
    *start_offset = start_index - pre_count;
    push = true;
    if (pre_count < end_index && pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  if (pre_count >= start_index && pre_count < end_index) {
    *start_offset = 0;
    push = true;
    if (pre_count + filename_numrows_[file_name] >= end_index) {
      *end_offset = end_index - pre_count;
    } else {
      *end_offset = filename_numrows_[file_name];
    }
  }

  return push;
}

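// Shuffles the list of file-index keys in place with a Mersenne Twister seeded by the caller,
// so that callers passing the same seed visit the files in the same shuffled order.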
void NonMappableLeafOp::ShuffleKeys(std::vector<int64_t> *i_keys, uint32_t seed) {
  std::mt19937 rng(seed);
  std::shuffle(i_keys->begin(), i_keys->end(), rng);
}

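// The entry point of the IOBlockQueue-filler thread. It blocks on io_block_queue_wait_post_ until
// NotifyToFillIOBlockQueue() is called, then fills the IOBlockQueue for the next epoch, reshuffling
// the file order first when shuffle_files_ is set; in the multi-device case a deterministic
// incrementing seed is used so that every shard sees the same shuffled order.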
Status NonMappableLeafOp::WaitToFillIOBlockQueue() {
  // must be called first if called by a worker spawned by a taskgroup
  TaskManager::FindMe()->Post();

  std::vector<int64_t> i_keys;
  if (shuffle_files_) {
    for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
      i_keys.push_back(it.key());
    }
  }
  uint32_t seed = 0;
  while (true) {
    RETURN_IF_NOT_OK(io_block_queue_wait_post_.Wait());
    io_block_queue_wait_post_.Clear();

    if (finished_reading_dataset_) {
      break;
    }

    if (shuffle_files_) {
      ShuffleKeys(&i_keys, num_devices_ == 1 ? GetSeed() : ++seed);
    }
    RETURN_IF_NOT_OK(FillIOBlockQueue(i_keys));
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore