• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #if defined(_WIN32) || defined(_WIN64)
17 #include <stdlib.h>
18 #endif
19 #include <chrono>
20 #include <iomanip>
21 #include <iostream>
22 #include <random>
23 #include <utility>
24 
25 #include "minddata/dataset/core/config_manager.h"
26 #include "minddata/dataset/engine/datasetops/shuffle_op.h"
27 #include "minddata/dataset/engine/dataset_iterator.h"
28 
29 #include "minddata/dataset/engine/db_connector.h"
30 #include "minddata/dataset/util/log_adapter.h"
31 #include "minddata/dataset/util/random.h"
32 #include "minddata/dataset/util/status.h"
33 
34 namespace mindspore {
35 namespace dataset {
// Namespace-scope definitions of ShuffleOp's static constexpr buffer-state constants. Their
// values come from the in-class declarations; these definitions give the constants storage,
// which is required before C++17 whenever they are odr-used.
constexpr int32_t ShuffleOp::kShuffleStateInit;
constexpr int32_t ShuffleOp::kShuffleStateActive;
constexpr int32_t ShuffleOp::kShuffleStateDrain;
39 
40 // Constructor of the ShuffleOp
ShuffleOp(int32_t shuffle_size,uint32_t shuffle_seed,int32_t op_connector_size,bool reset_every_epoch)41 ShuffleOp::ShuffleOp(int32_t shuffle_size, uint32_t shuffle_seed, int32_t op_connector_size, bool reset_every_epoch)
42     : PipelineOp(op_connector_size),
43       shuffle_size_(shuffle_size),
44       shuffle_seed_(shuffle_seed),
45       reshuffle_each_epoch_(reset_every_epoch),
46       rng_(shuffle_seed),
47       shuffle_buffer_(std::make_unique<TensorTable>()),
48       shuffle_last_row_idx_(0),
49       shuffle_buffer_state_(kShuffleStateInit) {}
50 
51 // Private function to re-init the shuffle op for another epoch.  Shuffle op calls this by
52 // itself rather than waiting for the reset driven from operators above it in the pipeline.
SelfReset()53 Status ShuffleOp::SelfReset() {
54   MS_LOG(DEBUG) << "Shuffle operator performing a self-reset.";
55   // If reshuffle_each_epoch is false, then we always use the same seed for every
56   // epoch.
57   // If reshuffle_each_epoch is true, then the first epoch uses the given seed,
58   // and all subsequent epochs will then keep on using the rng_ without resetting it
59   if (!reshuffle_each_epoch_) {
60     rng_ = std::mt19937_64(shuffle_seed_);
61   }
62 
63   shuffle_buffer_ = std::make_unique<TensorTable>();
64   shuffle_last_row_idx_ = 0;
65   shuffle_buffer_state_ = kShuffleStateInit;
66   return Status::OK();
67 }
68 
69 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const70 void ShuffleOp::Print(std::ostream &out, bool show_all) const {
71   if (!show_all) {
72     // Call the super class for displaying any common 1-liner info
73     PipelineOp::Print(out, show_all);
74     // Then show any custom derived-internal 1-liner info for this op
75     out << " [shuffle size: " << shuffle_size_ << "]\n";
76   } else {
77     // Call the super class for displaying any common detailed info
78     PipelineOp::Print(out, show_all);
79     // Then show any custom derived-internal stuff
80     out << "\nShuffle size: " << shuffle_size_ << "\nShuffle buffer state: " << shuffle_buffer_state_
81         << "\nShuffle seed: " << shuffle_seed_ << "\n\n";
82   }
83 }
84 
85 // Private function to add a new row to the shuffle buffer.
AddRowToShuffleBuffer(TensorRow new_shuffle_row)86 Status ShuffleOp::AddRowToShuffleBuffer(TensorRow new_shuffle_row) {
87   // If the last slot of our shuffle buffer was not the full size of the shuffle buffer then we are
88   // filling it during the initial fill codepath and thus growing it's size. In that case, we push
89   // back the new row to grow our shuffle buffer size by 1.
90   // If we are already at the full size, then we overwrite the last slot with our row (and the last
91   // slot better be empty because it should already have been swapped out during the random row
92   // selection that was done previously!)
93   if (shuffle_last_row_idx_ < (shuffle_size_ - 1)) {
94     shuffle_buffer_->push_back(std::move(new_shuffle_row));
95     shuffle_last_row_idx_ = (shuffle_buffer_->size()) - 1;
96   } else {
97     if (!(*shuffle_buffer_)[shuffle_last_row_idx_].empty()) {
98       return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
99                     "[Internal ERROR] Last row of shuffle buffer should not be occupied!");
100     }
101     (*shuffle_buffer_)[shuffle_last_row_idx_] = std::move(new_shuffle_row);
102   }
103   return Status::OK();
104 }
105 
106 // Class functor operator () override.
107 // All dataset ops operate by launching a thread (see ExecutionTree). This class functor will
108 // provide the master loop that drives the logic for performing the work
operator ()()109 Status ShuffleOp::operator()() {
110   std::unique_ptr<TensorQTable> new_buffer_table;  // A tensor table to be used for output.
111 
112   // Synchronize with TaskManager once the thread is launched.
113   TaskManager::FindMe()->Post();
114 
115   // Shuffle op does not have workers, and only consumes from child 0.
116   // Create the child iterator to fetch our data from.
117   int32_t worker_id = 0;
118   int32_t child_idx = 0;
119   child_iterator_ = std::make_unique<ChildIterator>(this, worker_id, child_idx);
120 
121   // Main operator loop
122   while (true) {
123     // Do an initial populate of the shuffle buffer
124     RETURN_IF_NOT_OK(InitShuffleBuffer());
125 
126     // This is our main loop exit condition, when the iterator has no more data completely.
127     if (child_iterator_->EofHandled()) {
128       RETURN_IF_NOT_OK(out_connector_->SendEOF());
129       break;
130     }
131 
132     // Next, enter into the main execution loop of the shuffle op.
133     // When the tail index position of our shuffle buffer goes negative it means that we've
134     // fully drained the data from the shuffle buffer and we're done.
135     while (shuffle_last_row_idx_ >= 0) {
136       // Step 1)
137       // Create an output tensor table if one is not created yet.
138       if (!new_buffer_table) {
139         new_buffer_table = std::make_unique<TensorQTable>();
140       }
141 
142       // Step 2)
143       // Randomly select a slot from our shuffle buffer and copy that row into the output
144       // tensor table. We remove the data from the shuffle buffer, leaving that slot
145       // in the table as an empty vector
146       int64_t random_slot = rng_() % (shuffle_last_row_idx_ + 1);
147       TensorRow random_row = std::move((*shuffle_buffer_)[random_slot]);
148       MS_LOG(DEBUG) << "Shuffle operator sending a row to output.";
149       RETURN_IF_NOT_OK(out_connector_->Add(std::move(random_row)));
150 
151       // Step 3)
152       // Take the last row from shuffle buffer, and swap it into the row position that was
153       // just vacated.  This makes the shuffle buffer contiguous, with an empty slot at the
154       // tail of the shuffle buffer.
155       if (random_slot != shuffle_last_row_idx_) {
156         (*shuffle_buffer_)[random_slot] = std::move((*shuffle_buffer_)[shuffle_last_row_idx_]);
157       }
158 
159       // Step 4)
160       // Refill the last slot of the shuffle buffer with the next row from input if we are in the
161       // active state.
162       // If we are in the draining state, we do not need to fetch another row to replace the one we
163       // just drained.
164       if (shuffle_buffer_state_ == kShuffleStateActive) {
165         TensorRow new_row;
166         RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
167 
168         if (!new_row.empty()) {
169           RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
170         } else {
171           shuffle_buffer_state_ = kShuffleStateDrain;
172         }
173       }
174 
175       // If we are draining, reposition (decrement) our tail index in the shuffle buffer since we
176       // just drained a row from it.
177       if (shuffle_buffer_state_ == kShuffleStateDrain) {
178         shuffle_last_row_idx_--;
179       }
180     }
181 
182     // Since we overloaded eoeReceived function, we are responsible to flow the EOE up the
183     // pipeline manually now that we are done draining the shuffle buffer
184     MS_LOG(DEBUG) << "Shuffle operator sending EOE.";
185     RETURN_IF_NOT_OK(out_connector_->SendEOE());
186 
187     // Do not wait for any reset to be flown down from operators above us.
188     // Instead, manually update ourselves and then go reloop to start fetching from child operator
189     // right away.  Any Reset() from the parent will still perform common reset actions.
190     RETURN_IF_NOT_OK(this->SelfReset());
191   }
192 
193   return Status::OK();
194 }
195 
196 // Private function populate the shuffle buffer initially by fetching from the child output
197 // connector until the shuffle buffer is full (or there is no more data coming).
InitShuffleBuffer()198 Status ShuffleOp::InitShuffleBuffer() {
199   MS_LOG(DEBUG) << "Shuffle operator initializing the shuffle buffer.";
200 
201   // The first phase of this operator is to read incoming buffers and then drain those
202   // rows from the buffers, putting them into our own local table of tensors (the shuffle
203   // buffer).
204   // This shuffle buffer initialization phase stops when we've either filled up the
205   // shuffle buffer to it's max size, or the dataset below us is not providing any more
206   // rows.
207   if (shuffle_buffer_state_ != kShuffleStateInit) {
208     return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
209                   "Invalid shuffle buffer state, shuffle buffer should be init first or reset after each epoch.");
210   }
211 
212   // Before we drop into the fetching loop, call the fetch once for the first time
213   // to fill the first row and grab the first buffer.
214   TensorRow new_row;
215   RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
216 
217   if (child_iterator_->EofHandled()) {
218     MS_LOG(DEBUG) << "Shuffle operator init picked up EOF. No more epochs.";
219     RETURN_IF_NOT_OK(out_connector_->SendEOF());
220     return Status::OK();
221   }
222 
223   if (new_row.empty()) {
224     RETURN_STATUS_UNEXPECTED("Invalid data, unable to fetch a single row for shuffle buffer.");
225   }
226 
227   // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached
228   // the desired shuffle buffer size.
229   while (!new_row.empty() && shuffle_buffer_->size() < static_cast<size_t>(shuffle_size_ - 1)) {
230     // Add the previously fetched row
231     RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
232 
233     // Fetch the next row
234     RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
235   }
236 
237   // If we quit the loop due to being at the shuffle size, still need to add the last row here.
238   if (!new_row.empty()) {
239     RETURN_IF_NOT_OK(AddRowToShuffleBuffer(std::move(new_row)));
240     shuffle_buffer_state_ = kShuffleStateActive;  // Transition to the active state
241   } else {
242     // If init phase doesn't have more rows, then skip the active state and jump straight to the
243     // shuffle buffer draining state
244     shuffle_buffer_state_ = kShuffleStateDrain;
245   }
246 
247   MS_LOG(DEBUG) << "Shuffle operator finished initializing the shuffle buffer.";
248   return Status::OK();
249 }
250 
EoeReceived(int32_t worker_id)251 Status ShuffleOp::EoeReceived(int32_t worker_id) {
252   state_ = OpState::kDeOpIdle;
253   return Status::OK();
254 }
255 }  // namespace dataset
256 }  // namespace mindspore
257