1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include <fcntl.h> 18 #include <iostream> 19 #include <memory> 20 #include <vector> 21 #include <chrono> 22 #include <thread> 23 24 25 #include "common/common.h" 26 #include "minddata/dataset/engine/connector.h" 27 #include "minddata/dataset/util/task_manager.h" 28 #include "utils/log_adapter.h" 29 30 using namespace mindspore::dataset; 31 using mindspore::MsLogLevel::INFO; 32 using mindspore::ExceptionType::NoExceptionType; 33 using mindspore::LogStream; 34 35 class MindDataTestConnector : public UT::Common { 36 public: 37 38 MindDataTestConnector(); 39 40 // Test scenario: single producer, single consumer. 41 // This means there is only one queue in the connector. 42 Status Run_test_0(); 43 44 // Test scenario: multiple producers, multiple cosumers 45 // A chain of three layer of thread groups connected by two Connectors between 46 // two layer. You can set different num of threads on layer 1 and 2, and layer 3 47 // that does the serialization to _ouput vector needs to be single thread. 48 // A random sleep/delay can be introduced for each thread. See run(). 49 Status Run_test_1(); 50 51 void SetSleepMilliSec(uint32_t ms) { sleep_ms_ = ms; } 52 53 private: 54 std::unique_ptr<TaskGroup> tg_; 55 uint32_t last_input_; 56 uint32_t sleep_ms_ = 0; 57 std::vector<uint32_t> input_; 58 WaitPost wp; 59 60 // This worker loop is to be called by a single thread. It will pop my_conn Connector 61 // and populate output vector 62 Status SerialWorkerPull( 63 int tid, 64 std::shared_ptr<Connector<uint32_t>> my_conn, 65 std::vector<uint32_t> *output 66 ); 67 68 // This worker loop read from input_ vector that have complete list of tasks/elements. 69 // The assignment from the elements in input_ to each worker is ensured in RoundRobin, 70 // i.e., tid-0 will pick input_[0], tid-1 will pick input_[1], so-on circular. 71 Status FirstWorkerPush( 72 int tid, 73 std::shared_ptr<Connector<uint32_t> > my_conn, 74 int start_in, 75 int offset); 76 77 // This worker loop read from a Connector and put the result into another Connector. 78 Status MidWorkerJob( 79 int tid, 80 std::shared_ptr<Connector<uint32_t> > from_conn, 81 std::shared_ptr<Connector<uint32_t> > to_conn); 82 83 Status ValidateOutput(const std::vector<uint32_t> &output); 84 85 uint32_t GenRand(int max); 86 87 // Put the current thread to sleep mode for MaxDue milliseconds. 88 // (Imitating nondeterministic processing time) 89 void GoToSleep(int max_dur); 90 }; 91 92 93 // Test0 : single producer, single consumer which means there is only one queue in the connector 94 TEST_F(MindDataTestConnector, Test0) { 95 MS_LOG(INFO) << "MindDataTestConnector Test0: single producer, single consumer."; 96 Status rc = this->Run_test_0(); 97 ASSERT_TRUE(rc.IsOk()); 98 rc = TaskManager::GetMasterThreadRc(); 99 ASSERT_TRUE(rc.IsOk()); 100 } 101 102 // Test1: multiple producers, multiple consumers without random delay 103 // A chain of three layer of thread groups connected by two Connectors between 104 // two layer. 105 TEST_F(MindDataTestConnector, Test1) { 106 MS_LOG(INFO) << "MindDataTestConnector Test1."; 107 Status rc = this->Run_test_1(); 108 ASSERT_TRUE(rc.IsOk()); 109 rc = TaskManager::GetMasterThreadRc(); 110 ASSERT_TRUE(rc.IsOk()); 111 } 112 113 // Test1: multiple producers, multiple consumers with random delay after push/pop 114 // A chain of three layer of thread groups connected by two Connectors between 115 // two layer. 116 TEST_F(MindDataTestConnector, Test2) { 117 MS_LOG(INFO) << "MindDataTestConnector Test2."; 118 this->SetSleepMilliSec(30); 119 Status rc = this->Run_test_1(); 120 ASSERT_TRUE(rc.IsOk()); 121 rc = TaskManager::GetMasterThreadRc(); 122 ASSERT_TRUE(rc.IsOk()); 123 } 124 125 126 127 // Implementation of MindDataTestConnector class and the helper functions. 128 MindDataTestConnector::MindDataTestConnector() : tg_(new TaskGroup()) { 129 last_input_ = 150; 130 for (int i = 1; i <= last_input_; i++) { 131 input_.push_back(i); 132 } 133 wp.Register(tg_.get()); 134 } 135 136 Status MindDataTestConnector::Run_test_0() { 137 Status rc; 138 std::vector<uint32_t> output; 139 wp.Clear(); 140 auto my_conn = std::make_shared<Connector<uint32_t>>(1, // num of producers 141 1, // num of consumers 142 10); // capacity of each queue 143 MS_ASSERT(my_conn != nullptr); 144 145 rc = my_conn->Register(tg_.get()); 146 RETURN_IF_NOT_OK(rc); 147 148 // Spawn a thread to read input_ vector and put it in my_conn 149 rc = tg_->CreateAsyncTask("Worker Push", 150 std::bind(&MindDataTestConnector::FirstWorkerPush, 151 this, // passing this instance 152 0, // id = 0 for this simple one to one case 153 my_conn, // the connector 154 0, // start index to read from the input_ list 155 1)); // the offset to read the next index 156 RETURN_IF_NOT_OK(rc); 157 158 // Spawn another thread to read from my_conn and write to _output vector. 159 rc = tg_->CreateAsyncTask("Worker Pull", 160 std::bind(&MindDataTestConnector::SerialWorkerPull, 161 this, 162 0, 163 my_conn, 164 &output)); 165 RETURN_IF_NOT_OK(rc); 166 // Wait for the threads to finish. 167 rc = wp.Wait(); 168 EXPECT_TRUE(rc.IsOk()); 169 tg_->interrupt_all(); 170 tg_->join_all(Task::WaitFlag::kNonBlocking); 171 my_conn.reset(); 172 return ValidateOutput(output); 173 } 174 175 Status MindDataTestConnector::Run_test_1() { 176 std::vector<uint32_t> output; 177 Status rc; 178 wp.Clear(); 179 180 // number of threads in each layer 181 int l1_threads = 15; 182 int l2_threads = 20; 183 int l3_threads = 1; 184 185 // Capacity for the first and second connectors 186 int conn1_qcap = 5; 187 int conn2_qcap = 10; 188 189 auto conn1 = std::make_shared<Connector<uint32_t>>(l1_threads, // num of producers 190 l2_threads, // num of consumers 191 conn1_qcap); // the cap of each queue 192 193 auto conn2 = std::make_shared<Connector<uint32_t>>(l2_threads, 194 l3_threads, 195 conn2_qcap); 196 197 rc = conn1->Register(tg_.get()); 198 RETURN_IF_NOT_OK(rc); 199 rc = conn2->Register(tg_.get()); 200 RETURN_IF_NOT_OK(rc); 201 202 // Instantiating the threads in the first layer 203 for (int i = 0; i < l1_threads; i++) { 204 rc = tg_->CreateAsyncTask("First Worker Push", 205 std::bind(&MindDataTestConnector::FirstWorkerPush, 206 this, // passing this instance 207 i, // thread id in this group of thread 208 conn1, // the connector 209 i, // start index to read from the input_ list 210 l1_threads)); // the offset to read the next index 211 RETURN_IF_NOT_OK(rc); 212 } 213 214 // Instantiating the threads in the 2nd layer 215 for (int i = 0; i < l2_threads; i++) { 216 rc = tg_->CreateAsyncTask("Mid Worker Job", 217 std::bind(&MindDataTestConnector::MidWorkerJob, 218 this, // passing this instance 219 i, // thread id in this group of thread 220 conn1, // the 1st connector 221 conn2)); // the 2nd connector 222 RETURN_IF_NOT_OK(rc); 223 } 224 225 // Last layer doing serialization to one queue to check if the order is preserved 226 rc = tg_->CreateAsyncTask("Worker Pull", 227 std::bind(&MindDataTestConnector::SerialWorkerPull, 228 this, 229 0, // thread id = 0, since it's the only one 230 conn2, // popping the data from conn2 231 &output)); 232 RETURN_IF_NOT_OK(rc); 233 // Wait for the threads to finish. 234 rc = wp.Wait(); 235 EXPECT_TRUE(rc.IsOk()); 236 tg_->interrupt_all(); 237 tg_->join_all(Task::WaitFlag::kNonBlocking); 238 conn1.reset(); 239 conn2.reset(); 240 241 return ValidateOutput(output); 242 } 243 244 Status MindDataTestConnector::SerialWorkerPull( 245 int tid, 246 std::shared_ptr<Connector<uint32_t>> my_conn, 247 std::vector<uint32_t> *output 248 ) { 249 Status rc; 250 TaskManager::FindMe()->Post(); 251 while (1) { 252 uint32_t res; 253 rc = my_conn->Pop(tid, &res); 254 RETURN_IF_NOT_OK(rc); 255 256 output->push_back(res); 257 258 // Emulate different processing time for each thread 259 if (sleep_ms_ != 0) { 260 GoToSleep(sleep_ms_); 261 } 262 263 // Signal master thread after it processed the last_input_. 264 // This will trigger the MidWorkerJob threads to quit their worker loop. 265 if (res == last_input_) { 266 MS_LOG(INFO) << "All data is collected."; 267 wp.Set(); 268 break; 269 } 270 } 271 return Status::OK(); 272 } 273 274 Status MindDataTestConnector::FirstWorkerPush( 275 int tid, 276 std::shared_ptr<Connector<uint32_t> > my_conn, 277 int start_in, 278 int offset) { 279 TaskManager::FindMe()->Post(); 280 MS_ASSERT(my_conn != nullptr); 281 Status rc; 282 for (int i = start_in; i < input_.size(); i += offset) { 283 rc = my_conn->Push(tid, input_[i]); 284 285 // Emulate different processing time for each thread 286 if (sleep_ms_ != 0) 287 GoToSleep(sleep_ms_); 288 } 289 return Status::OK(); 290 } 291 292 // This worker loop read from a Connector and put the result into another Connector. 293 Status MindDataTestConnector::MidWorkerJob( 294 int tid, 295 std::shared_ptr<Connector<uint32_t> > from_conn, 296 std::shared_ptr<Connector<uint32_t> > to_conn) { 297 MS_ASSERT((from_conn != nullptr) && (to_conn != nullptr)); 298 Status rc; 299 TaskManager::FindMe()->Post(); 300 while (1) { 301 uint32_t el; 302 rc = from_conn->Pop(tid, &el); 303 RETURN_IF_NOT_OK(rc); 304 305 // Emulate different processing time for each thread 306 if (sleep_ms_ != 0) { 307 GoToSleep(sleep_ms_); 308 } 309 rc = to_conn->Push(tid, el); 310 RETURN_IF_NOT_OK(rc); 311 } 312 return Status::OK(); 313 } 314 315 Status MindDataTestConnector::ValidateOutput(const std::vector<uint32_t> &output) { 316 int prev = 0; 317 for (auto el : output) { 318 if (prev >= el) { 319 return Status(StatusCode::kMDUnexpectedError, "Output vector are not in-order."); 320 } 321 prev = el; 322 } 323 return Status::OK(); 324 } 325 326 uint32_t MindDataTestConnector::GenRand(int max) { 327 uint32_t r_int = 0; 328 if (max == 0) { 329 return r_int; 330 } 331 332 // open urandom not random 333 int fd = open("/dev/urandom", O_RDONLY); 334 if (fd > 0) { 335 if (read(fd, &r_int, sizeof(uint32_t)) != sizeof(uint32_t)) { 336 r_int = max / 2; 337 } 338 } 339 (void)close(fd); // close it! 340 341 return r_int % max; 342 } 343 344 // Put the current thread to sleep mode for MaxDue milliseconds. 345 // (Imitating nondeterministic processing time) 346 void MindDataTestConnector::GoToSleep(int max_dur) { 347 uint32_t duration = GenRand(max_dur); 348 std::this_thread::sleep_for(std::chrono::milliseconds(duration)); 349 } 350