• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <fcntl.h>
18 #include <iostream>
19 #include <memory>
20 #include <vector>
21 #include <chrono>
22 #include <thread>
23 
24 
25 #include "common/common.h"
26 #include "minddata/dataset/engine/connector.h"
27 #include "minddata/dataset/util/task_manager.h"
28 #include "utils/log_adapter.h"
29 
30 using namespace mindspore::dataset;
31 using mindspore::MsLogLevel::INFO;
32 using mindspore::ExceptionType::NoExceptionType;
33 using mindspore::LogStream;
34 
35 class MindDataTestConnector : public UT::Common {
36 public:
37 
38   MindDataTestConnector();
39 
40   // Test scenario: single producer, single consumer.
41   // This means there is only one queue in the connector.
42   Status Run_test_0();
43 
44   // Test scenario: multiple producers, multiple cosumers
45   // A chain of three layer of thread groups connected by two Connectors between
46   // two layer. You can set different num of threads on layer 1 and 2, and layer 3
47   // that does the serialization to _ouput vector needs to be single thread.
48   // A random sleep/delay can be introduced for each thread. See run().
49   Status Run_test_1();
50 
51   void SetSleepMilliSec(uint32_t ms) { sleep_ms_ = ms; }
52 
53 private:
54   std::unique_ptr<TaskGroup> tg_;
55   uint32_t last_input_;
56   uint32_t sleep_ms_ = 0;
57   std::vector<uint32_t> input_;
58   WaitPost wp;
59 
60   // This worker loop is to be called by a single thread. It will pop my_conn Connector
61   // and populate output vector
62   Status SerialWorkerPull(
63                           int tid,
64                           std::shared_ptr<Connector<uint32_t>> my_conn,
65                           std::vector<uint32_t> *output
66                           );
67 
68   // This worker loop read from input_ vector that have complete list of tasks/elements.
69   // The assignment from the elements in input_ to each worker is ensured in RoundRobin,
70   // i.e., tid-0 will pick input_[0], tid-1 will pick input_[1], so-on circular.
71   Status FirstWorkerPush(
72                          int tid,
73                          std::shared_ptr<Connector<uint32_t> > my_conn,
74                          int start_in,
75                          int offset);
76 
77   // This worker loop read from a Connector and put the result into another Connector.
78   Status MidWorkerJob(
79                       int tid,
80                       std::shared_ptr<Connector<uint32_t> > from_conn,
81                       std::shared_ptr<Connector<uint32_t> > to_conn);
82 
83   Status ValidateOutput(const std::vector<uint32_t> &output);
84 
85   uint32_t GenRand(int max);
86 
87   // Put the current thread to sleep mode for MaxDue milliseconds.
88   // (Imitating nondeterministic processing time)
89   void GoToSleep(int max_dur);
90 };
91 
92 
93 // Test0 : single producer, single consumer which means there is only one queue in the connector
94 TEST_F(MindDataTestConnector, Test0) {
95   MS_LOG(INFO) << "MindDataTestConnector Test0: single producer, single consumer.";
96   Status rc = this->Run_test_0();
97   ASSERT_TRUE(rc.IsOk());
98   rc = TaskManager::GetMasterThreadRc();
99   ASSERT_TRUE(rc.IsOk());
100 }
101 
102 // Test1: multiple producers, multiple consumers without random delay
103 // A chain of three layer of thread groups connected by two Connectors between
104 // two layer.
105 TEST_F(MindDataTestConnector, Test1) {
106   MS_LOG(INFO) << "MindDataTestConnector Test1.";
107   Status rc = this->Run_test_1();
108   ASSERT_TRUE(rc.IsOk());
109   rc = TaskManager::GetMasterThreadRc();
110   ASSERT_TRUE(rc.IsOk());
111 }
112 
113 // Test1: multiple producers, multiple consumers with random delay after push/pop
114 // A chain of three layer of thread groups connected by two Connectors between
115 // two layer.
116 TEST_F(MindDataTestConnector, Test2) {
117   MS_LOG(INFO) << "MindDataTestConnector Test2.";
118   this->SetSleepMilliSec(30);
119   Status rc = this->Run_test_1();
120   ASSERT_TRUE(rc.IsOk());
121   rc = TaskManager::GetMasterThreadRc();
122   ASSERT_TRUE(rc.IsOk());
123 }
124 
125 
126 
127 // Implementation of MindDataTestConnector class and the helper functions.
128 MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) {
129   last_input_ = 150;
130   for (int i = 1; i <= last_input_; i++) {
131     input_.push_back(i);
132   }
133   wp.Register(tg_.get());
134 }
135 
136 Status MindDataTestConnector::Run_test_0() {
137   Status rc;
138   std::vector<uint32_t> output;
139   wp.Clear();
140   auto my_conn = std::make_shared<Connector<uint32_t>>(1,  // num of producers
141                                                       1,  // num of consumers
142                                                       10);  // capacity of each queue
143   MS_ASSERT(my_conn != nullptr);
144 
145   rc = my_conn->Register(tg_.get());
146   RETURN_IF_NOT_OK(rc);
147 
148   // Spawn a thread to read input_ vector and put it in my_conn
149   rc = tg_->CreateAsyncTask("Worker Push",
150                             std::bind(&MindDataTestConnector::FirstWorkerPush,
151                                       this,  // passing this instance
152                                       0,  // id = 0 for this simple one to one case
153                                       my_conn,  // the connector
154                                       0,  // start index to read from the input_ list
155                                       1));  // the offset to read the next index
156   RETURN_IF_NOT_OK(rc);
157 
158   // Spawn another thread to read from my_conn and write to _output vector.
159   rc = tg_->CreateAsyncTask("Worker Pull",
160                             std::bind(&MindDataTestConnector::SerialWorkerPull,
161                                       this,
162                                       0,
163                                       my_conn,
164                                       &output));
165   RETURN_IF_NOT_OK(rc);
166   // Wait for the threads to finish.
167   rc = wp.Wait();
168   EXPECT_TRUE(rc.IsOk());
169   tg_->interrupt_all();
170   tg_->join_all(Task::WaitFlag::kNonBlocking);
171   my_conn.reset();
172   return ValidateOutput(output);
173 }
174 
175 Status MindDataTestConnector::Run_test_1() {
176   std::vector<uint32_t> output;
177   Status rc;
178   wp.Clear();
179 
180   // number of threads in each layer
181   int l1_threads = 15;
182   int l2_threads = 20;
183   int l3_threads = 1;
184 
185   // Capacity for the first and second connectors
186   int conn1_qcap = 5;
187   int conn2_qcap = 10;
188 
189   auto conn1 = std::make_shared<Connector<uint32_t>>(l1_threads,  // num of producers
190                                                      l2_threads,  // num of consumers
191                                                      conn1_qcap);  // the cap of each queue
192 
193   auto conn2 = std::make_shared<Connector<uint32_t>>(l2_threads,
194                                                      l3_threads,
195                                                      conn2_qcap);
196 
197   rc = conn1->Register(tg_.get());
198   RETURN_IF_NOT_OK(rc);
199   rc = conn2->Register(tg_.get());
200   RETURN_IF_NOT_OK(rc);
201 
202   // Instantiating the threads in the first layer
203   for (int i = 0; i < l1_threads; i++) {
204     rc = tg_->CreateAsyncTask("First Worker Push",
205                               std::bind(&MindDataTestConnector::FirstWorkerPush,
206                                         this,  // passing this instance
207                                         i,  // thread id in this group of thread
208                                         conn1,  // the connector
209                                         i,  // start index to read from the input_ list
210                                         l1_threads));   // the offset to read the next index
211     RETURN_IF_NOT_OK(rc);
212   }
213 
214   // Instantiating the threads in the 2nd layer
215   for (int i = 0; i < l2_threads; i++) {
216     rc = tg_->CreateAsyncTask("Mid Worker Job",
217                               std::bind(&MindDataTestConnector::MidWorkerJob,
218                                         this,  // passing this instance
219                                         i,  // thread id in this group of thread
220                                         conn1,  // the 1st connector
221                                         conn2));    // the 2nd connector
222     RETURN_IF_NOT_OK(rc);
223   }
224 
225   // Last layer doing serialization to one queue to check if the order is preserved
226   rc = tg_->CreateAsyncTask("Worker Pull",
227                             std::bind(&MindDataTestConnector::SerialWorkerPull,
228                                       this,
229                                       0,  // thread id = 0, since it's the only one
230                                       conn2,  // popping the data from conn2
231                                       &output));
232   RETURN_IF_NOT_OK(rc);
233   // Wait for the threads to finish.
234   rc = wp.Wait();
235   EXPECT_TRUE(rc.IsOk());
236   tg_->interrupt_all();
237   tg_->join_all(Task::WaitFlag::kNonBlocking);
238   conn1.reset();
239   conn2.reset();
240 
241   return ValidateOutput(output);
242 }
243 
244 Status MindDataTestConnector::SerialWorkerPull(
245                                                int tid,
246                                                std::shared_ptr<Connector<uint32_t>> my_conn,
247                                                std::vector<uint32_t> *output
248                                        ) {
249   Status rc;
250   TaskManager::FindMe()->Post();
251   while (1) {
252     uint32_t res;
253     rc = my_conn->Pop(tid, &res);
254     RETURN_IF_NOT_OK(rc);
255 
256     output->push_back(res);
257 
258     // Emulate different processing time for each thread
259     if (sleep_ms_ != 0) {
260       GoToSleep(sleep_ms_);
261     }
262 
263     // Signal master thread after it processed the last_input_.
264     // This will trigger the MidWorkerJob threads to quit their worker loop.
265     if (res == last_input_) {
266       MS_LOG(INFO) << "All data is collected.";
267       wp.Set();
268       break;
269     }
270   }
271   return Status::OK();
272 }
273 
274 Status MindDataTestConnector::FirstWorkerPush(
275                                       int tid,
276                                       std::shared_ptr<Connector<uint32_t> > my_conn,
277                                       int start_in,
278                                       int offset)  {
279   TaskManager::FindMe()->Post();
280   MS_ASSERT(my_conn != nullptr);
281   Status rc;
282   for (int i = start_in; i < input_.size(); i += offset) {
283     rc = my_conn->Push(tid, input_[i]);
284 
285     // Emulate different processing time for each thread
286     if (sleep_ms_ != 0)
287       GoToSleep(sleep_ms_);
288   }
289   return Status::OK();
290 }
291 
292 // This worker loop read from a Connector and put the result into another Connector.
293 Status MindDataTestConnector::MidWorkerJob(
294                                    int tid,
295                                    std::shared_ptr<Connector<uint32_t> > from_conn,
296                                    std::shared_ptr<Connector<uint32_t> > to_conn) {
297   MS_ASSERT((from_conn != nullptr) && (to_conn != nullptr));
298   Status rc;
299   TaskManager::FindMe()->Post();
300   while (1) {
301     uint32_t el;
302     rc = from_conn->Pop(tid, &el);
303     RETURN_IF_NOT_OK(rc);
304 
305     // Emulate different processing time for each thread
306     if (sleep_ms_ != 0) {
307       GoToSleep(sleep_ms_);
308     }
309     rc = to_conn->Push(tid, el);
310     RETURN_IF_NOT_OK(rc);
311   }
312   return Status::OK();
313 }
314 
315 Status MindDataTestConnector::ValidateOutput(const std::vector<uint32_t> &output) {
316   int prev = 0;
317   for (auto el : output) {
318     if (prev >= el) {
319       return Status(StatusCode::kMDUnexpectedError, "Output vector are not in-order.");
320     }
321     prev = el;
322   }
323   return Status::OK();
324 }
325 
326 uint32_t MindDataTestConnector::GenRand(int max) {
327   uint32_t r_int = 0;
328   if (max == 0) {
329     return r_int;
330   }
331 
332   // open urandom not random
333   int fd = open("/dev/urandom", O_RDONLY);
334   if (fd > 0) {
335     if (read(fd, &r_int, sizeof(uint32_t)) != sizeof(uint32_t)) {
336       r_int = max / 2;
337     }
338   }
339   (void)close(fd);  // close it!
340 
341   return r_int % max;
342 }
343 
344 // Put the current thread to sleep mode for MaxDue milliseconds.
345 // (Imitating nondeterministic processing time)
346 void MindDataTestConnector::GoToSleep(int max_dur) {
347   uint32_t duration = GenRand(max_dur);
348   std::this_thread::sleep_for(std::chrono::milliseconds(duration));
349 }
350