1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-filter.hpp> 4 #include <rxcpp/operators/rx-map.hpp> 5 #include <rxcpp/operators/rx-take.hpp> 6 #include <rxcpp/operators/rx-flat_map.hpp> 7 #include <rxcpp/operators/rx-observe_on.hpp> 8 9 static const int static_tripletCount = 100; 10 11 SCENARIO("pythagorian for loops", "[!hide][for][pythagorian][perf]"){ 12 const int& tripletCount = static_tripletCount; 13 GIVEN("a for loop"){ 14 WHEN("generating pythagorian triplets"){ 15 using namespace std::chrono; 16 typedef steady_clock clock; 17 18 int c = 0; 19 int ct = 0; 20 int n = 1; 21 auto start = clock::now(); 22 for(int z = 1;; ++z) 23 { 24 for(int x = 1; x <= z; ++x) 25 { 26 for(int y = x; y <= z; ++y) 27 { 28 ++c; 29 if(x*x + y*y == z*z) 30 { 31 ++ct; 32 if(ct == tripletCount) 33 goto done; 34 } 35 } 36 } 37 } 38 done: 39 auto finish = clock::now(); 40 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 41 duration_cast<milliseconds>(start.time_since_epoch()); 42 std::cout << "pythagorian for : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 43 44 } 45 } 46 } 47 48 SCENARIO("merge_transform pythagorian ranges", "[!hide][range][merge_transform][pythagorian][perf]"){ 49 const int& tripletCount = static_tripletCount; 50 GIVEN("some ranges"){ 51 WHEN("generating pythagorian triplets"){ 52 using namespace std::chrono; 53 typedef steady_clock clock; 54 55 auto so = rx::identity_immediate(); 56 57 int c = 0; 58 int ct = 0; 59 int n = 1; 60 auto start = clock::now(); 61 auto triples = 62 rxs::range(1, so) 63 .merge_transform( __anon63d55a350102(int z)64 [&c, so](int z){ 65 return rxs::range(1, z, 1, so) 66 .flat_map( 67 [&c, so, z](int x){ 68 return rxs::range(x, z, 1, so) 69 .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) 70 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 71 // forget type to workaround lambda deduction bug on msvc 2013 72 .as_dynamic();}, 73 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}) 74 // forget type to workaround lambda deduction bug on msvc 2013 75 .as_dynamic();}, __anon63d55a350602(int , std::tuple<int,int,int> triplet)76 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}); 77 triples 78 .take(tripletCount) 79 .subscribe( __anon63d55a350702(int ,int ,int )80 rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){ 81 ++ct; 82 })); 83 auto finish = clock::now(); 84 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 85 duration_cast<milliseconds>(start.time_since_epoch()); 86 std::cout << "merge pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 87 88 } 89 } 90 } 91 92 SCENARIO("synchronize merge_transform pythagorian ranges", "[!hide][range][merge_transform][synchronize][pythagorian][perf]"){ 93 const int& tripletCount = static_tripletCount; 94 GIVEN("some ranges"){ 95 WHEN("generating pythagorian triplets"){ 96 using namespace std::chrono; 97 typedef steady_clock clock; 98 99 auto so = rx::synchronize_event_loop(); 100 101 int c = 0; 102 int n = 1; 103 auto start = clock::now(); 104 auto triples = 105 rxs::range(1, so) 106 .merge_transform( __anon63d55a350802(int z)107 [&c, so](int z){ 108 return rxs::range(1, z, 1, so) 109 .merge_transform( 110 [&c, so, z](int x){ 111 return rxs::range(x, z, 1, so) 112 .filter([&c, z, x](int y){ 113 ++c; 114 if (x*x + y*y == z*z) { 115 return true;} 116 else { 117 return false;}}) 118 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 119 // forget type to workaround lambda deduction bug on msvc 2013 120 .as_dynamic();}, 121 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 122 so) 123 // forget type to workaround lambda deduction bug on msvc 2013 124 .as_dynamic();}, __anon63d55a350d02(int , std::tuple<int,int,int> triplet)125 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 126 so); 127 int ct = triples 128 .take(tripletCount) 129 .as_blocking() 130 .count(); 131 132 auto finish = clock::now(); 133 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 134 duration_cast<milliseconds>(start.time_since_epoch()); 135 std::cout << "merge sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 136 } 137 } 138 } 139 140 SCENARIO("observe_on merge_transform pythagorian ranges", "[!hide][range][merge_transform][observe_on][pythagorian][perf]"){ 141 const int& tripletCount = static_tripletCount; 142 GIVEN("some ranges"){ 143 WHEN("generating pythagorian triplets"){ 144 using namespace std::chrono; 145 typedef steady_clock clock; 146 147 auto so = rx::observe_on_event_loop(); 148 149 int c = 0; 150 int n = 1; 151 auto start = clock::now(); 152 auto triples = 153 rxs::range(1, so) 154 .merge_transform( __anon63d55a350e02(int z)155 [&c, so](int z){ 156 return rxs::range(1, z, 1, so) 157 .merge_transform( 158 [&c, so, z](int x){ 159 return rxs::range(x, z, 1, so) 160 .filter([&c, z, x](int y){ 161 ++c; 162 if (x*x + y*y == z*z) { 163 return true;} 164 else { 165 return false;}}) 166 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 167 // forget type to workaround lambda deduction bug on msvc 2013 168 .as_dynamic();}, 169 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 170 so) 171 // forget type to workaround lambda deduction bug on msvc 2013 172 .as_dynamic();}, __anon63d55a351302(int , std::tuple<int,int,int> triplet)173 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 174 so); 175 int ct = triples 176 .take(tripletCount) 177 .as_blocking() 178 .count(); 179 180 auto finish = clock::now(); 181 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 182 duration_cast<milliseconds>(start.time_since_epoch()); 183 std::cout << "merge observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 184 } 185 } 186 } 187 188 SCENARIO("serialize merge_transform pythagorian ranges", "[!hide][range][merge_transform][serialize][pythagorian][perf]"){ 189 const int& tripletCount = static_tripletCount; 190 GIVEN("some ranges"){ 191 WHEN("generating pythagorian triplets"){ 192 using namespace std::chrono; 193 typedef steady_clock clock; 194 195 auto so = rx::serialize_event_loop(); 196 197 int c = 0; 198 int n = 1; 199 auto start = clock::now(); 200 auto triples = 201 rxs::range(1, so) 202 .merge_transform( __anon63d55a351402(int z)203 [&c, so](int z){ 204 return rxs::range(1, z, 1, so) 205 .merge_transform( 206 [&c, so, z](int x){ 207 return rxs::range(x, z, 1, so) 208 .filter([&c, z, x](int y){ 209 ++c; 210 if (x*x + y*y == z*z) { 211 return true;} 212 else { 213 return false;}}) 214 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 215 // forget type to workaround lambda deduction bug on msvc 2013 216 .as_dynamic();}, 217 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 218 so) 219 // forget type to workaround lambda deduction bug on msvc 2013 220 .as_dynamic();}, __anon63d55a351902(int , std::tuple<int,int,int> triplet)221 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 222 so); 223 int ct = triples 224 .take(tripletCount) 225 .as_blocking() 226 .count(); 227 228 auto finish = clock::now(); 229 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 230 duration_cast<milliseconds>(start.time_since_epoch()); 231 std::cout << "merge serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 232 } 233 } 234 } 235 236 SCENARIO("flat_map completes", "[flat_map][map][operators]"){ 237 GIVEN("two cold observables. one of ints. one of strings."){ 238 auto sc = rxsc::make_test(); 239 auto w = sc.create_worker(); 240 const rxsc::test::messages<int> i_on; 241 const rxsc::test::messages<std::string> s_on; 242 243 auto xs = sc.make_cold_observable({ 244 i_on.next(100, 4), 245 i_on.next(200, 2), 246 i_on.next(300, 3), 247 i_on.next(400, 1), 248 i_on.completed(500) 249 }); 250 251 auto ys = sc.make_cold_observable({ 252 s_on.next(50, "foo"), 253 s_on.next(100, "bar"), 254 s_on.next(150, "baz"), 255 s_on.next(200, "qux"), 256 s_on.completed(250) 257 }); 258 259 WHEN("each int is mapped to the strings"){ 260 261 auto res = w.start( __anon63d55a351a02() 262 [&]() { 263 return xs 264 | rxo::flat_map( 265 [&](int){ 266 return ys;}, 267 [](int, std::string s){ 268 return s;}) 269 // forget type to workaround lambda deduction bug on msvc 2013 270 | rxo::as_dynamic(); 271 } 272 ); 273 274 THEN("the output contains strings repeated for each int"){ 275 auto required = rxu::to_vector({ 276 s_on.next(350, "foo"), 277 s_on.next(400, "bar"), 278 s_on.next(450, "baz"), 279 s_on.next(450, "foo"), 280 s_on.next(500, "qux"), 281 s_on.next(500, "bar"), 282 s_on.next(550, "baz"), 283 s_on.next(550, "foo"), 284 s_on.next(600, "qux"), 285 s_on.next(600, "bar"), 286 s_on.next(650, "baz"), 287 s_on.next(650, "foo"), 288 s_on.next(700, "qux"), 289 s_on.next(700, "bar"), 290 s_on.next(750, "baz"), 291 s_on.next(800, "qux"), 292 s_on.completed(850) 293 }); 294 auto actual = res.get_observer().messages(); 295 REQUIRE(required == actual); 296 } 297 298 THEN("there was one subscription and one unsubscription to the ints"){ 299 auto required = rxu::to_vector({ 300 i_on.subscribe(200, 700) 301 }); 302 auto actual = xs.subscriptions(); 303 REQUIRE(required == actual); 304 } 305 306 THEN("there were four subscription and unsubscription to the strings"){ 307 auto required = rxu::to_vector({ 308 s_on.subscribe(300, 550), 309 s_on.subscribe(400, 650), 310 s_on.subscribe(500, 750), 311 s_on.subscribe(600, 850) 312 }); 313 auto actual = ys.subscriptions(); 314 REQUIRE(required == actual); 315 } 316 } 317 } 318 } 319 320 SCENARIO("merge_transform completes", "[merge_transform][transform][map][operators]"){ 321 GIVEN("two cold observables. one of ints. one of strings."){ 322 auto sc = rxsc::make_test(); 323 auto w = sc.create_worker(); 324 const rxsc::test::messages<int> i_on; 325 const rxsc::test::messages<std::string> s_on; 326 327 auto xs = sc.make_cold_observable({ 328 i_on.next(100, 4), 329 i_on.next(200, 2), 330 i_on.next(300, 3), 331 i_on.next(400, 1), 332 i_on.completed(500) 333 }); 334 335 auto ys = sc.make_cold_observable({ 336 s_on.next(50, "foo"), 337 s_on.next(100, "bar"), 338 s_on.next(150, "baz"), 339 s_on.next(200, "qux"), 340 s_on.completed(250) 341 }); 342 343 WHEN("each int is mapped to the strings"){ 344 345 auto res = w.start( __anon63d55a351d02() 346 [&]() { 347 return xs 348 | rxo::merge_transform( 349 [&](int){ 350 return ys;}, 351 [](int, std::string s){ 352 return s;}) 353 // forget type to workaround lambda deduction bug on msvc 2013 354 | rxo::as_dynamic(); 355 } 356 ); 357 358 THEN("the output contains strings repeated for each int"){ 359 auto required = rxu::to_vector({ 360 s_on.next(350, "foo"), 361 s_on.next(400, "bar"), 362 s_on.next(450, "baz"), 363 s_on.next(450, "foo"), 364 s_on.next(500, "qux"), 365 s_on.next(500, "bar"), 366 s_on.next(550, "baz"), 367 s_on.next(550, "foo"), 368 s_on.next(600, "qux"), 369 s_on.next(600, "bar"), 370 s_on.next(650, "baz"), 371 s_on.next(650, "foo"), 372 s_on.next(700, "qux"), 373 s_on.next(700, "bar"), 374 s_on.next(750, "baz"), 375 s_on.next(800, "qux"), 376 s_on.completed(850) 377 }); 378 auto actual = res.get_observer().messages(); 379 REQUIRE(required == actual); 380 } 381 382 THEN("there was one subscription and one unsubscription to the ints"){ 383 auto required = rxu::to_vector({ 384 i_on.subscribe(200, 700) 385 }); 386 auto actual = xs.subscriptions(); 387 REQUIRE(required == actual); 388 } 389 390 THEN("there were four subscription and unsubscription to the strings"){ 391 auto required = rxu::to_vector({ 392 s_on.subscribe(300, 550), 393 s_on.subscribe(400, 650), 394 s_on.subscribe(500, 750), 395 s_on.subscribe(600, 850) 396 }); 397 auto actual = ys.subscriptions(); 398 REQUIRE(required == actual); 399 } 400 } 401 402 WHEN("each int is mapped to the strings with coordinator"){ 403 404 auto res = w.start( __anon63d55a352002() 405 [&]() { 406 return xs 407 .merge_transform( 408 [&](int){ 409 return ys;}, 410 [](int, std::string s){ 411 return s;}, 412 rx::identity_current_thread()) 413 // forget type to workaround lambda deduction bug on msvc 2013 414 .as_dynamic(); 415 } 416 ); 417 418 THEN("the output contains strings repeated for each int"){ 419 auto required = rxu::to_vector({ 420 s_on.next(350, "foo"), 421 s_on.next(400, "bar"), 422 s_on.next(450, "baz"), 423 s_on.next(450, "foo"), 424 s_on.next(500, "qux"), 425 s_on.next(500, "bar"), 426 s_on.next(550, "baz"), 427 s_on.next(550, "foo"), 428 s_on.next(600, "qux"), 429 s_on.next(600, "bar"), 430 s_on.next(650, "baz"), 431 s_on.next(650, "foo"), 432 s_on.next(700, "qux"), 433 s_on.next(700, "bar"), 434 s_on.next(750, "baz"), 435 s_on.next(800, "qux"), 436 s_on.completed(850) 437 }); 438 auto actual = res.get_observer().messages(); 439 REQUIRE(required == actual); 440 } 441 442 THEN("there was one subscription and one unsubscription to the ints"){ 443 auto required = rxu::to_vector({ 444 i_on.subscribe(200, 700) 445 }); 446 auto actual = xs.subscriptions(); 447 REQUIRE(required == actual); 448 } 449 450 THEN("there were four subscription and unsubscription to the strings"){ 451 auto required = rxu::to_vector({ 452 s_on.subscribe(300, 550), 453 s_on.subscribe(400, 650), 454 s_on.subscribe(500, 750), 455 s_on.subscribe(600, 850) 456 }); 457 auto actual = ys.subscriptions(); 458 REQUIRE(required == actual); 459 } 460 } 461 } 462 } 463 464 SCENARIO("merge_transform source never ends", "[merge_transform][transform][map][operators]"){ 465 GIVEN("two cold observables. one of ints. one of strings."){ 466 auto sc = rxsc::make_test(); 467 auto w = sc.create_worker(); 468 const rxsc::test::messages<int> i_on; 469 const rxsc::test::messages<std::string> s_on; 470 471 auto xs = sc.make_cold_observable({ 472 i_on.next(100, 4), 473 i_on.next(200, 2), 474 i_on.next(300, 3), 475 i_on.next(400, 1), 476 i_on.next(500, 5), 477 i_on.next(700, 0) 478 }); 479 480 auto ys = sc.make_cold_observable({ 481 s_on.next(50, "foo"), 482 s_on.next(100, "bar"), 483 s_on.next(150, "baz"), 484 s_on.next(200, "qux"), 485 s_on.completed(250) 486 }); 487 488 WHEN("each int is mapped to the strings"){ 489 490 auto res = w.start( __anon63d55a352302() 491 [&]() { 492 return xs 493 .merge_transform([&](int){return ys;}, [](int, std::string s){return s;}) 494 // forget type to workaround lambda deduction bug on msvc 2013 495 .as_dynamic(); 496 } 497 ); 498 499 THEN("the output contains strings repeated for each int"){ 500 auto required = rxu::to_vector({ 501 s_on.next(350, "foo"), 502 s_on.next(400, "bar"), 503 s_on.next(450, "baz"), 504 s_on.next(450, "foo"), 505 s_on.next(500, "qux"), 506 s_on.next(500, "bar"), 507 s_on.next(550, "baz"), 508 s_on.next(550, "foo"), 509 s_on.next(600, "qux"), 510 s_on.next(600, "bar"), 511 s_on.next(650, "baz"), 512 s_on.next(650, "foo"), 513 s_on.next(700, "qux"), 514 s_on.next(700, "bar"), 515 s_on.next(750, "baz"), 516 s_on.next(750, "foo"), 517 s_on.next(800, "qux"), 518 s_on.next(800, "bar"), 519 s_on.next(850, "baz"), 520 s_on.next(900, "qux"), 521 s_on.next(950, "foo") 522 }); 523 auto actual = res.get_observer().messages(); 524 REQUIRE(required == actual); 525 } 526 527 THEN("there was one subscription and one unsubscription to the ints"){ 528 auto required = rxu::to_vector({ 529 i_on.subscribe(200, 1000) 530 }); 531 auto actual = xs.subscriptions(); 532 REQUIRE(required == actual); 533 } 534 535 THEN("there were four subscription and unsubscription to the strings"){ 536 auto required = rxu::to_vector({ 537 s_on.subscribe(300, 550), 538 s_on.subscribe(400, 650), 539 s_on.subscribe(500, 750), 540 s_on.subscribe(600, 850), 541 s_on.subscribe(700, 950), 542 s_on.subscribe(900, 1000) 543 }); 544 auto actual = ys.subscriptions(); 545 REQUIRE(required == actual); 546 } 547 } 548 } 549 } 550 551 SCENARIO("merge_transform inner error", "[merge_transform][transform][map][operators]"){ 552 GIVEN("two cold observables. one of ints. one of strings."){ 553 auto sc = rxsc::make_test(); 554 auto w = sc.create_worker(); 555 const rxsc::test::messages<int> i_on; 556 const rxsc::test::messages<std::string> s_on; 557 558 auto xs = sc.make_cold_observable({ 559 i_on.next(100, 4), 560 i_on.next(200, 2), 561 i_on.next(300, 3), 562 i_on.next(400, 1), 563 i_on.completed(500) 564 }); 565 566 std::runtime_error ex("filter on_error from inner source"); 567 568 auto ys = sc.make_cold_observable({ 569 s_on.next(55, "foo"), 570 s_on.next(104, "bar"), 571 s_on.next(153, "baz"), 572 s_on.next(202, "qux"), 573 s_on.error(301, ex) 574 }); 575 576 WHEN("each int is mapped to the strings"){ 577 578 auto res = w.start( __anon63d55a352602() 579 [&]() { 580 return xs 581 .merge_transform([&](int){return ys;}, [](int, std::string s){return s;}) 582 // forget type to workaround lambda deduction bug on msvc 2013 583 .as_dynamic(); 584 } 585 ); 586 587 THEN("the output contains strings repeated for each int"){ 588 auto required = rxu::to_vector({ 589 s_on.next(355, "foo"), 590 s_on.next(404, "bar"), 591 s_on.next(453, "baz"), 592 s_on.next(455, "foo"), 593 s_on.next(502, "qux"), 594 s_on.next(504, "bar"), 595 s_on.next(553, "baz"), 596 s_on.next(555, "foo"), 597 s_on.error(601, ex) 598 }); 599 auto actual = res.get_observer().messages(); 600 REQUIRE(required == actual); 601 } 602 603 THEN("there was one subscription and one unsubscription to the ints"){ 604 auto required = rxu::to_vector({ 605 i_on.subscribe(200, 601) 606 }); 607 auto actual = xs.subscriptions(); 608 REQUIRE(required == actual); 609 } 610 611 THEN("there were four subscription and unsubscription to the strings"){ 612 auto required = rxu::to_vector({ 613 s_on.subscribe(300, 601), 614 s_on.subscribe(400, 601), 615 s_on.subscribe(500, 601), 616 s_on.subscribe(600, 601) 617 }); 618 auto actual = ys.subscriptions(); 619 REQUIRE(required == actual); 620 } 621 } 622 } 623 } 624 625 SCENARIO("merge_transform, no result selector, no coordination", "[merge_transform][transform][map][operators]"){ 626 GIVEN("two cold observables. one of ints. one of strings."){ 627 auto sc = rxsc::make_test(); 628 auto w = sc.create_worker(); 629 const rxsc::test::messages<int> i_on; 630 const rxsc::test::messages<std::string> s_on; 631 632 auto xs = sc.make_cold_observable({ 633 i_on.next(100, 4), 634 i_on.next(200, 2), 635 i_on.next(300, 3), 636 i_on.next(400, 1), 637 i_on.completed(500) 638 }); 639 640 auto ys = sc.make_cold_observable({ 641 s_on.next(50, "foo"), 642 s_on.next(100, "bar"), 643 s_on.next(150, "baz"), 644 s_on.next(200, "qux"), 645 s_on.completed(250) 646 }); 647 648 WHEN("each int is mapped to the strings"){ 649 650 auto res = w.start( __anon63d55a352902() 651 [&]() { 652 return xs 653 .merge_transform( 654 [&](int){ 655 return ys;}) 656 // forget type to workaround lambda deduction bug on msvc 2013 657 .as_dynamic(); 658 } 659 ); 660 661 THEN("the output contains strings repeated for each int"){ 662 auto required = rxu::to_vector({ 663 s_on.next(350, "foo"), 664 s_on.next(400, "bar"), 665 s_on.next(450, "baz"), 666 s_on.next(450, "foo"), 667 s_on.next(500, "qux"), 668 s_on.next(500, "bar"), 669 s_on.next(550, "baz"), 670 s_on.next(550, "foo"), 671 s_on.next(600, "qux"), 672 s_on.next(600, "bar"), 673 s_on.next(650, "baz"), 674 s_on.next(650, "foo"), 675 s_on.next(700, "qux"), 676 s_on.next(700, "bar"), 677 s_on.next(750, "baz"), 678 s_on.next(800, "qux"), 679 s_on.completed(850) 680 }); 681 auto actual = res.get_observer().messages(); 682 REQUIRE(required == actual); 683 } 684 685 THEN("there was one subscription and one unsubscription to the ints"){ 686 auto required = rxu::to_vector({ 687 i_on.subscribe(200, 700) 688 }); 689 auto actual = xs.subscriptions(); 690 REQUIRE(required == actual); 691 } 692 693 THEN("there were four subscription and unsubscription to the strings"){ 694 auto required = rxu::to_vector({ 695 s_on.subscribe(300, 550), 696 s_on.subscribe(400, 650), 697 s_on.subscribe(500, 750), 698 s_on.subscribe(600, 850) 699 }); 700 auto actual = ys.subscriptions(); 701 REQUIRE(required == actual); 702 } 703 } 704 } 705 } 706 707 SCENARIO("merge_transform, no result selector, with coordination", "[merge_transform][transform][map][operators]"){ 708 GIVEN("two cold observables. one of ints. one of strings."){ 709 auto sc = rxsc::make_test(); 710 auto w = sc.create_worker(); 711 const rxsc::test::messages<int> i_on; 712 const rxsc::test::messages<std::string> s_on; 713 714 auto xs = sc.make_cold_observable({ 715 i_on.next(100, 4), 716 i_on.next(200, 2), 717 i_on.next(300, 3), 718 i_on.next(400, 1), 719 i_on.completed(500) 720 }); 721 722 auto ys = sc.make_cold_observable({ 723 s_on.next(50, "foo"), 724 s_on.next(100, "bar"), 725 s_on.next(150, "baz"), 726 s_on.next(200, "qux"), 727 s_on.completed(250) 728 }); 729 730 WHEN("each int is mapped to the strings"){ 731 732 auto res = w.start( __anon63d55a352b02() 733 [&]() { 734 return xs 735 .merge_transform( 736 [&](int){ 737 return ys;}, 738 rx::identity_current_thread()) 739 // forget type to workaround lambda deduction bug on msvc 2013 740 .as_dynamic(); 741 } 742 ); 743 744 THEN("the output contains strings repeated for each int"){ 745 auto required = rxu::to_vector({ 746 s_on.next(350, "foo"), 747 s_on.next(400, "bar"), 748 s_on.next(450, "baz"), 749 s_on.next(450, "foo"), 750 s_on.next(500, "qux"), 751 s_on.next(500, "bar"), 752 s_on.next(550, "baz"), 753 s_on.next(550, "foo"), 754 s_on.next(600, "qux"), 755 s_on.next(600, "bar"), 756 s_on.next(650, "baz"), 757 s_on.next(650, "foo"), 758 s_on.next(700, "qux"), 759 s_on.next(700, "bar"), 760 s_on.next(750, "baz"), 761 s_on.next(800, "qux"), 762 s_on.completed(850) 763 }); 764 auto actual = res.get_observer().messages(); 765 REQUIRE(required == actual); 766 } 767 768 THEN("there was one subscription and one unsubscription to the ints"){ 769 auto required = rxu::to_vector({ 770 i_on.subscribe(200, 700) 771 }); 772 auto actual = xs.subscriptions(); 773 REQUIRE(required == actual); 774 } 775 776 THEN("there were four subscription and unsubscription to the strings"){ 777 auto required = rxu::to_vector({ 778 s_on.subscribe(300, 550), 779 s_on.subscribe(400, 650), 780 s_on.subscribe(500, 750), 781 s_on.subscribe(600, 850) 782 }); 783 auto actual = ys.subscriptions(); 784 REQUIRE(required == actual); 785 } 786 } 787 } 788 } 789