1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #if defined(_WIN32) || defined(_WIN64)
17 #include <stdlib.h>
18 #endif
19 #include <chrono>
20 #include <iomanip>
21 #include <iostream>
22 #include <random>
23 #include <utility>
24
25 #include "minddata/dataset/core/config_manager.h"
26 #include "minddata/dataset/engine/datasetops/shuffle_op.h"
27 #include "minddata/dataset/engine/dataset_iterator.h"
28
29 #include "minddata/dataset/engine/db_connector.h"
30 #include "minddata/dataset/util/log_adapter.h"
31 #include "minddata/dataset/util/random.h"
32 #include "minddata/dataset/util/status.h"
33
34 namespace mindspore {
35 namespace dataset {
36 constexpr int32_t ShuffleOp::kShuffleStateInit;
37 constexpr int32_t ShuffleOp::kShuffleStateActive;
38 constexpr int32_t ShuffleOp::kShuffleStateDrain;
39
40 // Constructor of the ShuffleOp
ShuffleOp(int32_t shuffle_size,uint32_t shuffle_seed,int32_t op_connector_size,bool reset_every_epoch)41 ShuffleOp::ShuffleOp(int32_t shuffle_size, uint32_t shuffle_seed, int32_t op_connector_size, bool reset_every_epoch)
42 : PipelineOp(op_connector_size),
43 shuffle_size_(shuffle_size),
44 shuffle_seed_(shuffle_seed),
45 reshuffle_each_epoch_(reset_every_epoch),
46 rng_(shuffle_seed),
47 shuffle_buffer_(std::make_unique<TensorTable>()),
48 shuffle_last_row_idx_(0),
49 shuffle_buffer_state_(kShuffleStateInit) {}
50
51 // Private function to re-init the shuffle op for another epoch. Shuffle op calls this by
52 // itself rather than waiting for the reset driven from operators above it in the pipeline.
SelfReset()53 Status ShuffleOp::SelfReset() {
54 MS_LOG(DEBUG) << "Shuffle operator performing a self-reset.";
55 // If reshuffle_each_epoch is false, then we always use the same seed for every
56 // epoch.
57 // If reshuffle_each_epoch is true, then the first epoch uses the given seed,
58 // and all subsequent epochs will then keep on using the rng_ without resetting it
59 if (!reshuffle_each_epoch_) {
60 rng_ = std::mt19937_64(shuffle_seed_);
61 }
62
63 shuffle_buffer_ = std::make_unique<TensorTable>();
64 shuffle_last_row_idx_ = 0;
65 shuffle_buffer_state_ = kShuffleStateInit;
66 return Status::OK();
67 }
68
69 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const70 void ShuffleOp::Print(std::ostream &out, bool show_all) const {
71 if (!show_all) {
72 // Call the super class for displaying any common 1-liner info
73 PipelineOp::Print(out, show_all);
74 // Then show any custom derived-internal 1-liner info for this op
75 out << " [shuffle size: " << shuffle_size_ << "]\n";
76 } else {
77 // Call the super class for displaying any common detailed info
78 PipelineOp::Print(out, show_all);
79 // Then show any custom derived-internal stuff
80 out << "\nShuffle size: " << shuffle_size_ << "\nShuffle buffer state: " << shuffle_buffer_state_
81 << "\nShuffle seed: " << shuffle_seed_ << "\n\n";
82 }
83 }
84
85 // Private function to add a new row to the shuffle buffer.
AddRowToShuffleBuffer(TensorRow new_shuffle_row)86 Status ShuffleOp::AddRowToShuffleBuffer(TensorRow new_shuffle_row) {
87 // If the last slot of our shuffle buffer was not the full size of the shuffle buffer then we are
88 // filling it during the initial fill codepath and thus growing it's size. In that case, we push
89 // back the new row to grow our shuffle buffer size by 1.
90 // If we are already at the full size, then we overwrite the last slot with our row (and the last
91 // slot better be empty because it should already have been swapped out during the random row
92 // selection that was done previously!)
93 if (shuffle_last_row_idx_ < (shuffle_size_ - 1)) {
94 shuffle_buffer_->push_back(std::move(new_shuffle_row));
95 shuffle_last_row_idx_ = (shuffle_buffer_->size()) - 1;
96 } else {
97 if (!(*shuffle_buffer_)[shuffle_last_row_idx_].empty()) {
98 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
99 "[Internal ERROR] Last row of shuffle buffer should not be occupied!");
100 }
101 (*shuffle_buffer_)[shuffle_last_row_idx_] = std::move(new_shuffle_row);
102 }
103 return Status::OK();
104 }
105
106 // Class functor operator () override.
107 // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
108 // provide the master loop that drives the logic for performing the work
operator ()()109 Status ShuffleOp::operator()() {
110 std::unique_ptr<TensorQTable> new_buffer_table; // A tensor table to be used for output.
111
112 // Synchronize with TaskManager once the thread is launched.
113 TaskManager::FindMe()->Post();
114
115 // Shuffle op does not have workers, and only consumes from child 0.
116 // Create the child iterator to fetch our data from.
117 int32_t worker_id = 0;
118 int32_t child_idx = 0;
119 child_iterator_ = std::make_unique<ChildIterator>(this, worker_id, child_idx);
120
121 // Main operator loop
122 while (true) {
123 // Do an initial populate of the shuffle buffer
124 RETURN_IF_NOT_OK(InitShuffleBuffer());
125
126 // This is our main loop exit condition, when the iterator has no more data completely.
127 if (child_iterator_->EofHandled()) {
128 RETURN_IF_NOT_OK(out_connector_->SendEOF());
129 break;
130 }
131
132 // Next, enter into the main execution loop of the shuffle op.
133 // When the tail index position of our shuffle buffer goes negative it means that we've
134 // fully drained the data from the shuffle buffer and we're done.
135 while (shuffle_last_row_idx_ >= 0) {
136 // Step 1)
137 // Create an output tensor table if one is not created yet.
138 if (!new_buffer_table) {
139 new_buffer_table = std::make_unique<TensorQTable>();
140 }
141
142 // Step 2)
143 // Randomly select a slot from our shuffle buffer and copy that row into the output
144 // tensor table. We remove the data from the shuffle buffer, leaving that slot
145 // in the table as an empty vector
146 int64_t random_slot = rng_() % (shuffle_last_row_idx_ + 1);
147 TensorRow random_row = std::move((*shuffle_buffer_)[random_slot]);
148 MS_LOG(DEBUG) << "Shuffle operator sending a row to output.";
149 RETURN_IF_NOT_OK(out_connector_->Add(std::move(random_row)));
150
151 // Step 3)
152 // Take the last row from shuffle buffer, and swap it into the row position that was
153 // just vacated. This makes the shuffle buffer contiguous, with an empty slot at the
154 // tail of the shuffle buffer.
155 if (random_slot != shuffle_last_row_idx_) {
156 (*shuffle_buffer_)[random_slot] = std::move((*shuffle_buffer_)[shuffle_last_row_idx_]);
157 }
158
159 // Step 4)
160 // Refill the last slot of the shuffle buffer with the next row from input if we are in the
161 // active state.
162 // If we are in the draining state, we do not need to fetch another row to replace the one we
163 // just drained.
164 if (shuffle_buffer_state_ == kShuffleStateActive) {
165 TensorRow new_row;
166 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
167
168 if (!new_row.empty()) {
169 RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
170 } else {
171 shuffle_buffer_state_ = kShuffleStateDrain;
172 }
173 }
174
175 // If we are draining, reposition (decrement) our tail index in the shuffle buffer since we
176 // just drained a row from it.
177 if (shuffle_buffer_state_ == kShuffleStateDrain) {
178 shuffle_last_row_idx_--;
179 }
180 }
181
182 // Since we overloaded eoeReceived function, we are responsible to flow the EOE up the
183 // pipeline manually now that we are done draining the shuffle buffer
184 MS_LOG(DEBUG) << "Shuffle operator sending EOE.";
185 RETURN_IF_NOT_OK(out_connector_->SendEOE());
186
187 // Do not wait for any reset to be flown down from operators above us.
188 // Instead, manually update ourselves and then go reloop to start fetching from child operator
189 // right away. Any Reset() from the parent will still perform common reset actions.
190 RETURN_IF_NOT_OK(this->SelfReset());
191 }
192
193 return Status::OK();
194 }
195
196 // Private function populate the shuffle buffer initially by fetching from the child output
197 // connector until the shuffle buffer is full (or there is no more data coming).
InitShuffleBuffer()198 Status ShuffleOp::InitShuffleBuffer() {
199 MS_LOG(DEBUG) << "Shuffle operator initializing the shuffle buffer.";
200
201 // The first phase of this operator is to read incoming buffers and then drain those
202 // rows from the buffers, putting them into our own local table of tensors (the shuffle
203 // buffer).
204 // This shuffle buffer initialization phase stops when we've either filled up the
205 // shuffle buffer to it's max size, or the dataset below us is not providing any more
206 // rows.
207 if (shuffle_buffer_state_ != kShuffleStateInit) {
208 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
209 "Invalid shuffle buffer state, shuffle buffer should be init first or reset after each epoch.");
210 }
211
212 // Before we drop into the fetching loop, call the fetch once for the first time
213 // to fill the first row and grab the first buffer.
214 TensorRow new_row;
215 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
216
217 if (child_iterator_->EofHandled()) {
218 MS_LOG(DEBUG) << "Shuffle operator init picked up EOF. No more epochs.";
219 RETURN_IF_NOT_OK(out_connector_->SendEOF());
220 return Status::OK();
221 }
222
223 if (new_row.empty()) {
224 RETURN_STATUS_UNEXPECTED("Invalid data, unable to fetch a single row for shuffle buffer.");
225 }
226
227 // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached
228 // the desired shuffle buffer size.
229 while (!new_row.empty() && shuffle_buffer_->size() < static_cast<size_t>(shuffle_size_ - 1)) {
230 // Add the previously fetched row
231 RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
232
233 // Fetch the next row
234 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
235 }
236
237 // If we quit the loop due to being at the shuffle size, still need to add the last row here.
238 if (!new_row.empty()) {
239 RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
240 shuffle_buffer_state_ = kShuffleStateActive; // Transition to the active state
241 } else {
242 // If init phase doesn't have more rows, then skip the active state and jump straight to the
243 // shuffle buffer draining state
244 shuffle_buffer_state_ = kShuffleStateDrain;
245 }
246
247 MS_LOG(DEBUG) << "Shuffle operator finished initializing the shuffle buffer.";
248 return Status::OK();
249 }
250
EoeReceived(int32_t worker_id)251 Status ShuffleOp::EoeReceived(int32_t worker_id) {
252 state_ = OpState::kDeOpIdle;
253 return Status::OK();
254 }
255 } // namespace dataset
256 } // namespace mindspore
257