1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <fcntl.h>
#include <unistd.h>

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
23
24
25 #include "common/common.h"
26 #include "minddata/dataset/engine/connector.h"
27 #include "minddata/dataset/util/task_manager.h"
28 #include "utils/log_adapter.h"
29
30 using namespace mindspore::dataset;
31 using mindspore::MsLogLevel::INFO;
32 using mindspore::ExceptionType::NoExceptionType;
33 using mindspore::LogStream;
34
35 class MindDataTestConnector : public UT::Common {
36 public:
37
38 MindDataTestConnector();
39
40 // Test scenario: single producer, single consumer.
41 // This means there is only one queue in the connector.
42 Status Run_test_0();
43
44 // Test scenario: multiple producers, multiple cosumers
45 // A chain of three layer of thread groups connected by two Connectors between
46 // two layer. You can set different num of threads on layer 1 and 2, and layer 3
47 // that does the serialization to _ouput vector needs to be single thread.
48 // A random sleep/delay can be introduced for each thread. See run().
49 Status Run_test_1();
50
SetSleepMilliSec(uint32_t ms)51 void SetSleepMilliSec(uint32_t ms) { sleep_ms_ = ms; }
52
53 private:
54 std::unique_ptr<TaskGroup> tg_;
55 uint32_t last_input_;
56 uint32_t sleep_ms_ = 0;
57 std::vector<uint32_t> input_;
58 WaitPost wp;
59
60 // This worker loop is to be called by a single thread. It will pop my_conn Connector
61 // and populate output vector
62 Status SerialWorkerPull(
63 int tid,
64 std::shared_ptr<Connector<uint32_t>> my_conn,
65 std::vector<uint32_t> *output
66 );
67
68 // This worker loop read from input_ vector that have complete list of tasks/elements.
69 // The assignment from the elements in input_ to each worker is ensured in RoundRobin,
70 // i.e., tid-0 will pick input_[0], tid-1 will pick input_[1], so-on circular.
71 Status FirstWorkerPush(
72 int tid,
73 std::shared_ptr<Connector<uint32_t> > my_conn,
74 int start_in,
75 int offset);
76
77 // This worker loop read from a Connector and put the result into another Connector.
78 Status MidWorkerJob(
79 int tid,
80 std::shared_ptr<Connector<uint32_t> > from_conn,
81 std::shared_ptr<Connector<uint32_t> > to_conn);
82
83 Status ValidateOutput(const std::vector<uint32_t> &output);
84
85 uint32_t GenRand(int max);
86
87 // Put the current thread to sleep mode for MaxDue milliseconds.
88 // (Imitating nondeterministic processing time)
89 void GoToSleep(int max_dur);
90 };
91
92
93 // Test0 : single producer, single consumer which means there is only one queue in the connector
TEST_F(MindDataTestConnector, Test0) {
  MS_LOG(INFO) << "MindDataTestConnector Test0: single producer, single consumer.";
  {
    // The scenario itself has to finish successfully.
    Status rc = Run_test_0();
    ASSERT_TRUE(rc.IsOk());
  }
  // The status reported for the master thread must be OK as well.
  Status rc = TaskManager::GetMasterThreadRc();
  ASSERT_TRUE(rc.IsOk());
}
101
// Test1: multiple producers, multiple consumers without random delay.
// A chain of three layers of thread groups, with a Connector between each pair of
// adjacent layers (two Connectors in total).
TEST_F(MindDataTestConnector, Test1) {
  MS_LOG(INFO) << "MindDataTestConnector Test1.";
  {
    // The scenario itself has to finish successfully.
    Status rc = Run_test_1();
    ASSERT_TRUE(rc.IsOk());
  }
  // The status reported for the master thread must be OK as well.
  Status rc = TaskManager::GetMasterThreadRc();
  ASSERT_TRUE(rc.IsOk());
}
112
// Test2: multiple producers, multiple consumers with random delay after push/pop.
// A chain of three layers of thread groups, with a Connector between each pair of
// adjacent layers (two Connectors in total).
TEST_F(MindDataTestConnector, Test2) {
  MS_LOG(INFO) << "MindDataTestConnector Test2.";
  // Enable the random per-worker delay before running the multi-layer scenario.
  SetSleepMilliSec(30);
  {
    // The scenario itself has to finish successfully.
    Status rc = Run_test_1();
    ASSERT_TRUE(rc.IsOk());
  }
  // The status reported for the master thread must be OK as well.
  Status rc = TaskManager::GetMasterThreadRc();
  ASSERT_TRUE(rc.IsOk());
}
124
125
126
127 // Implementation of MindDataTestConnector class and the helper functions.
MindDataTestConnector()128 MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) {
129 last_input_ = 150;
130 for (int i = 1; i <= last_input_; i++) {
131 input_.push_back(i);
132 }
133 wp.Register(tg_.get());
134 }
135
Run_test_0()136 Status MindDataTestConnector::Run_test_0() {
137 Status rc;
138 std::vector<uint32_t> output;
139 wp.Clear();
140 auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers
141 1, // num of consumers
142 10); // capacity of each queue
143 MS_ASSERT(my_conn != nullptr);
144
145 rc = my_conn->Register(tg_.get());
146 RETURN_IF_NOT_OK(rc);
147
148 // Spawn a thread to read input_ vector and put it in my_conn
149 rc = tg_->CreateAsyncTask("Worker Push",
150 std::bind(&MindDataTestConnector::FirstWorkerPush,
151 this, // passing this instance
152 0, // id = 0 for this simple one to one case
153 my_conn, // the connector
154 0, // start index to read from the input_ list
155 1)); // the offset to read the next index
156 RETURN_IF_NOT_OK(rc);
157
158 // Spawn another thread to read from my_conn and write to _output vector.
159 rc = tg_->CreateAsyncTask("Worker Pull",
160 std::bind(&MindDataTestConnector::SerialWorkerPull,
161 this,
162 0,
163 my_conn,
164 &output));
165 RETURN_IF_NOT_OK(rc);
166 // Wait for the threads to finish.
167 rc = wp.Wait();
168 EXPECT_TRUE(rc.IsOk());
169 tg_->interrupt_all();
170 tg_->join_all(Task::WaitFlag::kNonBlocking);
171 my_conn.reset();
172 return ValidateOutput(output);
173 }
174
Run_test_1()175 Status MindDataTestConnector::Run_test_1() {
176 std::vector<uint32_t> output;
177 Status rc;
178 wp.Clear();
179
180 // number of threads in each layer
181 int l1_threads = 15;
182 int l2_threads = 20;
183 int l3_threads = 1;
184
185 // Capacity for the first and second connectors
186 int conn1_qcap = 5;
187 int conn2_qcap = 10;
188
189 auto conn1 = std::make_shared<Connector<uint32_t>>(l1_threads, // num of producers
190 l2_threads, // num of consumers
191 conn1_qcap); // the cap of each queue
192
193 auto conn2 = std::make_shared<Connector<uint32_t>>(l2_threads,
194 l3_threads,
195 conn2_qcap);
196
197 rc = conn1->Register(tg_.get());
198 RETURN_IF_NOT_OK(rc);
199 rc = conn2->Register(tg_.get());
200 RETURN_IF_NOT_OK(rc);
201
202 // Instantiating the threads in the first layer
203 for (int i = 0; i < l1_threads; i++) {
204 rc = tg_->CreateAsyncTask("First Worker Push",
205 std::bind(&MindDataTestConnector::FirstWorkerPush,
206 this, // passing this instance
207 i, // thread id in this group of thread
208 conn1, // the connector
209 i, // start index to read from the input_ list
210 l1_threads)); // the offset to read the next index
211 RETURN_IF_NOT_OK(rc);
212 }
213
214 // Instantiating the threads in the 2nd layer
215 for (int i = 0; i < l2_threads; i++) {
216 rc = tg_->CreateAsyncTask("Mid Worker Job",
217 std::bind(&MindDataTestConnector::MidWorkerJob,
218 this, // passing this instance
219 i, // thread id in this group of thread
220 conn1, // the 1st connector
221 conn2)); // the 2nd connector
222 RETURN_IF_NOT_OK(rc);
223 }
224
225 // Last layer doing serialization to one queue to check if the order is preserved
226 rc = tg_->CreateAsyncTask("Worker Pull",
227 std::bind(&MindDataTestConnector::SerialWorkerPull,
228 this,
229 0, // thread id = 0, since it's the only one
230 conn2, // popping the data from conn2
231 &output));
232 RETURN_IF_NOT_OK(rc);
233 // Wait for the threads to finish.
234 rc = wp.Wait();
235 EXPECT_TRUE(rc.IsOk());
236 tg_->interrupt_all();
237 tg_->join_all(Task::WaitFlag::kNonBlocking);
238 conn1.reset();
239 conn2.reset();
240
241 return ValidateOutput(output);
242 }
243
SerialWorkerPull(int tid,std::shared_ptr<Connector<uint32_t>> my_conn,std::vector<uint32_t> * output)244 Status MindDataTestConnector::SerialWorkerPull(
245 int tid,
246 std::shared_ptr<Connector<uint32_t>> my_conn,
247 std::vector<uint32_t> *output
248 ) {
249 Status rc;
250 TaskManager::FindMe()->Post();
251 while (1) {
252 uint32_t res;
253 rc = my_conn->Pop(tid, &res);
254 RETURN_IF_NOT_OK(rc);
255
256 output->push_back(res);
257
258 // Emulate different processing time for each thread
259 if (sleep_ms_ != 0) {
260 GoToSleep(sleep_ms_);
261 }
262
263 // Signal master thread after it processed the last_input_.
264 // This will trigger the MidWorkerJob threads to quit their worker loop.
265 if (res == last_input_) {
266 MS_LOG(INFO) << "All data is collected.";
267 wp.Set();
268 break;
269 }
270 }
271 return Status::OK();
272 }
273
FirstWorkerPush(int tid,std::shared_ptr<Connector<uint32_t>> my_conn,int start_in,int offset)274 Status MindDataTestConnector::FirstWorkerPush(
275 int tid,
276 std::shared_ptr<Connector<uint32_t> > my_conn,
277 int start_in,
278 int offset) {
279 TaskManager::FindMe()->Post();
280 MS_ASSERT(my_conn != nullptr);
281 Status rc;
282 for (int i = start_in; i < input_.size(); i += offset) {
283 rc = my_conn->Push(tid, input_[i]);
284
285 // Emulate different processing time for each thread
286 if (sleep_ms_ != 0)
287 GoToSleep(sleep_ms_);
288 }
289 return Status::OK();
290 }
291
292 // This worker loop read from a Connector and put the result into another Connector.
MidWorkerJob(int tid,std::shared_ptr<Connector<uint32_t>> from_conn,std::shared_ptr<Connector<uint32_t>> to_conn)293 Status MindDataTestConnector::MidWorkerJob(
294 int tid,
295 std::shared_ptr<Connector<uint32_t> > from_conn,
296 std::shared_ptr<Connector<uint32_t> > to_conn) {
297 MS_ASSERT((from_conn != nullptr) && (to_conn != nullptr));
298 Status rc;
299 TaskManager::FindMe()->Post();
300 while (1) {
301 uint32_t el;
302 rc = from_conn->Pop(tid, &el);
303 RETURN_IF_NOT_OK(rc);
304
305 // Emulate different processing time for each thread
306 if (sleep_ms_ != 0) {
307 GoToSleep(sleep_ms_);
308 }
309 rc = to_conn->Push(tid, el);
310 RETURN_IF_NOT_OK(rc);
311 }
312 return Status::OK();
313 }
314
ValidateOutput(const std::vector<uint32_t> & output)315 Status MindDataTestConnector::ValidateOutput(const std::vector<uint32_t> &output) {
316 int prev = 0;
317 for (auto el : output) {
318 if (prev >= el) {
319 return Status(StatusCode::kMDUnexpectedError, "Output vector are not in-order.");
320 }
321 prev = el;
322 }
323 return Status::OK();
324 }
325
GenRand(int max)326 uint32_t MindDataTestConnector::GenRand(int max) {
327 uint32_t r_int = 0;
328 if (max == 0) {
329 return r_int;
330 }
331
332 // open urandom not random
333 int fd = open("/dev/urandom", O_RDONLY);
334 if (fd > 0) {
335 if (read(fd, &r_int, sizeof(uint32_t)) != sizeof(uint32_t)) {
336 r_int = max / 2;
337 }
338 }
339 (void)close(fd); // close it!
340
341 return r_int % max;
342 }
343
// Put the current thread to sleep for a random duration of less than max_dur milliseconds.
// (Imitating nondeterministic processing time)
GoToSleep(int max_dur)346 void MindDataTestConnector::GoToSleep(int max_dur) {
347 uint32_t duration = GenRand(max_dur);
348 std::this_thread::sleep_for(std::chrono::milliseconds(duration));
349 }
350