1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "minddata/dataset/engine/datasetops/source/random_data_op.h"
18
19 #include <algorithm>
20 #include <iomanip>
21 #include <random>
22 #include "minddata/dataset/engine/execution_tree.h"
23 #include "minddata/dataset/core/config_manager.h"
24 #include "minddata/dataset/util/random.h"
25 #include "minddata/dataset/util/wait_post.h"
26 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
27
28 namespace mindspore {
29 namespace dataset {
30 // Constructor for RandomDataOp
RandomDataOp(int32_t num_workers,int32_t op_connector_size,int64_t total_rows,std::unique_ptr<DataSchema> data_schema)31 RandomDataOp::RandomDataOp(int32_t num_workers, int32_t op_connector_size, int64_t total_rows,
32 std::unique_ptr<DataSchema> data_schema)
33 : ParallelOp(num_workers, op_connector_size),
34 total_rows_(total_rows),
35 epoch_rows_sent_(0),
36 guys_in_(0),
37 guys_out_(num_workers_),
38 eoe_worker_id_(0),
39 data_schema_(std::move(data_schema)) {
40 rand_gen_.seed(GetSeed()); // seed the random generator
41 // If total rows was not given, then randomly pick a number
42 if (total_rows_ == 0) {
43 total_rows_ = GenRandomInt(1, kMaxTotalRows);
44 }
45 // If the user did not provide a schema, then we will ask the op to generate a pseudo-random schema.
46 // See details of generateSchema function to learn what type of schema it will create.
47 if (data_schema_ == nullptr) {
48 GenerateSchema();
49 }
50 // Everyone is already out from the sync area.
51 all_out_.Set();
52 }
53
54 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const55 void RandomDataOp::Print(std::ostream &out, bool show_all) const {
56 if (!show_all) {
57 // Call the super class for displaying any common 1-liner info
58 ParallelOp::Print(out, show_all);
59 // Then show any custom derived-internal 1-liner info for this op
60 out << " [total rows: " << total_rows_ << "]\n";
61 } else {
62 // Call the super class for displaying any common detailed info
63 ParallelOp::Print(out, show_all);
64 // Then show any custom derived-internal stuff
65 out << "\nTotal_rows: " << total_rows_ << " \nSchema:\n" << *data_schema_ << "\n\n";
66 }
67 }
68
69 // Helper function to produce a default/random schema if one didn't exist
GenerateSchema()70 void RandomDataOp::GenerateSchema() {
71 const int32_t type_offset = 2;
72 // To randomly create a schema, we need to choose:
73 // a) how many columns
74 // b) the type of each column
75 // c) the shape of each column (number of dimensions i.e. rank)
76 // d) the shape of each column (dimension values)
77 data_schema_ = std::make_unique<DataSchema>();
78 std::unique_ptr<TensorShape> new_shape;
79 std::unique_ptr<ColDescriptor> new_col;
80
81 // Loop over the number of chosen columns
82 int32_t numColumns = GenRandomInt(1, kMaxNumColumns);
83 for (int32_t i = 0; i < numColumns; i++) {
84 // For each column:
85 // - choose a datatype
86 // - generate a shape that randomly chooses the number of dimensions and the dimension values.
87 DataType::Type newType = static_cast<DataType::Type>(GenRandomInt(1, DataType::NUM_OF_TYPES - type_offset));
88 int32_t rank = GenRandomInt(1, kMaxRank);
89 std::vector<dsize_t> dims;
90 for (int32_t d = 0; d < rank; d++) {
91 // 0 is not a valid dimension value. however, we can support "*" or unknown, so map the random
92 // 0 value to the unknown attribute if 0 is chosen
93 dsize_t dim_value = static_cast<dsize_t>(GenRandomInt(0, kMaxDimValue));
94 if (dim_value == 0) {
95 dim_value = TensorShape::kDimUnknown;
96 }
97 dims.push_back(dim_value);
98 }
99 new_shape = std::make_unique<TensorShape>(dims);
100
101 // Create the column descriptor
102 std::string col_name = "c" + std::to_string(i);
103 new_col =
104 std::make_unique<ColDescriptor>(col_name, DataType(newType), TensorImpl::kFlexible, rank, new_shape.get());
105
106 Status rc = data_schema_->AddColumn(*new_col);
107 if (rc.IsError()) MS_LOG(ERROR) << "Failed to generate a schema. Message:" << rc;
108 }
109 }
110
111 // Class functor operator () override.
112 // All DatasetOps operate by launching a thread (see ExecutionTree). This class functor will
113 // provide the master loop that drives the logic for performing the work.
operator ()()114 Status RandomDataOp::operator()() {
115 CHECK_FAIL_RETURN_UNEXPECTED(total_rows_ >= num_workers_,
116 "RandomDataOp expects total_rows < num_workers. Try adjust num_workers, total_row=" +
117 std::to_string(total_rows_) + ", num_workers=" + std::to_string(num_workers_) + " .");
118
119 // If the amount of workers we have exceeds the number of rows to produce, then we'll have
120 // idle workers doing nothing. In that case, let's throttle the worker count.
121 if (num_workers_ > total_rows_) {
122 MS_LOG(INFO) << "RandomDataOp throttling worker count from " << num_workers_ << "to " << total_rows_;
123 num_workers_ = total_rows_;
124 num_producers_ = num_workers_;
125 guys_out_ = num_workers_;
126 // The output connector was already created with a different worker count. We have to drop and recreate
127 // that connector.
128 DatasetOp::CreateConnector(num_producers_, num_workers_);
129 }
130
131 if (num_workers_ == 0) {
132 RETURN_STATUS_UNEXPECTED("Invalid data, num_workers_ is zero.");
133 }
134 // Assign the number of rows to each worker in a round robin fashion.
135 worker_max_rows_.reserve(num_workers_);
136 worker_rows_packed_.reserve(num_workers_);
137 // init the counts to zero to start.
138 for (int32_t w = 0; w < num_workers_; w++) {
139 worker_max_rows_.push_back(0);
140 worker_rows_packed_.push_back(0);
141 }
142 // then assign round robin row counts
143 int32_t currentWorker = 0;
144 for (int64_t r = 0; r < total_rows_; r++) {
145 worker_max_rows_[currentWorker]++;
146 currentWorker = (currentWorker + 1) % num_workers_;
147 }
148
149 // Next, compute the total rows count. This stat is needed during reset logic
150 for (int32_t w = 0; w < num_workers_; w++) {
151 epoch_rows_sent_ += worker_max_rows_[w];
152 }
153
154 // For the connector to work, we need to target the correct worker channel for the eoe.
155 // This will initialize it for the first one. reset() handles for the rest of the epochs.
156 eoe_worker_id_ = epoch_rows_sent_ % num_workers_;
157 epoch_rows_sent_++; // Add the eoe row to the count for subsequent epochs
158
159 // RandomDataOp doesn't need the master thread to stay around. Kick off the workers and then master exits.
160 RETURN_IF_NOT_OK(
161 tree_->LaunchWorkers(num_workers_, std::bind(&RandomDataOp::WorkerEntry, this, std::placeholders::_1), "", id()));
162
163 // required task group setup after launching workers
164 TaskManager::FindMe()->Post();
165 RETURN_IF_NOT_OK(epoch_sync_wait_post_.Register(tree_->AllTasks()));
166
167 return Status::OK();
168 }
169
170 // Performs a synchronization between workers at the end of an epoch
EpochSync(int32_t worker_id,bool * quitting)171 Status RandomDataOp::EpochSync(int32_t worker_id, bool *quitting) {
172 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " syncing at end of epoch";
173
174 // Sync on the guys_in counter
175 // We have to wait the last guy is out.
176 RETURN_IF_NOT_OK(all_out_.Wait());
177 // If we are not in a repeat loop, or that was the last repeat already, then setup our exit
178 // condition from the master loop.
179 if (IsLastIteration()) {
180 *quitting = true;
181 }
182
183 auto prev = guys_in_.fetch_add(1);
184 bool last_guy_in = (prev + 1) == num_workers_;
185 // If we are the last worker to hit this sync point, we have some extra tasks
186 if (last_guy_in) {
187 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " is the last one to sync. eoe sent as worker "
188 << eoe_worker_id_;
189 UpdateRepeatAndEpochCounter();
190 // Prepare for sync
191 all_out_.Clear();
192 // Always flow eoe at the end
193 RETURN_IF_NOT_OK(out_connector_->SendEOE(eoe_worker_id_));
194 // If we're done then also flow the eof
195 if (*quitting) {
196 // The eof needs to be sent from the next sender in the round robin, so +1
197 int32_t eof_worker_id = (eoe_worker_id_ + 1) % num_workers_;
198 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " has no more epochs. sending eof as worker "
199 << eof_worker_id;
200 RETURN_IF_NOT_OK(out_connector_->SendEOF(eof_worker_id));
201 }
202 }
203
204 if (!(*quitting)) {
205 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " entering sync wait.";
206 if (last_guy_in) {
207 // If we are the last worker, do reset to wake other workers up
208 RETURN_IF_NOT_OK(Reset());
209 } else {
210 // If we are not the last worker, wait for the reset
211 RETURN_IF_NOT_OK(epoch_sync_wait_post_.Wait());
212 }
213 prev = guys_out_.fetch_add(1);
214 bool last_guy_out = (prev + 1) == num_workers_;
215 // Last guy out will clear the wait post and set the row counts
216 if (last_guy_out) {
217 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " last guy out clearing wait post.";
218 epoch_sync_wait_post_.Clear();
219 guys_in_ = 0;
220 all_out_.Set();
221 }
222 }
223
224 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " epoch sync complete.";
225 return Status::OK();
226 }
227
228 // The entry point code for when workers are launched
WorkerEntry(int32_t worker_id)229 Status RandomDataOp::WorkerEntry(int32_t worker_id) {
230 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " entry";
231
232 // handshake with the master first to tell it we're alive
233 TaskManager::FindMe()->Post();
234
235 bool quitting = false;
236 std::unique_ptr<TensorQTable> new_tensor_table = nullptr;
237
238 // Loop until the quitting variable gets set to true
239 do {
240 // If we have not yet reached the row count for this worker then produce another record
241 if (worker_rows_packed_[worker_id] < worker_max_rows_[worker_id]) {
242 TensorRow new_row;
243
244 // Start a new tensor table if needed
245 if (new_tensor_table == nullptr) {
246 new_tensor_table = std::make_unique<TensorQTable>();
247 }
248
249 // Create the data for the row
250 RETURN_IF_NOT_OK(CreateRandomRow(worker_id, &new_row));
251
252 // Add the row to our table
253 worker_rows_packed_[worker_id]++;
254
255 // Send new_row out
256 RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), worker_id));
257 } else {
258 // Now, let's enter the epoch sync
259 RETURN_IF_NOT_OK(EpochSync(worker_id, &quitting));
260 }
261 } while (!quitting);
262
263 MS_LOG(INFO) << "RandomDataOp worker " << worker_id << " is now quitting.";
264
265 return Status::OK();
266 }
267
268 // A helper function to create random data for the row
CreateRandomRow(int32_t worker_id,TensorRow * new_row)269 Status RandomDataOp::CreateRandomRow(int32_t worker_id, TensorRow *new_row) {
270 if (new_row == nullptr) {
271 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] Missing tensor row output.");
272 }
273
274 // Create a tensor for each column, then add the tensor to the row
275 for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
276 const ColDescriptor current_col = data_schema_->Column(i);
277 std::vector<dsize_t> current_shape = current_col.Shape().AsVector();
278 std::unique_ptr<TensorShape> new_shape = nullptr;
279 std::unique_ptr<unsigned char[]> buf = nullptr;
280 std::shared_ptr<Tensor> new_tensor = nullptr;
281
282 // We need to resolve the shape to fill in any unknown dimensions with random
283 // values, then use that as our shape for this tensor.
284 for (int j = 0; j < current_shape.size(); ++j) {
285 if (current_shape[j] == TensorShape::kDimUnknown) {
286 current_shape[j] = static_cast<dsize_t>(GenRandomInt(1, kMaxDimValue));
287 }
288 }
289
290 new_shape = std::make_unique<TensorShape>(current_shape);
291 int64_t size_in_bytes = new_shape->NumOfElements() * current_col.Type().SizeInBytes();
292
293 // Generate a random byte of data. This may cause some funny data for things like doubles,floats, bools
294 // however the random data op is not too concerned about the physical data itself.
295 std::uniform_int_distribution<uint8_t> uniDist(0, UINT8_MAX);
296 uint8_t random_byte = uniDist(rand_gen_);
297
298 // Now, create a chunk of memory for the entire tensor and copy this byte in repeatedly.
299 buf = std::make_unique<unsigned char[]>(size_in_bytes);
300 int ret_code = memset_s(buf.get(), size_in_bytes, random_byte, size_in_bytes);
301 if (ret_code != 0) {
302 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Failed to set random bytes for a tensor.");
303 }
304
305 RETURN_IF_NOT_OK(Tensor::CreateFromMemory(*new_shape, current_col.Type(), buf.get(), &new_tensor));
306
307 // Add this tensor to the tensor row for output
308 (*new_row).push_back(std::move(new_tensor));
309 }
310 return Status::OK();
311 }
312
313 // Overrides base class reset method. When an operator does a reset, it cleans up any state
314 // info from it's previous execution and then initializes itself so that it can be executed
315 // again.
Reset()316 Status RandomDataOp::Reset() {
317 MS_LOG(DEBUG) << Name() << " performing a self-reset.";
318
319 // Ensure all guys are in the waitpost
320 if (guys_in_ != num_workers_) {
321 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
322 "Issuing a reset, but some workers are missing from epochSync!");
323 }
324
325 // reset the row counters for all workers
326 for (int32_t w = 0; w < num_workers_; w++) {
327 worker_rows_packed_[w] = 0;
328 worker_max_rows_[w] = 0;
329 }
330
331 // Re-assign round robin row counts, starting from the worker after the one that gave
332 // the eoe last time
333 int32_t currentWorker = (eoe_worker_id_ + 1) % num_workers_;
334 for (int64_t r = 0; r < total_rows_; r++) {
335 worker_max_rows_[currentWorker]++;
336 currentWorker = (currentWorker + 1) % num_workers_;
337 }
338
339 // Compute which worker should get the eoe for the next epoch
340 eoe_worker_id_ = ((epoch_rows_sent_ % num_workers_) + eoe_worker_id_) % num_workers_;
341
342 // Wake up the workers to get them going again in a new epoch
343 guys_out_ = 0;
344 epoch_sync_wait_post_.Set();
345
346 return Status::OK();
347 }
348
ComputeColMap()349 Status RandomDataOp::ComputeColMap() {
350 // Extract the column name mapping from the schema and save it in the class.
351 if (column_name_id_map_.empty()) {
352 RETURN_IF_NOT_OK(data_schema_->GetColumnNameMap(&(column_name_id_map_)));
353 } else {
354 MS_LOG(WARNING) << "Column name map is already set!";
355 }
356 return Status::OK();
357 }
358
359 } // namespace dataset
360 } // namespace mindspore
361