1 #define RXCPP_SUBJECT_TEST_ASYNC 1 2 3 #include "../test.h" 4 5 #include <rxcpp/operators/rx-finally.hpp> 6 7 #include <future> 8 9 10 const int static_onnextcalls = 10000000; 11 static int aliased = 0; 12 13 SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){ 14 const int& onnextcalls = static_onnextcalls; 15 GIVEN("a for loop"){ 16 WHEN("locking mutex 100 million times"){ 17 using namespace std::chrono; 18 typedef steady_clock clock; 19 20 int c = 0; 21 int n = 1; 22 auto start = clock::now(); 23 std::mutex m; 24 for (int i = 0; i < onnextcalls; i++) { 25 std::unique_lock<std::mutex> guard(m); 26 ++c; 27 } 28 auto finish = clock::now(); 29 auto msElapsed = duration_cast<milliseconds>(finish-start); 30 std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 31 32 } 33 } 34 } 35 36 namespace syncwithvoid { 37 template<class T, class OnNext> 38 class sync_subscriber 39 { 40 public: 41 OnNext onnext; 42 bool issubscribed; sync_subscriber(OnNext on)43 explicit sync_subscriber(OnNext on) 44 : onnext(on) 45 , issubscribed(true) 46 { 47 } is_subscribed()48 bool is_subscribed() {return issubscribed;} unsubscribe()49 void unsubscribe() {issubscribed = false;} on_next(T v)50 void on_next(T v) { 51 onnext(v); 52 } 53 }; 54 } 55 SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){ 56 const int& onnextcalls = static_onnextcalls; 57 GIVEN("a for loop"){ 58 WHEN("calling on_next 100 million times"){ 59 using namespace std::chrono; 60 typedef steady_clock clock; 61 62 auto c = std::addressof(aliased); 63 *c = 0; 64 int n = 1; 65 auto start = clock::now(); __anon9dc5bb690102(int)66 auto onnext = [c](int){++*c;}; 67 syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext); 68 for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { 69 scbr.on_next(i); 70 } 71 auto finish = clock::now(); 72 auto msElapsed = duration_cast<milliseconds>(finish-start); 73 std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 74 75 } 76 } 77 } 78 79 namespace asyncwithready { 80 // ready is an immutable class. 81 class ready 82 { 83 public: 84 typedef std::function<void()> onthen_type; 85 private: 86 std::function<void(onthen_type)> setthen; 87 public: ready()88 ready() {} ready(std::function<void (onthen_type)> st)89 ready(std::function<void(onthen_type)> st) : setthen(st) {} is_ready()90 bool is_ready() {return !setthen;} then(onthen_type ot)91 void then(onthen_type ot) { 92 if (is_ready()) { 93 abort(); 94 } 95 setthen(ot); 96 } 97 }; 98 template<class T, class OnNext> 99 class async_subscriber 100 { 101 public: 102 OnNext onnext; 103 bool issubscribed; 104 int count; async_subscriber(OnNext on)105 explicit async_subscriber(OnNext on) 106 : onnext(on) 107 , issubscribed(true) 108 , count(0) 109 { 110 } is_subscribed()111 bool is_subscribed() {return issubscribed;} unsubscribe()112 void unsubscribe() {issubscribed = false;} on_next(T v)113 ready on_next(T v) { 114 // push v onto queue 115 116 // under some condition pop v off of queue and pass it on 117 onnext(v); 118 119 // for demo purposes 120 // simulate queue full every 100000 items 121 if (count == 100000) { 122 // 'queue is full' 123 ready no([this](ready::onthen_type ot){ 124 // full version will sync producer and consumer (in producer push and consumer pop) 125 // and decide when to restart the producer 126 if (!this->count) { 127 ot(); 128 } 129 }); 130 // set queue empty since the demo has no separate consumer thread 131 count = 0; 132 // 'queue is empty' 133 return no; 134 } 135 static const ready yes; 136 return yes; 137 } 138 }; 139 } 140 SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){ 141 static const int& onnextcalls = static_onnextcalls; 142 GIVEN("a for loop"){ 143 WHEN("calling on_next 100 million times"){ 144 using namespace std::chrono; 145 typedef steady_clock clock; 146 147 auto c = std::addressof(aliased); 148 *c = 0; 149 int n = 1; 150 auto start = clock::now(); __anon9dc5bb690302(int)151 auto onnext = [&c](int){++*c;}; 152 asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext); 153 asyncwithready::ready::onthen_type chunk; 154 int i = 0; __anon9dc5bb690402() 155 chunk = [&chunk, scbr, i]() mutable { 156 for (; i < onnextcalls && scbr.is_subscribed(); i++) { 157 auto controller = scbr.on_next(i); 158 if (!controller.is_ready()) { 159 controller.then(chunk); 160 return; 161 } 162 } 163 }; 164 chunk(); 165 auto finish = clock::now(); 166 auto msElapsed = duration_cast<milliseconds>(finish-start); 167 std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 168 169 } 170 } 171 } 172 173 namespace asyncwithfuture { 174 class unit {}; 175 template<class T, class OnNext> 176 class async_subscriber 177 { 178 public: 179 OnNext onnext; 180 bool issubscribed; async_subscriber(OnNext on)181 explicit async_subscriber(OnNext on) 182 : onnext(on) 183 , issubscribed(true) 184 { 185 } is_subscribed()186 bool is_subscribed() {return issubscribed;} unsubscribe()187 void unsubscribe() {issubscribed = false;} on_next(T v)188 std::future<unit> on_next(T v) { 189 std::promise<unit> ready; 190 ready.set_value(unit()); 191 onnext(v); return ready.get_future();} 192 }; 193 } 194 SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){ 195 const int& onnextcalls = static_onnextcalls; 196 GIVEN("a for loop"){ 197 WHEN("calling on_next 100 million times"){ 198 using namespace std::chrono; 199 typedef steady_clock clock; 200 201 auto c = std::addressof(aliased); 202 *c = 0; 203 int n = 1; 204 auto start = clock::now(); __anon9dc5bb690502(int)205 auto onnext = [&c](int){++*c;}; 206 asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext); 207 for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { 208 auto isready = scbr.on_next(i); 209 if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) { 210 isready.wait(); 211 } 212 } 213 auto finish = clock::now(); 214 auto msElapsed = duration_cast<milliseconds>(finish-start); 215 std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 216 217 } 218 } 219 } 220 221 SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){ 222 const int& onnextcalls = static_onnextcalls; 223 GIVEN("a for loop"){ 224 WHEN("observing 100 million ints"){ 225 using namespace std::chrono; 226 typedef steady_clock clock; 227 228 static int& c = aliased; 229 int n = 1; 230 231 c = 0; 232 auto start = clock::now(); 233 auto o = rx::make_observer<int>( __anon9dc5bb690602(int)234 [](int){++c;}, __anon9dc5bb690702(rxu::error_ptr)235 [](rxu::error_ptr){abort();}); 236 for (int i = 0; i < onnextcalls; i++) { 237 o.on_next(i); 238 } 239 o.on_completed(); 240 auto finish = clock::now(); 241 auto msElapsed = duration_cast<milliseconds>(finish-start); 242 std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; 243 } 244 } 245 } 246 247 SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){ 248 const int& onnextcalls = static_onnextcalls; 249 GIVEN("a for loop"){ 250 WHEN("observing 100 million ints"){ 251 using namespace std::chrono; 252 typedef steady_clock clock; 253 254 static int& c = aliased; 255 int n = 1; 256 257 c = 0; 258 auto start = clock::now(); 259 auto o = rx::make_subscriber<int>( __anon9dc5bb690802(int)260 [](int){++c;}, __anon9dc5bb690902(rxu::error_ptr)261 [](rxu::error_ptr){abort();}); 262 for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { 263 o.on_next(i); 264 } 265 o.on_completed(); 266 auto finish = clock::now(); 267 auto msElapsed = duration_cast<milliseconds>(finish-start); 268 std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 269 } 270 } 271 } 272 273 SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){ 274 const int& onnextcalls = static_onnextcalls; 275 GIVEN("a range"){ 276 WHEN("observing 100 million ints"){ 277 using namespace std::chrono; 278 typedef steady_clock clock; 279 280 static int& c = aliased; 281 int n = 1; 282 283 c = 0; 284 auto start = clock::now(); 285 286 rxs::range<int>(1, onnextcalls).subscribe( __anon9dc5bb690a02(int)287 [](int){ 288 ++c; 289 }, __anon9dc5bb690b02(rxu::error_ptr)290 [](rxu::error_ptr){abort();}); 291 292 auto finish = clock::now(); 293 auto msElapsed = duration_cast<milliseconds>(finish-start); 294 std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 295 } 296 } 297 } 298 299 SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){ 300 static const int& onnextcalls = static_onnextcalls; 301 GIVEN("a for loop and a subject"){ 302 WHEN("multicasting a million ints"){ 303 using namespace std::chrono; 304 typedef steady_clock clock; 305 306 for (int n = 0; n < 10; n++) 307 { 308 auto p = std::make_shared<int>(0); 309 auto c = std::make_shared<int>(0); 310 rxsub::subject<int> sub; 311 312 #if RXCPP_SUBJECT_TEST_ASYNC 313 std::vector<std::future<int>> f(n); 314 std::atomic<int> asyncUnsubscriptions{0}; 315 #endif 316 317 auto o = sub.get_subscriber(); 318 __anon9dc5bb690c02()319 o.add(rx::make_subscription([c, n](){ 320 auto expected = n * onnextcalls; 321 REQUIRE(*c == expected); 322 })); 323 324 for (int i = 0; i < n; i++) { 325 #if RXCPP_SUBJECT_TEST_ASYNC __anon9dc5bb690d02() 326 f[i] = std::async([sub, o, &asyncUnsubscriptions]() { 327 auto source = sub.get_observable(); 328 while(o.is_subscribed()) { 329 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 330 rx::composite_subscription cs; 331 source 332 .finally([&asyncUnsubscriptions](){ 333 ++asyncUnsubscriptions;}) 334 .subscribe( 335 rx::make_subscriber<int>( 336 cs, 337 [cs](int){ 338 cs.unsubscribe(); 339 }, 340 [](rxu::error_ptr){abort();})); 341 } 342 return 0; 343 }); 344 #endif 345 sub.get_observable().subscribe( __anon9dc5bb691102(int)346 [c, p](int){ 347 ++(*c); 348 }, __anon9dc5bb691202(rxu::error_ptr)349 [](rxu::error_ptr){abort();}); 350 } 351 352 auto start = clock::now(); 353 for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { 354 #if RXCPP_DEBUG_SUBJECT_RACE 355 if (*p != *c) abort(); 356 (*p) += n; 357 #endif 358 o.on_next(i); 359 } 360 o.on_completed(); 361 auto finish = clock::now(); 362 auto msElapsed = duration_cast<milliseconds>(finish-start); 363 std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; 364 #if RXCPP_SUBJECT_TEST_ASYNC 365 std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; 366 #endif 367 std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 368 } 369 } 370 } 371 } 372 373 SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){ 374 static const int& onnextcalls = static_onnextcalls; 375 GIVEN("a range and a subject"){ 376 WHEN("multicasting a million ints"){ 377 using namespace std::chrono; 378 typedef steady_clock clock; 379 for (int n = 0; n < 10; n++) 380 { 381 auto p = std::make_shared<int>(0); 382 auto c = std::make_shared<int>(0); 383 rxsub::subject<int> sub; 384 385 #if RXCPP_SUBJECT_TEST_ASYNC 386 std::vector<std::future<int>> f(n); 387 std::atomic<int> asyncUnsubscriptions{0}; 388 #endif 389 390 auto o = sub.get_subscriber(); 391 __anon9dc5bb691302()392 o.add(rx::make_subscription([c, n](){ 393 auto expected = n * onnextcalls; 394 REQUIRE(*c == expected); 395 })); 396 397 for (int i = 0; i < n; i++) { 398 #if RXCPP_SUBJECT_TEST_ASYNC __anon9dc5bb691402() 399 f[i] = std::async([sub, o, &asyncUnsubscriptions]() { 400 while(o.is_subscribed()) { 401 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 402 rx::composite_subscription cs; 403 sub.get_observable() 404 .finally([&asyncUnsubscriptions](){ 405 ++asyncUnsubscriptions;}) 406 .subscribe(cs, 407 [cs](int){ 408 cs.unsubscribe(); 409 }, 410 [](rxu::error_ptr){abort();}); 411 } 412 return 0; 413 }); 414 #endif 415 sub.get_observable() 416 .subscribe( __anon9dc5bb691802(int)417 [c, p](int){ 418 ++(*c); 419 }, __anon9dc5bb691902(rxu::error_ptr)420 [](rxu::error_ptr){abort();} 421 ); 422 } 423 424 auto start = clock::now(); 425 rxs::range<int>(1, onnextcalls) 426 #if RXCPP_DEBUG_SUBJECT_RACE __anon9dc5bb691a02(int)427 .filter([c, p, n](int){ 428 if (*p != *c) abort(); 429 (*p) += n; 430 return true; 431 }) 432 #endif 433 .subscribe(o); 434 auto finish = clock::now(); 435 auto msElapsed = duration_cast<milliseconds>(finish-start); 436 std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; 437 #if RXCPP_SUBJECT_TEST_ASYNC 438 std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; 439 #endif 440 std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; 441 } 442 } 443 } 444 } 445 446 447 SCENARIO("subject - infinite source", "[subject][subjects]"){ 448 GIVEN("a subject and an infinite source"){ 449 450 auto sc = rxsc::make_test(); 451 auto w = sc.create_worker(); 452 const rxsc::test::messages<int> on; 453 const rxsc::test::messages<bool> check; 454 455 auto xs = sc.make_hot_observable({ 456 on.next(70, 1), 457 on.next(110, 2), 458 on.next(220, 3), 459 on.next(270, 4), 460 on.next(340, 5), 461 on.next(410, 6), 462 on.next(520, 7), 463 on.next(630, 8), 464 on.next(710, 9), 465 on.next(870, 10), 466 on.next(940, 11), 467 on.next(1020, 12) 468 }); 469 470 rxsub::subject<int> s; 471 472 auto results1 = w.make_subscriber<int>(); 473 474 auto results2 = w.make_subscriber<int>(); 475 476 auto results3 = w.make_subscriber<int>(); 477 478 WHEN("multicasting an infinite source"){ 479 480 auto checks = rxu::to_vector({ 481 check.next(0, false) 482 }); 483 __anon9dc5bb691b02(long at) 484 auto record = [&s, &check, &checks](long at) -> void { 485 checks.push_back(check.next(at, s.has_observers())); 486 }; 487 488 auto o = s.get_subscriber(); 489 __anon9dc5bb691c02(const rxsc::schedulable&)490 w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){ 491 s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);}); __anon9dc5bb691d02(const rxsc::schedulable&)492 w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){ 493 xs.subscribe(o); record(200);}); __anon9dc5bb691e02(const rxsc::schedulable&)494 w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){ 495 o.unsubscribe(); record(1000);}); 496 __anon9dc5bb691f02(const rxsc::schedulable&)497 w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){ 498 s.get_observable().subscribe(results1); record(300);}); __anon9dc5bb692002(const rxsc::schedulable&)499 w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){ 500 s.get_observable().subscribe(results2); record(400);}); __anon9dc5bb692102(const rxsc::schedulable&)501 w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){ 502 s.get_observable().subscribe(results3); record(900);}); 503 __anon9dc5bb692202(const rxsc::schedulable&)504 w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){ 505 results1.unsubscribe(); record(600);}); __anon9dc5bb692302(const rxsc::schedulable&)506 w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){ 507 results2.unsubscribe(); record(700);}); __anon9dc5bb692402(const rxsc::schedulable&)508 w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){ 509 results1.unsubscribe(); record(800);}); __anon9dc5bb692502(const rxsc::schedulable&)510 w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){ 511 results3.unsubscribe(); record(950);}); 512 513 w.start(); 514 515 THEN("result1 contains expected messages"){ 516 auto required = rxu::to_vector({ 517 on.next(340, 5), 518 on.next(410, 6), 519 on.next(520, 7) 520 }); 521 auto actual = results1.get_observer().messages(); 522 REQUIRE(required == actual); 523 } 524 525 THEN("result2 contains expected messages"){ 526 auto required = rxu::to_vector({ 527 on.next(410, 6), 528 on.next(520, 7), 529 on.next(630, 8) 530 }); 531 auto actual = results2.get_observer().messages(); 532 REQUIRE(required == actual); 533 } 534 535 THEN("result3 contains expected messages"){ 536 auto required = rxu::to_vector({ 537 on.next(940, 11) 538 }); 539 auto actual = results3.get_observer().messages(); 540 REQUIRE(required == actual); 541 } 542 543 THEN("checks contains expected messages"){ 544 auto required = rxu::to_vector({ 545 check.next(100, false), 546 check.next(200, false), 547 check.next(300, true), 548 check.next(400, true), 549 check.next(600, true), 550 check.next(700, false), 551 check.next(800, false), 552 check.next(900, true), 553 check.next(950, false), 554 check.next(1000, false) 555 }); 556 auto actual = checks; 557 REQUIRE(required == actual); 558 } 559 560 } 561 } 562 } 563 564 SCENARIO("subject - finite source", "[subject][subjects]"){ 565 GIVEN("a subject and an finite source"){ 566 567 auto sc = rxsc::make_test(); 568 auto w = sc.create_worker(); 569 const rxsc::test::messages<int> on; 570 571 auto xs = sc.make_hot_observable({ 572 on.next(70, 1), 573 on.next(110, 2), 574 on.next(220, 3), 575 on.next(270, 4), 576 on.next(340, 5), 577 on.next(410, 6), 578 on.next(520, 7), 579 on.completed(630), 580 on.next(640, 9), 581 on.completed(650), 582 on.error(660, std::runtime_error("error on unsubscribed stream")) 583 }); 584 585 rxsub::subject<int> s; 586 587 auto results1 = w.make_subscriber<int>(); 588 589 auto results2 = w.make_subscriber<int>(); 590 591 auto results3 = w.make_subscriber<int>(); 592 593 WHEN("multicasting an infinite source"){ 594 595 auto o = s.get_subscriber(); 596 __anon9dc5bb692602(const rxsc::schedulable&)597 w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ 598 s = rxsub::subject<int>(); o = s.get_subscriber();}); __anon9dc5bb692702(const rxsc::schedulable&)599 w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ 600 xs.subscribe(o);}); __anon9dc5bb692802(const rxsc::schedulable&)601 w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ 602 o.unsubscribe();}); 603 __anon9dc5bb692902(const rxsc::schedulable&)604 w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ 605 s.get_observable().subscribe(results1);}); __anon9dc5bb692a02(const rxsc::schedulable&)606 w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ 607 s.get_observable().subscribe(results2);}); __anon9dc5bb692b02(const rxsc::schedulable&)608 w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ 609 s.get_observable().subscribe(results3);}); 610 __anon9dc5bb692c02(const rxsc::schedulable&)611 w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ 612 results1.unsubscribe();}); __anon9dc5bb692d02(const rxsc::schedulable&)613 w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ 614 results2.unsubscribe();}); __anon9dc5bb692e02(const rxsc::schedulable&)615 w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ 616 results1.unsubscribe();}); __anon9dc5bb692f02(const rxsc::schedulable&)617 w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ 618 results3.unsubscribe();}); 619 620 w.start(); 621 622 THEN("result1 contains expected messages"){ 623 auto required = rxu::to_vector({ 624 on.next(340, 5), 625 on.next(410, 6), 626 on.next(520, 7) 627 }); 628 auto actual = results1.get_observer().messages(); 629 REQUIRE(required == actual); 630 } 631 632 THEN("result2 contains expected messages"){ 633 auto required = rxu::to_vector({ 634 on.next(410, 6), 635 on.next(520, 7), 636 on.completed(630) 637 }); 638 auto actual = results2.get_observer().messages(); 639 REQUIRE(required == actual); 640 } 641 642 THEN("result3 contains expected messages"){ 643 auto required = rxu::to_vector({ 644 on.completed(900) 645 }); 646 auto actual = results3.get_observer().messages(); 647 REQUIRE(required == actual); 648 } 649 650 } 651 } 652 } 653 654 655 SCENARIO("subject - on_error in source", "[subject][subjects]"){ 656 GIVEN("a subject and a source with an error"){ 657 658 auto sc = rxsc::make_test(); 659 auto w = sc.create_worker(); 660 const rxsc::test::messages<int> on; 661 662 std::runtime_error ex("subject on_error in stream"); 663 664 auto xs = sc.make_hot_observable({ 665 on.next(70, 1), 666 on.next(110, 2), 667 on.next(220, 3), 668 on.next(270, 4), 669 on.next(340, 5), 670 on.next(410, 6), 671 on.next(520, 7), 672 on.error(630, ex), 673 on.next(640, 9), 674 on.completed(650), 675 on.error(660, std::runtime_error("error on unsubscribed stream")) 676 }); 677 678 rxsub::subject<int> s; 679 680 auto results1 = w.make_subscriber<int>(); 681 682 auto results2 = w.make_subscriber<int>(); 683 684 auto results3 = w.make_subscriber<int>(); 685 686 WHEN("multicasting an infinite source"){ 687 688 auto o = s.get_subscriber(); 689 __anon9dc5bb693002(const rxsc::schedulable&)690 w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ 691 s = rxsub::subject<int>(); o = s.get_subscriber();}); __anon9dc5bb693102(const rxsc::schedulable&)692 w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ 693 xs.subscribe(o);}); __anon9dc5bb693202(const rxsc::schedulable&)694 w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ 695 o.unsubscribe();}); 696 __anon9dc5bb693302(const rxsc::schedulable&)697 w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ 698 s.get_observable().subscribe(results1);}); __anon9dc5bb693402(const rxsc::schedulable&)699 w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ 700 s.get_observable().subscribe(results2);}); __anon9dc5bb693502(const rxsc::schedulable&)701 w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ 702 s.get_observable().subscribe(results3);}); 703 __anon9dc5bb693602(const rxsc::schedulable&)704 w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ 705 results1.unsubscribe();}); __anon9dc5bb693702(const rxsc::schedulable&)706 w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ 707 results2.unsubscribe();}); __anon9dc5bb693802(const rxsc::schedulable&)708 w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ 709 results1.unsubscribe();}); __anon9dc5bb693902(const rxsc::schedulable&)710 w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ 711 results3.unsubscribe();}); 712 713 w.start(); 714 715 THEN("result1 contains expected messages"){ 716 auto required = rxu::to_vector({ 717 on.next(340, 5), 718 on.next(410, 6), 719 on.next(520, 7) 720 }); 721 auto actual = results1.get_observer().messages(); 722 REQUIRE(required == actual); 723 } 724 725 THEN("result2 contains expected messages"){ 726 auto required = rxu::to_vector({ 727 on.next(410, 6), 728 on.next(520, 7), 729 on.error(630, ex) 730 }); 731 auto actual = results2.get_observer().messages(); 732 REQUIRE(required == actual); 733 } 734 735 THEN("result3 contains expected messages"){ 736 auto required = rxu::to_vector({ 737 on.error(900, ex) 738 }); 739 auto actual = results3.get_observer().messages(); 740 REQUIRE(required == actual); 741 } 742 743 } 744 } 745 } 746