• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "minddata/dataset/engine/datasetops/source/random_data_op.h"
18 
19 #include <algorithm>
20 #include <iomanip>
21 #include <random>
22 #include "minddata/dataset/engine/execution_tree.h"
23 #include "minddata/dataset/core/config_manager.h"
24 #include "minddata/dataset/util/random.h"
25 #include "minddata/dataset/util/wait_post.h"
26 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
27 
28 namespace mindspore {
29 namespace dataset {
30 // Constructor for RandomDataOp
RandomDataOp(int32_t num_workers,int32_t op_connector_size,int64_t total_rows,std::unique_ptr<DataSchema> data_schema)31 RandomDataOp::RandomDataOp(int32_t num_workers, int32_t op_connector_size, int64_t total_rows,
32                            std::unique_ptr<DataSchema> data_schema)
33     : ParallelOp(num_workers, op_connector_size),
34       total_rows_(total_rows),
35       epoch_rows_sent_(0),
36       guys_in_(0),
37       guys_out_(num_workers_),
38       eoe_worker_id_(0),
39       data_schema_(std::move(data_schema)) {
40   rand_gen_.seed(GetSeed());  // seed the random generator
41   // If total rows was not given, then randomly pick a number
42   if (total_rows_ == 0) {
43     total_rows_ = GenRandomInt(1, kMaxTotalRows);
44   }
45   // If the user did not provide a schema, then we will ask the op to generate a pseudo-random schema.
46   // See details of generateSchema function to learn what type of schema it will create.
47   if (data_schema_ == nullptr) {
48     GenerateSchema();
49   }
50   // Everyone is already out from the sync area.
51   all_out_.Set();
52 }
53 
54 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const55 void RandomDataOp::Print(std::ostream &out, bool show_all) const {
56   if (!show_all) {
57     // Call the super class for displaying any common 1-liner info
58     ParallelOp::Print(out, show_all);
59     // Then show any custom derived-internal 1-liner info for this op
60     out << " [total rows: " << total_rows_ << "]\n";
61   } else {
62     // Call the super class for displaying any common detailed info
63     ParallelOp::Print(out, show_all);
64     // Then show any custom derived-internal stuff
65     out << "\nTotal_rows: " << total_rows_ << " \nSchema:\n" << *data_schema_ << "\n\n";
66   }
67 }
68 
69 // Helper function to produce a default/random schema if one didn't exist
GenerateSchema()70 void RandomDataOp::GenerateSchema() {
71   const int32_t type_offset = 2;
72   // To randomly create a schema, we need to choose:
73   // a) how many columns
74   // b) the type of each column
75   // c) the shape of each column (number of dimensions i.e. rank)
76   // d) the shape of each column (dimension values)
77   data_schema_ = std::make_unique<DataSchema>();
78   std::unique_ptr<TensorShape> new_shape;
79   std::unique_ptr<ColDescriptor> new_col;
80 
81   // Loop over the number of chosen columns
82   int32_t numColumns = GenRandomInt(1, kMaxNumColumns);
83   for (int32_t i = 0; i < numColumns; i++) {
84     // For each column:
85     // - choose a datatype
86     // - generate a shape that randomly chooses the number of dimensions and the dimension values.
87     DataType::Type newType = static_cast<DataType::Type>(GenRandomInt(1, DataType::NUM_OF_TYPES - type_offset));
88     int32_t rank = GenRandomInt(1, kMaxRank);
89     std::vector<dsize_t> dims;
90     for (int32_t d = 0; d < rank; d++) {
91       // 0 is not a valid dimension value.  however, we can support "*" or unknown, so map the random
92       // 0 value to the unknown attribute if 0 is chosen
93       dsize_t dim_value = static_cast<dsize_t>(GenRandomInt(0, kMaxDimValue));
94       if (dim_value == 0) {
95         dim_value = TensorShape::kDimUnknown;
96       }
97       dims.push_back(dim_value);
98     }
99     new_shape = std::make_unique<TensorShape>(dims);
100 
101     // Create the column descriptor
102     std::string col_name = "c" + std::to_string(i);
103     new_col =
104       std::make_unique<ColDescriptor>(col_name, DataType(newType), TensorImpl::kFlexible, rank, new_shape.get());
105 
106     Status rc = data_schema_->AddColumn(*new_col);
107     if (rc.IsError()) MS_LOG(ERROR) << "Failed to generate a schema. Message:" << rc;
108   }
109 }
110 
111 // Class functor operator () override.
112 // All DatasetOps operate by launching a thread (see ExecutionTree). This class functor will
113 // provide the master loop that drives the logic for performing the work.
operator ()()114 Status RandomDataOp::operator()() {
115   CHECK_FAIL_RETURN_UNEXPECTED(total_rows_ >= num_workers_,
116                                "RandomDataOp expects total_rows < num_workers. Try adjust num_workers, total_row=" +
117                                  std::to_string(total_rows_) + ", num_workers=" + std::to_string(num_workers_) + " .");
118 
119   // If the amount of workers we have exceeds the number of rows to produce, then we'll have
120   // idle workers doing nothing.  In that case, let's throttle the worker count.
121   if (num_workers_ > total_rows_) {
122     MS_LOG(INFO) << "RandomDataOp throttling worker count from " << num_workers_ << "to " << total_rows_;
123     num_workers_ = total_rows_;
124     num_producers_ = num_workers_;
125     guys_out_ = num_workers_;
126     // The output connector was already created with a different worker count.  We have to drop and recreate
127     // that connector.
128     DatasetOp::CreateConnector(num_producers_, num_workers_);
129   }
130 
131   if (num_workers_ == 0) {
132     RETURN_STATUS_UNEXPECTED("Invalid data, num_workers_ is zero.");
133   }
134   // Assign the number of rows to each worker in a round robin fashion.
135   worker_max_rows_.reserve(num_workers_);
136   worker_rows_packed_.reserve(num_workers_);
137   // init the counts to zero to start.
138   for (int32_t w = 0; w < num_workers_; w++) {
139     worker_max_rows_.push_back(0);
140     worker_rows_packed_.push_back(0);
141   }
142   // then assign round robin row counts
143   int32_t currentWorker = 0;
144   for (int64_t r = 0; r < total_rows_; r++) {
145     worker_max_rows_[currentWorker]++;
146     currentWorker = (currentWorker + 1) % num_workers_;
147   }
148 
149   // Next, compute the total rows count.  This stat is needed during reset logic
150   for (int32_t w = 0; w < num_workers_; w++) {
151     epoch_rows_sent_ += worker_max_rows_[w];
152   }
153 
154   // For the connector to work, we need to target the correct worker channel for the eoe.
155   // This will initialize it for the first one.  reset() handles for the rest of the epochs.
156   eoe_worker_id_ = epoch_rows_sent_ % num_workers_;
157   epoch_rows_sent_++;  // Add the eoe row to the count for subsequent epochs
158 
159   // RandomDataOp doesn't need the master thread to stay around.  Kick off the workers and then master exits.
160   RETURN_IF_NOT_OK(
161     tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1), "", id()));
162 
163   // required task group setup after launching workers
164   TaskManager::FindMe()->Post();
165   RETURN_IF_NOT_OK(epoch_sync_wait_post_.Register(tree_->AllTasks()));
166 
167   return Status::OK();
168 }
169 
170 // Performs a synchronization between workers at the end of an epoch
EpochSync(int32_t worker_id,bool * quitting)171 Status RandomDataOp::EpochSync(int32_t worker_id, bool *quitting) {
172   MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " syncing at end of epoch";
173 
174   // Sync on the guys_in counter
175   // We have to wait the last guy is out.
176   RETURN_IF_NOT_OK(all_out_.Wait());
177   // If we are not in a repeat loop, or that was the last repeat already, then setup our exit
178   // condition from the master loop.
179   if (IsLastIteration()) {
180     *quitting = true;
181   }
182 
183   auto prev = guys_in_.fetch_add(1);
184   bool last_guy_in = (prev + 1) == num_workers_;
185   // If we are the last worker to hit this sync point, we have some extra tasks
186   if (last_guy_in) {
187     MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " is the last one to sync. eoe sent as worker "
188                  << eoe_worker_id_;
189     UpdateRepeatAndEpochCounter();
190     // Prepare for sync
191     all_out_.Clear();
192     // Always flow eoe at the end
193     RETURN_IF_NOT_OK(out_connector_->SendEOE(eoe_worker_id_));
194     // If we're done then also flow the eof
195     if (*quitting) {
196       // The eof needs to be sent from the next sender in the round robin, so +1
197       int32_t eof_worker_id = (eoe_worker_id_ + 1) % num_workers_;
198       MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " has no more epochs.  sending eof as worker "
199                    << eof_worker_id;
200       RETURN_IF_NOT_OK(out_connector_->SendEOF(eof_worker_id));
201     }
202   }
203 
204   if (!(*quitting)) {
205     MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " entering sync wait.";
206     if (last_guy_in) {
207       // If we are the last worker, do reset to wake other workers up
208       RETURN_IF_NOT_OK(Reset());
209     } else {
210       // If we are not the last worker, wait for the reset
211       RETURN_IF_NOT_OK(epoch_sync_wait_post_.Wait());
212     }
213     prev = guys_out_.fetch_add(1);
214     bool last_guy_out = (prev + 1) == num_workers_;
215     // Last guy out will clear the wait post and set the row counts
216     if (last_guy_out) {
217       MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " last guy out clearing wait post.";
218       epoch_sync_wait_post_.Clear();
219       guys_in_ = 0;
220       all_out_.Set();
221     }
222   }
223 
224   MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " epoch sync complete.";
225   return Status::OK();
226 }
227 
228 // The entry point code for when workers are launched
WorkerEntry(int32_t worker_id)229 Status RandomDataOp::WorkerEntry(int32_t worker_id) {
230   MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " entry";
231 
232   // handshake with the master first to tell it we're alive
233   TaskManager::FindMe()->Post();
234 
235   bool quitting = false;
236   std::unique_ptr<TensorQTable> new_tensor_table = nullptr;
237 
238   // Loop until the quitting variable gets set to true
239   do {
240     // If we have not yet reached the row count for this worker then produce another record
241     if (worker_rows_packed_[worker_id] < worker_max_rows_[worker_id]) {
242       TensorRow new_row;
243 
244       // Start a new tensor table if needed
245       if (new_tensor_table == nullptr) {
246         new_tensor_table = std::make_unique<TensorQTable>();
247       }
248 
249       // Create the data for the row
250       RETURN_IF_NOT_OK(CreateRandomRow(worker_id, &new_row));
251 
252       // Add the row to our table
253       worker_rows_packed_[worker_id]++;
254 
255       // Send new_row out
256       RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), worker_id));
257     } else {
258       // Now, let's enter the epoch sync
259       RETURN_IF_NOT_OK(EpochSync(worker_id, &quitting));
260     }
261   } while (!quitting);
262 
263   MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " is now quitting.";
264 
265   return Status::OK();
266 }
267 
268 // A helper function to create random data for the row
CreateRandomRow(int32_t worker_id,TensorRow * new_row)269 Status RandomDataOp::CreateRandomRow(int32_t worker_id, TensorRow *new_row) {
270   if (new_row == nullptr) {
271     return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] Missing tensor row output.");
272   }
273 
274   // Create a tensor for each column, then add the tensor to the row
275   for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
276     const ColDescriptor current_col = data_schema_->Column(i);
277     std::vector<dsize_t> current_shape = current_col.Shape().AsVector();
278     std::unique_ptr<TensorShape> new_shape = nullptr;
279     std::unique_ptr<unsigned char[]> buf = nullptr;
280     std::shared_ptr<Tensor> new_tensor = nullptr;
281 
282     // We need to resolve the shape to fill in any unknown dimensions with random
283     // values, then use that as our shape for this tensor.
284     for (int j = 0; j < current_shape.size(); ++j) {
285       if (current_shape[j] == TensorShape::kDimUnknown) {
286         current_shape[j] = static_cast<dsize_t>(GenRandomInt(1, kMaxDimValue));
287       }
288     }
289 
290     new_shape = std::make_unique<TensorShape>(current_shape);
291     int64_t size_in_bytes = new_shape->NumOfElements() * current_col.Type().SizeInBytes();
292 
293     // Generate a random byte of data.  This may cause some funny data for things like doubles,floats, bools
294     // however the random data op is not too concerned about the physical data itself.
295     std::uniform_int_distribution<uint8_t> uniDist(0, UINT8_MAX);
296     uint8_t random_byte = uniDist(rand_gen_);
297 
298     // Now, create a chunk of memory for the entire tensor and copy this byte in repeatedly.
299     buf = std::make_unique<unsigned char[]>(size_in_bytes);
300     int ret_code = memset_s(buf.get(), size_in_bytes, random_byte, size_in_bytes);
301     if (ret_code != 0) {
302       return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Failed to set random bytes for a tensor.");
303     }
304 
305     RETURN_IF_NOT_OK(Tensor::CreateFromMemory(*new_shape, current_col.Type(), buf.get(), &new_tensor));
306 
307     // Add this tensor to the tensor row for output
308     (*new_row).push_back(std::move(new_tensor));
309   }
310   return Status::OK();
311 }
312 
313 // Overrides base class reset method.  When an operator does a reset, it cleans up any state
314 // info from it's previous execution and then initializes itself so that it can be executed
315 // again.
Reset()316 Status RandomDataOp::Reset() {
317   MS_LOG(DEBUG) << Name() << " performing a self-reset.";
318 
319   // Ensure all guys are in the waitpost
320   if (guys_in_ != num_workers_) {
321     return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
322                   "Issuing a reset, but some workers are missing from epochSync!");
323   }
324 
325   // reset the row counters for all workers
326   for (int32_t w = 0; w < num_workers_; w++) {
327     worker_rows_packed_[w] = 0;
328     worker_max_rows_[w] = 0;
329   }
330 
331   // Re-assign round robin row counts, starting from the worker after the one that gave
332   // the eoe last time
333   int32_t currentWorker = (eoe_worker_id_ + 1) % num_workers_;
334   for (int64_t r = 0; r < total_rows_; r++) {
335     worker_max_rows_[currentWorker]++;
336     currentWorker = (currentWorker + 1) % num_workers_;
337   }
338 
339   // Compute which worker should get the eoe for the next epoch
340   eoe_worker_id_ = ((epoch_rows_sent_ % num_workers_) + eoe_worker_id_) % num_workers_;
341 
342   // Wake up the workers to get them going again in a new epoch
343   guys_out_ = 0;
344   epoch_sync_wait_post_.Set();
345 
346   return Status::OK();
347 }
348 
ComputeColMap()349 Status RandomDataOp::ComputeColMap() {
350   // Extract the column name mapping from the schema and save it in the class.
351   if (column_name_id_map_.empty()) {
352     RETURN_IF_NOT_OK(data_schema_->GetColumnNameMap(&(column_name_id_map_)));
353   } else {
354     MS_LOG(WARNING) << "Column name map is already set!";
355   }
356   return Status::OK();
357 }
358 
359 }  // namespace dataset
360 }  // namespace mindspore
361