1 #include "../test.h" 2 #include <rxcpp/operators/rx-zip.hpp> 3 4 SCENARIO("zip never/never", "[zip][join][operators]"){ 5 GIVEN("2 hot observables of ints."){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto n1 = sc.make_hot_observable({ 11 on.next(150, 1) 12 }); 13 14 auto n2 = sc.make_hot_observable({ 15 on.next(150, 1) 16 }); 17 18 WHEN("each int is combined with the latest from the other source"){ 19 20 auto res = w.start( __anonc36241a60102() 21 [&]() { 22 return n1 23 | rxo::zip( 24 [](int v2, int v1){ 25 return v2 + v1; 26 }, 27 n2 28 ) 29 // forget type to workaround lambda deduction bug on msvc 2013 30 | rxo::as_dynamic(); 31 } 32 ); 33 34 THEN("the output is empty"){ 35 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 36 auto actual = res.get_observer().messages(); 37 REQUIRE(required == actual); 38 } 39 40 THEN("there was one subscription and one unsubscription to the n1"){ 41 auto required = rxu::to_vector({ 42 on.subscribe(200, 1000) 43 }); 44 auto actual = n1.subscriptions(); 45 REQUIRE(required == actual); 46 } 47 48 THEN("there was one subscription and one unsubscription to the n2"){ 49 auto required = rxu::to_vector({ 50 on.subscribe(200, 1000) 51 }); 52 auto actual = n2.subscriptions(); 53 REQUIRE(required == actual); 54 } 55 } 56 } 57 } 58 59 SCENARIO("zip never N", "[zip][join][operators]"){ 60 GIVEN("N never completed hot observables of ints."){ 61 auto sc = rxsc::make_test(); 62 auto w = sc.create_worker(); 63 const rxsc::test::messages<int> on; 64 65 const std::size_t N = 4; 66 67 std::vector<rxcpp::test::testable_observable<int>> n; 68 for (std::size_t i = 0; i < N; ++i) { 69 n.push_back( 70 sc.make_hot_observable({ 71 on.next(150, 1) 72 }) 73 ); 74 } 75 76 WHEN("each int is combined with the latest from the other source"){ 77 78 auto res = w.start( __anonc36241a60302() 79 [&]() { 80 return n[0] 81 | rxo::zip( 82 [](int v0, int v1, int v2, int v3){ 83 return v0 + v1 + v2 + v3; 84 }, 85 n[1], n[2], n[3] 86 ) 87 // forget type to workaround lambda deduction bug on msvc 2013 88 | rxo::as_dynamic(); 89 } 90 ); 91 92 THEN("the output is empty"){ 93 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 94 auto actual = res.get_observer().messages(); 95 REQUIRE(required == actual); 96 } 97 98 THEN("there was one subscription and one unsubscription to each observable"){ 99 __anonc36241a60502(rxcpp::test::testable_observable<int> &s)100 std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){ 101 auto required = rxu::to_vector({ 102 on.subscribe(200, 1000) 103 }); 104 auto actual = s.subscriptions(); 105 REQUIRE(required == actual); 106 }); 107 } 108 } 109 } 110 } 111 112 SCENARIO("zip never/empty", "[zip][join][operators]"){ 113 GIVEN("2 hot observables of ints."){ 114 auto sc = rxsc::make_test(); 115 auto w = sc.create_worker(); 116 const rxsc::test::messages<int> on; 117 118 auto n = sc.make_hot_observable({ 119 on.next(150, 1) 120 }); 121 122 auto e = sc.make_hot_observable({ 123 on.next(150, 1), 124 on.completed(210) 125 }); 126 127 WHEN("each int is combined with the latest from the other source"){ 128 129 auto res = w.start( __anonc36241a60602() 130 [&]() { 131 return n 132 .zip( 133 [](int v2, int v1){ 134 return v2 + v1; 135 }, 136 e 137 ) 138 // forget type to workaround lambda deduction bug on msvc 2013 139 .as_dynamic(); 140 } 141 ); 142 143 THEN("the output is empty"){ 144 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 145 auto actual = res.get_observer().messages(); 146 REQUIRE(required == actual); 147 } 148 149 THEN("there was one subscription and one unsubscription to the n"){ 150 auto required = rxu::to_vector({ 151 on.subscribe(200, 1000) 152 }); 153 auto actual = n.subscriptions(); 154 REQUIRE(required == actual); 155 } 156 157 THEN("there was one subscription and one unsubscription to the e"){ 158 auto required = rxu::to_vector({ 159 on.subscribe(200, 210) 160 }); 161 auto actual = e.subscriptions(); 162 REQUIRE(required == actual); 163 } 164 } 165 } 166 } 167 168 SCENARIO("zip empty/never", "[zip][join][operators]"){ 169 GIVEN("2 hot observables of ints."){ 170 auto sc = rxsc::make_test(); 171 auto w = sc.create_worker(); 172 const rxsc::test::messages<int> on; 173 174 auto e = sc.make_hot_observable({ 175 on.next(150, 1), 176 on.completed(210) 177 }); 178 179 auto n = sc.make_hot_observable({ 180 on.next(150, 1) 181 }); 182 183 WHEN("each int is combined with the latest from the other source"){ 184 185 auto res = w.start( __anonc36241a60802() 186 [&]() { 187 return e 188 .zip( 189 [](int v2, int v1){ 190 return v2 + v1; 191 }, 192 n 193 ) 194 // forget type to workaround lambda deduction bug on msvc 2013 195 .as_dynamic(); 196 } 197 ); 198 199 THEN("the output is empty"){ 200 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 201 auto actual = res.get_observer().messages(); 202 REQUIRE(required == actual); 203 } 204 205 THEN("there was one subscription and one unsubscription to the e"){ 206 auto required = rxu::to_vector({ 207 on.subscribe(200, 210) 208 }); 209 auto actual = e.subscriptions(); 210 REQUIRE(required == actual); 211 } 212 213 THEN("there was one subscription and one unsubscription to the n"){ 214 auto required = rxu::to_vector({ 215 on.subscribe(200, 1000) 216 }); 217 auto actual = n.subscriptions(); 218 REQUIRE(required == actual); 219 } 220 } 221 } 222 } 223 224 SCENARIO("zip empty/empty", "[zip][join][operators]"){ 225 GIVEN("2 hot observables of ints."){ 226 auto sc = rxsc::make_test(); 227 auto w = sc.create_worker(); 228 const rxsc::test::messages<int> on; 229 230 auto e1 = sc.make_hot_observable({ 231 on.next(150, 1), 232 on.completed(210) 233 }); 234 235 auto e2 = sc.make_hot_observable({ 236 on.next(150, 1), 237 on.completed(210) 238 }); 239 240 WHEN("each int is combined with the latest from the other source"){ 241 242 auto res = w.start( __anonc36241a60a02() 243 [&]() { 244 return e1 245 .zip( 246 [](int v2, int v1){ 247 return v2 + v1; 248 }, 249 e2 250 ) 251 // forget type to workaround lambda deduction bug on msvc 2013 252 .as_dynamic(); 253 } 254 ); 255 256 THEN("the output contains only complete message"){ 257 auto required = rxu::to_vector({ 258 on.completed(210) 259 }); 260 auto actual = res.get_observer().messages(); 261 REQUIRE(required == actual); 262 } 263 264 THEN("there was one subscription and one unsubscription to the e"){ 265 auto required = rxu::to_vector({ 266 on.subscribe(200, 210) 267 }); 268 auto actual = e1.subscriptions(); 269 REQUIRE(required == actual); 270 } 271 272 THEN("there was one subscription and one unsubscription to the n"){ 273 auto required = rxu::to_vector({ 274 on.subscribe(200, 210) 275 }); 276 auto actual = e2.subscriptions(); 277 REQUIRE(required == actual); 278 } 279 } 280 } 281 } 282 283 SCENARIO("zip empty N", "[zip][join][operators]"){ 284 GIVEN("N empty hot observables of ints."){ 285 auto sc = rxsc::make_test(); 286 auto w = sc.create_worker(); 287 const rxsc::test::messages<int> on; 288 289 const int N = 4; 290 291 std::vector<rxcpp::test::testable_observable<int>> e; 292 for (int i = 0; i < N; ++i) { 293 e.push_back( 294 sc.make_hot_observable({ 295 on.next(150, 1), 296 on.completed(210 + 10 * i) 297 }) 298 ); 299 } 300 301 WHEN("each int is combined with the latest from the other source"){ 302 303 auto res = w.start( __anonc36241a60c02() 304 [&]() { 305 return e[0] 306 .zip( 307 [](int v0, int v1, int v2, int v3){ 308 return v0 + v1 + v2 + v3; 309 }, 310 e[1], e[2], e[3] 311 ) 312 // forget type to workaround lambda deduction bug on msvc 2013 313 .as_dynamic(); 314 } 315 ); 316 317 THEN("the output contains only complete message"){ 318 auto required = rxu::to_vector({ 319 on.completed(200 + 10 * N) 320 }); 321 auto actual = res.get_observer().messages(); 322 REQUIRE(required == actual); 323 } 324 325 THEN("there was one subscription and one unsubscription to each observable"){ 326 327 int i = 0; __anonc36241a60e02(rxcpp::test::testable_observable<int> &s)328 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 329 auto required = rxu::to_vector({ 330 on.subscribe(200, 200 + 10 * ++i) 331 }); 332 auto actual = s.subscriptions(); 333 REQUIRE(required == actual); 334 }); 335 } 336 } 337 } 338 } 339 340 SCENARIO("zip empty/return", "[zip][join][operators]"){ 341 GIVEN("2 hot observables of ints."){ 342 auto sc = rxsc::make_test(); 343 auto w = sc.create_worker(); 344 const rxsc::test::messages<int> on; 345 346 auto e = sc.make_hot_observable({ 347 on.next(150, 1), 348 on.completed(210) 349 }); 350 351 auto o = sc.make_hot_observable({ 352 on.next(150, 1), 353 on.next(215, 2), 354 on.completed(220) 355 }); 356 357 WHEN("each int is combined with the latest from the other source"){ 358 359 auto res = w.start( __anonc36241a60f02() 360 [&]() { 361 return e 362 .zip( 363 [](int v2, int v1){ 364 return v2 + v1; 365 }, 366 o 367 ) 368 // forget type to workaround lambda deduction bug on msvc 2013 369 .as_dynamic(); 370 } 371 ); 372 373 THEN("the output contains only complete message"){ 374 auto required = rxu::to_vector({ 375 on.completed(215) 376 }); 377 auto actual = res.get_observer().messages(); 378 REQUIRE(required == actual); 379 } 380 381 THEN("there was one subscription and one unsubscription to the e"){ 382 auto required = rxu::to_vector({ 383 on.subscribe(200, 210) 384 }); 385 auto actual = e.subscriptions(); 386 REQUIRE(required == actual); 387 } 388 389 THEN("there was one subscription and one unsubscription to the o"){ 390 auto required = rxu::to_vector({ 391 on.subscribe(200, 215) 392 }); 393 auto actual = o.subscriptions(); 394 REQUIRE(required == actual); 395 } 396 } 397 } 398 } 399 400 SCENARIO("zip return/empty", "[zip][join][operators]"){ 401 GIVEN("2 hot observables of ints."){ 402 auto sc = rxsc::make_test(); 403 auto w = sc.create_worker(); 404 const rxsc::test::messages<int> on; 405 406 auto o = sc.make_hot_observable({ 407 on.next(150, 1), 408 on.next(215, 2), 409 on.completed(220) 410 }); 411 412 auto e = sc.make_hot_observable({ 413 on.next(150, 1), 414 on.completed(210) 415 }); 416 417 WHEN("each int is combined with the latest from the other source"){ 418 419 auto res = w.start( __anonc36241a61102() 420 [&]() { 421 return o 422 .zip( 423 [](int v2, int v1){ 424 return v2 + v1; 425 }, 426 e 427 ) 428 // forget type to workaround lambda deduction bug on msvc 2013 429 .as_dynamic(); 430 } 431 ); 432 433 THEN("the output contains only complete message"){ 434 auto required = rxu::to_vector({ 435 on.completed(215) 436 }); 437 auto actual = res.get_observer().messages(); 438 REQUIRE(required == actual); 439 } 440 441 THEN("there was one subscription and one unsubscription to the o"){ 442 auto required = rxu::to_vector({ 443 on.subscribe(200, 215) 444 }); 445 auto actual = o.subscriptions(); 446 REQUIRE(required == actual); 447 } 448 449 THEN("there was one subscription and one unsubscription to the e"){ 450 auto required = rxu::to_vector({ 451 on.subscribe(200, 210) 452 }); 453 auto actual = e.subscriptions(); 454 REQUIRE(required == actual); 455 } 456 } 457 } 458 } 459 460 SCENARIO("zip never/return", "[zip][join][operators]"){ 461 GIVEN("2 hot observables of ints."){ 462 auto sc = rxsc::make_test(); 463 auto w = sc.create_worker(); 464 const rxsc::test::messages<int> on; 465 466 auto n = sc.make_hot_observable({ 467 on.next(150, 1) 468 }); 469 470 auto o = sc.make_hot_observable({ 471 on.next(150, 1), 472 on.next(215, 2), 473 on.completed(220) 474 }); 475 476 WHEN("each int is combined with the latest from the other source"){ 477 478 auto res = w.start( __anonc36241a61302() 479 [&]() { 480 return n 481 .zip( 482 [](int v2, int v1){ 483 return v2 + v1; 484 }, 485 o 486 ) 487 // forget type to workaround lambda deduction bug on msvc 2013 488 .as_dynamic(); 489 } 490 ); 491 492 THEN("the output is empty"){ 493 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 494 auto actual = res.get_observer().messages(); 495 REQUIRE(required == actual); 496 } 497 498 THEN("there was one subscription and one unsubscription to the n"){ 499 auto required = rxu::to_vector({ 500 on.subscribe(200, 1000) 501 }); 502 auto actual = n.subscriptions(); 503 REQUIRE(required == actual); 504 } 505 506 THEN("there was one subscription and one unsubscription to the o"){ 507 auto required = rxu::to_vector({ 508 on.subscribe(200, 220) 509 }); 510 auto actual = o.subscriptions(); 511 REQUIRE(required == actual); 512 } 513 } 514 } 515 } 516 517 SCENARIO("zip return/never", "[zip][join][operators]"){ 518 GIVEN("2 hot observables of ints."){ 519 auto sc = rxsc::make_test(); 520 auto w = sc.create_worker(); 521 const rxsc::test::messages<int> on; 522 523 auto o = sc.make_hot_observable({ 524 on.next(150, 1), 525 on.next(215, 2), 526 on.completed(220) 527 }); 528 529 auto n = sc.make_hot_observable({ 530 on.next(150, 1) 531 }); 532 533 WHEN("each int is combined with the latest from the other source"){ 534 535 auto res = w.start( __anonc36241a61502() 536 [&]() { 537 return o 538 .zip( 539 [](int v2, int v1){ 540 return v2 + v1; 541 }, 542 n 543 ) 544 // forget type to workaround lambda deduction bug on msvc 2013 545 .as_dynamic(); 546 } 547 ); 548 549 THEN("the output is empty"){ 550 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 551 auto actual = res.get_observer().messages(); 552 REQUIRE(required == actual); 553 } 554 555 THEN("there was one subscription and one unsubscription to the n"){ 556 auto required = rxu::to_vector({ 557 on.subscribe(200, 1000) 558 }); 559 auto actual = n.subscriptions(); 560 REQUIRE(required == actual); 561 } 562 563 THEN("there was one subscription and one unsubscription to the o"){ 564 auto required = rxu::to_vector({ 565 on.subscribe(200, 220) 566 }); 567 auto actual = o.subscriptions(); 568 REQUIRE(required == actual); 569 } 570 } 571 } 572 } 573 574 SCENARIO("zip return/return", "[zip][join][operators]"){ 575 GIVEN("2 hot observables of ints."){ 576 auto sc = rxsc::make_test(); 577 auto w = sc.create_worker(); 578 const rxsc::test::messages<int> on; 579 580 auto o1 = sc.make_hot_observable({ 581 on.next(150, 1), 582 on.next(215, 2), 583 on.completed(230) 584 }); 585 586 auto o2 = sc.make_hot_observable({ 587 on.next(150, 1), 588 on.next(220, 3), 589 on.completed(240) 590 }); 591 592 WHEN("each int is combined with the latest from the other source"){ 593 594 auto res = w.start( __anonc36241a61702() 595 [&]() { 596 return o1 597 .zip( 598 [](int v2, int v1){ 599 return v2 + v1; 600 }, 601 o2 602 ) 603 // forget type to workaround lambda deduction bug on msvc 2013 604 .as_dynamic(); 605 } 606 ); 607 608 THEN("the output contains combined ints"){ 609 auto required = rxu::to_vector({ 610 on.next(220, 2 + 3), 611 on.completed(240) 612 }); 613 auto actual = res.get_observer().messages(); 614 REQUIRE(required == actual); 615 } 616 617 THEN("there was one subscription and one unsubscription to the o1"){ 618 auto required = rxu::to_vector({ 619 on.subscribe(200, 230) 620 }); 621 auto actual = o1.subscriptions(); 622 REQUIRE(required == actual); 623 } 624 625 THEN("there was one subscription and one unsubscription to the o2"){ 626 auto required = rxu::to_vector({ 627 on.subscribe(200, 240) 628 }); 629 auto actual = o2.subscriptions(); 630 REQUIRE(required == actual); 631 } 632 } 633 } 634 } 635 636 SCENARIO("zip empty/error", "[zip][join][operators]"){ 637 GIVEN("2 hot observables of ints."){ 638 auto sc = rxsc::make_test(); 639 auto w = sc.create_worker(); 640 const rxsc::test::messages<int> on; 641 642 std::runtime_error ex("zip on_error from source"); 643 644 auto emp = sc.make_hot_observable({ 645 on.next(150, 1), 646 on.completed(230) 647 }); 648 649 auto err = sc.make_hot_observable({ 650 on.next(150, 1), 651 on.error(220, ex) 652 }); 653 654 WHEN("each int is combined with the latest from the other source"){ 655 656 auto res = w.start( __anonc36241a61902() 657 [&]() { 658 return emp 659 .zip( 660 [](int v2, int v1){ 661 return v2 + v1; 662 }, 663 err 664 ) 665 // forget type to workaround lambda deduction bug on msvc 2013 666 .as_dynamic(); 667 } 668 ); 669 670 THEN("the output contains only error message"){ 671 auto required = rxu::to_vector({ 672 on.error(220, ex) 673 }); 674 auto actual = res.get_observer().messages(); 675 REQUIRE(required == actual); 676 } 677 678 THEN("there was one subscription and one unsubscription to the emp"){ 679 auto required = rxu::to_vector({ 680 on.subscribe(200, 220) 681 }); 682 auto actual = emp.subscriptions(); 683 REQUIRE(required == actual); 684 } 685 686 THEN("there was one subscription and one unsubscription to the err"){ 687 auto required = rxu::to_vector({ 688 on.subscribe(200, 220) 689 }); 690 auto actual = err.subscriptions(); 691 REQUIRE(required == actual); 692 } 693 } 694 } 695 } 696 697 SCENARIO("zip error/empty", "[zip][join][operators]"){ 698 GIVEN("2 hot observables of ints."){ 699 auto sc = rxsc::make_test(); 700 auto w = sc.create_worker(); 701 const rxsc::test::messages<int> on; 702 703 std::runtime_error ex("zip on_error from source"); 704 705 auto err = sc.make_hot_observable({ 706 on.next(150, 1), 707 on.error(220, ex) 708 }); 709 710 auto emp = sc.make_hot_observable({ 711 on.next(150, 1), 712 on.completed(230) 713 }); 714 715 WHEN("each int is combined with the latest from the other source"){ 716 717 auto res = w.start( __anonc36241a61b02() 718 [&]() { 719 return err 720 .zip( 721 [](int v2, int v1){ 722 return v2 + v1; 723 }, 724 emp 725 ) 726 // forget type to workaround lambda deduction bug on msvc 2013 727 .as_dynamic(); 728 } 729 ); 730 731 THEN("the output contains only error message"){ 732 auto required = rxu::to_vector({ 733 on.error(220, ex) 734 }); 735 auto actual = res.get_observer().messages(); 736 REQUIRE(required == actual); 737 } 738 739 THEN("there was one subscription and one unsubscription to the emp"){ 740 auto required = rxu::to_vector({ 741 on.subscribe(200, 220) 742 }); 743 auto actual = emp.subscriptions(); 744 REQUIRE(required == actual); 745 } 746 747 THEN("there was one subscription and one unsubscription to the err"){ 748 auto required = rxu::to_vector({ 749 on.subscribe(200, 220) 750 }); 751 auto actual = err.subscriptions(); 752 REQUIRE(required == actual); 753 } 754 } 755 } 756 } 757 758 SCENARIO("zip never/error", "[zip][join][operators]"){ 759 GIVEN("2 hot observables of ints."){ 760 auto sc = rxsc::make_test(); 761 auto w = sc.create_worker(); 762 const rxsc::test::messages<int> on; 763 764 std::runtime_error ex("zip on_error from source"); 765 766 auto n = sc.make_hot_observable({ 767 on.next(150, 1) 768 }); 769 770 auto err = sc.make_hot_observable({ 771 on.next(150, 1), 772 on.error(220, ex) 773 }); 774 775 WHEN("each int is combined with the latest from the other source"){ 776 777 auto res = w.start( __anonc36241a61d02() 778 [&]() { 779 return n 780 .zip( 781 [](int v2, int v1){ 782 return v2 + v1; 783 }, 784 err 785 ) 786 // forget type to workaround lambda deduction bug on msvc 2013 787 .as_dynamic(); 788 } 789 ); 790 791 THEN("the output contains only error message"){ 792 auto required = rxu::to_vector({ 793 on.error(220, ex) 794 }); 795 auto actual = res.get_observer().messages(); 796 REQUIRE(required == actual); 797 } 798 799 THEN("there was one subscription and one unsubscription to the n"){ 800 auto required = rxu::to_vector({ 801 on.subscribe(200, 220) 802 }); 803 auto actual = n.subscriptions(); 804 REQUIRE(required == actual); 805 } 806 807 THEN("there was one subscription and one unsubscription to the err"){ 808 auto required = rxu::to_vector({ 809 on.subscribe(200, 220) 810 }); 811 auto actual = err.subscriptions(); 812 REQUIRE(required == actual); 813 } 814 } 815 } 816 } 817 818 SCENARIO("zip error/never", "[zip][join][operators]"){ 819 GIVEN("2 hot observables of ints."){ 820 auto sc = rxsc::make_test(); 821 auto w = sc.create_worker(); 822 const rxsc::test::messages<int> on; 823 824 std::runtime_error ex("zip on_error from source"); 825 826 auto err = sc.make_hot_observable({ 827 on.next(150, 1), 828 on.error(220, ex) 829 }); 830 831 auto n = sc.make_hot_observable({ 832 on.next(150, 1) 833 }); 834 835 WHEN("each int is combined with the latest from the other source"){ 836 837 auto res = w.start( __anonc36241a61f02() 838 [&]() { 839 return err 840 .zip( 841 [](int v2, int v1){ 842 return v2 + v1; 843 }, 844 n 845 ) 846 // forget type to workaround lambda deduction bug on msvc 2013 847 .as_dynamic(); 848 } 849 ); 850 851 THEN("the output contains only error message"){ 852 auto required = rxu::to_vector({ 853 on.error(220, ex) 854 }); 855 auto actual = res.get_observer().messages(); 856 REQUIRE(required == actual); 857 } 858 859 THEN("there was one subscription and one unsubscription to the n"){ 860 auto required = rxu::to_vector({ 861 on.subscribe(200, 220) 862 }); 863 auto actual = n.subscriptions(); 864 REQUIRE(required == actual); 865 } 866 867 THEN("there was one subscription and one unsubscription to the err"){ 868 auto required = rxu::to_vector({ 869 on.subscribe(200, 220) 870 }); 871 auto actual = err.subscriptions(); 872 REQUIRE(required == actual); 873 } 874 } 875 } 876 } 877 878 SCENARIO("zip error/error", "[zip][join][operators]"){ 879 GIVEN("2 hot observables of ints."){ 880 auto sc = rxsc::make_test(); 881 auto w = sc.create_worker(); 882 const rxsc::test::messages<int> on; 883 884 std::runtime_error ex1("zip on_error from source 1"); 885 std::runtime_error ex2("zip on_error from source 2"); 886 887 auto err1 = sc.make_hot_observable({ 888 on.next(150, 1), 889 on.error(220, ex1) 890 }); 891 892 auto err2 = sc.make_hot_observable({ 893 on.next(150, 1), 894 on.error(230, ex2) 895 }); 896 897 WHEN("each int is combined with the latest from the other source"){ 898 899 auto res = w.start( __anonc36241a62102() 900 [&]() { 901 return err1 902 .zip( 903 [](int v2, int v1){ 904 return v2 + v1; 905 }, 906 err2 907 ) 908 // forget type to workaround lambda deduction bug on msvc 2013 909 .as_dynamic(); 910 } 911 ); 912 913 THEN("the output contains only error message"){ 914 auto required = rxu::to_vector({ 915 on.error(220, ex1) 916 }); 917 auto actual = res.get_observer().messages(); 918 REQUIRE(required == actual); 919 } 920 921 THEN("there was one subscription and one unsubscription to the err1"){ 922 auto required = rxu::to_vector({ 923 on.subscribe(200, 220) 924 }); 925 auto actual = err1.subscriptions(); 926 REQUIRE(required == actual); 927 } 928 929 THEN("there was one subscription and one unsubscription to the err2"){ 930 auto required = rxu::to_vector({ 931 on.subscribe(200, 220) 932 }); 933 auto actual = err2.subscriptions(); 934 REQUIRE(required == actual); 935 } 936 } 937 } 938 } 939 940 SCENARIO("zip return/error", "[zip][join][operators]"){ 941 GIVEN("2 hot observables of ints."){ 942 auto sc = rxsc::make_test(); 943 auto w = sc.create_worker(); 944 const rxsc::test::messages<int> on; 945 946 std::runtime_error ex("zip on_error from source"); 947 948 auto o = sc.make_hot_observable({ 949 on.next(150, 1), 950 on.next(210, 2), 951 on.completed(230) 952 }); 953 954 auto err = sc.make_hot_observable({ 955 on.next(150, 1), 956 on.error(220, ex) 957 }); 958 959 WHEN("each int is combined with the latest from the other source"){ 960 961 auto res = w.start( __anonc36241a62302() 962 [&]() { 963 return o 964 .zip( 965 [](int v2, int v1){ 966 return v2 + v1; 967 }, 968 err 969 ) 970 // forget type to workaround lambda deduction bug on msvc 2013 971 .as_dynamic(); 972 } 973 ); 974 975 THEN("the output contains only error message"){ 976 auto required = rxu::to_vector({ 977 on.error(220, ex) 978 }); 979 auto actual = res.get_observer().messages(); 980 REQUIRE(required == actual); 981 } 982 983 THEN("there was one subscription and one unsubscription to the ret"){ 984 auto required = rxu::to_vector({ 985 on.subscribe(200, 220) 986 }); 987 auto actual = o.subscriptions(); 988 REQUIRE(required == actual); 989 } 990 991 THEN("there was one subscription and one unsubscription to the err"){ 992 auto required = rxu::to_vector({ 993 on.subscribe(200, 220) 994 }); 995 auto actual = err.subscriptions(); 996 REQUIRE(required == actual); 997 } 998 } 999 } 1000 } 1001 1002 SCENARIO("zip error/return", "[zip][join][operators]"){ 1003 GIVEN("2 hot observables of ints."){ 1004 auto sc = rxsc::make_test(); 1005 auto w = sc.create_worker(); 1006 const rxsc::test::messages<int> on; 1007 1008 std::runtime_error ex("zip on_error from source"); 1009 1010 auto err = sc.make_hot_observable({ 1011 on.next(150, 1), 1012 on.error(220, ex) 1013 }); 1014 1015 auto ret = sc.make_hot_observable({ 1016 on.next(150, 1), 1017 on.next(210, 2), 1018 on.completed(230) 1019 }); 1020 1021 WHEN("each int is combined with the latest from the other source"){ 1022 1023 auto res = w.start( __anonc36241a62502() 1024 [&]() { 1025 return err 1026 .zip( 1027 [](int v2, int v1){ 1028 return v2 + v1; 1029 }, 1030 ret 1031 ) 1032 // forget type to workaround lambda deduction bug on msvc 2013 1033 .as_dynamic(); 1034 } 1035 ); 1036 1037 THEN("the output contains only error message"){ 1038 auto required = rxu::to_vector({ 1039 on.error(220, ex) 1040 }); 1041 auto actual = res.get_observer().messages(); 1042 REQUIRE(required == actual); 1043 } 1044 1045 THEN("there was one subscription and one unsubscription to the ret"){ 1046 auto required = rxu::to_vector({ 1047 on.subscribe(200, 220) 1048 }); 1049 auto actual = ret.subscriptions(); 1050 REQUIRE(required == actual); 1051 } 1052 1053 THEN("there was one subscription and one unsubscription to the err"){ 1054 auto required = rxu::to_vector({ 1055 on.subscribe(200, 220) 1056 }); 1057 auto actual = err.subscriptions(); 1058 REQUIRE(required == actual); 1059 } 1060 } 1061 } 1062 } 1063 1064 SCENARIO("zip left completes first", "[zip][join][operators]"){ 1065 GIVEN("2 hot observables of ints."){ 1066 auto sc = rxsc::make_test(); 1067 auto w = sc.create_worker(); 1068 const rxsc::test::messages<int> on; 1069 1070 auto o1 = sc.make_hot_observable({ 1071 on.next(150, 1), 1072 on.next(210, 2), 1073 on.completed(220) 1074 }); 1075 1076 auto o2 = sc.make_hot_observable({ 1077 on.next(150, 1), 1078 on.next(215, 4), 1079 on.completed(225) 1080 }); 1081 1082 WHEN("each int is combined with the latest from the other source"){ 1083 1084 auto res = w.start( __anonc36241a62702() 1085 [&]() { 1086 return o2 1087 .zip( 1088 [](int v2, int v1){ 1089 return v2 + v1; 1090 }, 1091 o1 1092 ) 1093 // forget type to workaround lambda deduction bug on msvc 2013 1094 .as_dynamic(); 1095 } 1096 ); 1097 1098 THEN("the output contains combined ints"){ 1099 auto required = rxu::to_vector({ 1100 on.next(215, 2 + 4), 1101 on.completed(225) 1102 }); 1103 auto actual = res.get_observer().messages(); 1104 REQUIRE(required == actual); 1105 } 1106 1107 THEN("there was one subscription and one unsubscription to the o1"){ 1108 auto required = rxu::to_vector({ 1109 on.subscribe(200, 220) 1110 }); 1111 auto actual = o1.subscriptions(); 1112 REQUIRE(required == actual); 1113 } 1114 1115 THEN("there was one subscription and one unsubscription to the o2"){ 1116 auto required = rxu::to_vector({ 1117 on.subscribe(200, 225) 1118 }); 1119 auto actual = o2.subscriptions(); 1120 REQUIRE(required == actual); 1121 } 1122 } 1123 } 1124 } 1125 1126 SCENARIO("zip right completes first", "[zip][join][operators]"){ 1127 GIVEN("2 hot observables of ints."){ 1128 auto sc = rxsc::make_test(); 1129 auto w = sc.create_worker(); 1130 const rxsc::test::messages<int> on; 1131 1132 auto o1 = sc.make_hot_observable({ 1133 on.next(150, 1), 1134 on.next(215, 4), 1135 on.completed(225) 1136 }); 1137 1138 auto o2 = sc.make_hot_observable({ 1139 on.next(150, 1), 1140 on.next(210, 2), 1141 on.completed(220) 1142 }); 1143 1144 WHEN("each int is combined with the latest from the other source"){ 1145 1146 auto res = w.start( __anonc36241a62902() 1147 [&]() { 1148 return o2 1149 .zip( 1150 [](int v2, int v1){ 1151 return v2 + v1; 1152 }, 1153 o1 1154 ) 1155 // forget type to workaround lambda deduction bug on msvc 2013 1156 .as_dynamic(); 1157 } 1158 ); 1159 1160 THEN("the output contains combined ints"){ 1161 auto required = rxu::to_vector({ 1162 on.next(215, 2 + 4), 1163 on.completed(225) 1164 }); 1165 auto actual = res.get_observer().messages(); 1166 REQUIRE(required == actual); 1167 } 1168 1169 THEN("there was one subscription and one unsubscription to the o1"){ 1170 auto required = rxu::to_vector({ 1171 on.subscribe(200, 225) 1172 }); 1173 auto actual = o1.subscriptions(); 1174 REQUIRE(required == actual); 1175 } 1176 1177 THEN("there was one subscription and one unsubscription to the o2"){ 1178 auto required = rxu::to_vector({ 1179 on.subscribe(200, 220) 1180 }); 1181 auto actual = o2.subscriptions(); 1182 REQUIRE(required == actual); 1183 } 1184 } 1185 } 1186 } 1187 1188 SCENARIO("zip selector throws", "[zip][join][operators][!throws]"){ 1189 GIVEN("2 hot observables of ints."){ 1190 auto sc = rxsc::make_test(); 1191 auto w = sc.create_worker(); 1192 const rxsc::test::messages<int> on; 1193 1194 std::runtime_error ex("zip on_error from source"); 1195 1196 auto o1 = sc.make_hot_observable({ 1197 on.next(150, 1), 1198 on.next(215, 2), 1199 on.completed(230) 1200 }); 1201 1202 auto o2 = sc.make_hot_observable({ 1203 on.next(150, 1), 1204 on.next(220, 3), 1205 on.completed(240) 1206 }); 1207 1208 WHEN("each int is combined with the latest from the other source"){ 1209 1210 auto res = w.start( __anonc36241a62b02() 1211 [&]() { 1212 return o1 1213 .zip( 1214 [&ex](int, int) -> int { 1215 rxu::throw_exception(ex); 1216 }, 1217 o2 1218 ) 1219 // forget type to workaround lambda deduction bug on msvc 2013 1220 .as_dynamic(); 1221 } 1222 ); 1223 1224 THEN("the output contains only error"){ 1225 auto required = rxu::to_vector({ 1226 on.error(220, ex) 1227 }); 1228 auto actual = res.get_observer().messages(); 1229 REQUIRE(required == actual); 1230 } 1231 1232 THEN("there was one subscription and one unsubscription to the o1"){ 1233 auto required = rxu::to_vector({ 1234 on.subscribe(200, 220) 1235 }); 1236 auto actual = o1.subscriptions(); 1237 REQUIRE(required == actual); 1238 } 1239 1240 THEN("there was one subscription and one unsubscription to the o2"){ 1241 auto required = rxu::to_vector({ 1242 on.subscribe(200, 220) 1243 }); 1244 auto actual = o2.subscriptions(); 1245 REQUIRE(required == actual); 1246 } 1247 } 1248 } 1249 } 1250 1251 SCENARIO("zip selector throws N", "[zip][join][operators][!throws]"){ 1252 GIVEN("N hot observables of ints."){ 1253 auto sc = rxsc::make_test(); 1254 auto w = sc.create_worker(); 1255 const rxsc::test::messages<int> on; 1256 1257 const int N = 4; 1258 1259 std::runtime_error ex("zip on_error from source"); 1260 1261 std::vector<rxcpp::test::testable_observable<int>> e; 1262 for (int i = 0; i < N; ++i) { 1263 e.push_back( 1264 sc.make_hot_observable({ 1265 on.next(210 + 10 * i, 1), 1266 on.completed(500) 1267 }) 1268 ); 1269 } 1270 1271 WHEN("each int is combined with the latest from the other source"){ 1272 1273 auto res = w.start( __anonc36241a62d02() 1274 [&]() { 1275 return e[0] 1276 .zip( 1277 [&ex](int, int, int, int) -> int { 1278 rxu::throw_exception(ex); 1279 }, 1280 e[1], e[2], e[3] 1281 ) 1282 // forget type to workaround lambda deduction bug on msvc 2013 1283 .as_dynamic(); 1284 } 1285 ); 1286 1287 THEN("the output contains only error"){ 1288 auto required = rxu::to_vector({ 1289 on.error(200 + 10 * N, ex) 1290 }); 1291 auto actual = res.get_observer().messages(); 1292 REQUIRE(required == actual); 1293 } 1294 1295 THEN("there was one subscription and one unsubscription to each observable"){ 1296 __anonc36241a62f02(rxcpp::test::testable_observable<int> &s)1297 std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ 1298 auto required = rxu::to_vector({ 1299 on.subscribe(200, 200 + 10 * N) 1300 }); 1301 auto actual = s.subscriptions(); 1302 REQUIRE(required == actual); 1303 }); 1304 } 1305 } 1306 } 1307 } 1308 1309 SCENARIO("zip typical N", "[zip][join][operators]"){ 1310 GIVEN("N hot observables of ints."){ 1311 auto sc = rxsc::make_test(); 1312 auto w = sc.create_worker(); 1313 const rxsc::test::messages<int> on; 1314 1315 const int N = 4; 1316 1317 std::vector<rxcpp::test::testable_observable<int>> o; 1318 for (int i = 0; i < N; ++i) { 1319 o.push_back( 1320 sc.make_hot_observable({ 1321 on.next(150, 1), 1322 on.next(210 + 10 * i, i + 1), 1323 on.next(410 + 10 * i, i + N + 1), 1324 on.completed(800) 1325 }) 1326 ); 1327 } 1328 1329 WHEN("each int is combined with the latest from the other source"){ 1330 1331 auto res = w.start( __anonc36241a63002() 1332 [&]() { 1333 return o[0] 1334 .zip( 1335 [](int v0, int v1, int v2, int v3) { 1336 return v0 + v1 + v2 + v3; 1337 }, 1338 o[1], o[2], o[3] 1339 ) 1340 // forget type to workaround lambda deduction bug on msvc 2013 1341 .as_dynamic(); 1342 } 1343 ); 1344 1345 THEN("the output contains combined ints"){ 1346 auto required = rxu::to_vector({ 1347 on.next(200 + 10 * N, N * (N + 1) / 2), 1348 on.next(400 + 10 * N, N * (3 * N + 1) / 2) 1349 }); 1350 required.push_back(on.completed(800)); 1351 auto actual = res.get_observer().messages(); 1352 REQUIRE(required == actual); 1353 } 1354 1355 THEN("there was one subscription and one unsubscription to each observable"){ 1356 __anonc36241a63202(rxcpp::test::testable_observable<int> &s)1357 std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){ 1358 auto required = rxu::to_vector({ 1359 on.subscribe(200, 800) 1360 }); 1361 auto actual = s.subscriptions(); 1362 REQUIRE(required == actual); 1363 }); 1364 } 1365 } 1366 } 1367 } 1368 1369 SCENARIO("zip interleaved with tail", "[zip][join][operators]"){ 1370 GIVEN("2 hot observables of ints."){ 1371 auto sc = rxsc::make_test(); 1372 auto w = sc.create_worker(); 1373 const rxsc::test::messages<int> on; 1374 1375 auto o1 = sc.make_hot_observable({ 1376 on.next(150, 1), 1377 on.next(215, 2), 1378 on.next(225, 4), 1379 on.completed(230) 1380 }); 1381 1382 auto o2 = sc.make_hot_observable({ 1383 on.next(150, 1), 1384 on.next(220, 3), 1385 on.next(230, 5), 1386 on.next(235, 6), 1387 on.next(240, 7), 1388 on.completed(250) 1389 }); 1390 1391 WHEN("each int is combined with the latest from the other source"){ 1392 1393 auto res = w.start( __anonc36241a63302() 1394 [&]() { 1395 return o2 1396 .zip( 1397 [](int v2, int v1){ 1398 return v2 + v1; 1399 }, 1400 o1 1401 ) 1402 // forget type to workaround lambda deduction bug on msvc 2013 1403 .as_dynamic(); 1404 } 1405 ); 1406 1407 THEN("the output contains combined ints"){ 1408 auto required = rxu::to_vector({ 1409 on.next(220, 2 + 3), 1410 on.next(230, 4 + 5), 1411 on.completed(230) 1412 }); 1413 auto actual = res.get_observer().messages(); 1414 REQUIRE(required == actual); 1415 } 1416 1417 THEN("there was one subscription and one unsubscription to the o1"){ 1418 auto required = rxu::to_vector({ 1419 on.subscribe(200, 230) 1420 }); 1421 auto actual = o1.subscriptions(); 1422 REQUIRE(required == actual); 1423 } 1424 1425 THEN("there was one subscription and one unsubscription to the o2"){ 1426 auto required = rxu::to_vector({ 1427 on.subscribe(200, 230) 1428 }); 1429 auto actual = o2.subscriptions(); 1430 REQUIRE(required == actual); 1431 } 1432 } 1433 } 1434 } 1435 1436 SCENARIO("zip consecutive", "[zip][join][operators]"){ 1437 GIVEN("2 hot observables of ints."){ 1438 auto sc = rxsc::make_test(); 1439 auto w = sc.create_worker(); 1440 const rxsc::test::messages<int> on; 1441 1442 auto o1 = sc.make_hot_observable({ 1443 on.next(150, 1), 1444 on.next(215, 2), 1445 on.next(225, 4), 1446 on.completed(230) 1447 }); 1448 1449 auto o2 = sc.make_hot_observable({ 1450 on.next(150, 1), 1451 on.next(235, 6), 1452 on.next(240, 7), 1453 on.completed(250) 1454 }); 1455 1456 WHEN("each int is combined with the latest from the other source"){ 1457 1458 auto res = w.start( __anonc36241a63502() 1459 [&]() { 1460 return o2 1461 .zip( 1462 [](int v2, int v1){ 1463 return v2 + v1; 1464 }, 1465 o1 1466 ) 1467 // forget type to workaround lambda deduction bug on msvc 2013 1468 .as_dynamic(); 1469 } 1470 ); 1471 1472 THEN("the output contains combined ints"){ 1473 auto required = rxu::to_vector({ 1474 on.next(235, 2 + 6), 1475 on.next(240, 4 + 7), 1476 on.completed(240) 1477 }); 1478 auto actual = res.get_observer().messages(); 1479 REQUIRE(required == actual); 1480 } 1481 1482 THEN("there was one subscription and one unsubscription to the o1"){ 1483 auto required = rxu::to_vector({ 1484 on.subscribe(200, 230) 1485 }); 1486 auto actual = o1.subscriptions(); 1487 REQUIRE(required == actual); 1488 } 1489 1490 THEN("there was one subscription and one unsubscription to the o2"){ 1491 auto required = rxu::to_vector({ 1492 on.subscribe(200, 240) 1493 }); 1494 auto actual = o2.subscriptions(); 1495 REQUIRE(required == actual); 1496 } 1497 } 1498 } 1499 } 1500 1501 SCENARIO("zip consecutive ends with error left", "[zip][join][operators]"){ 1502 GIVEN("2 hot observables of ints."){ 1503 auto sc = rxsc::make_test(); 1504 auto w = sc.create_worker(); 1505 const rxsc::test::messages<int> on; 1506 1507 std::runtime_error ex("zip on_error from source"); 1508 1509 auto o1 = sc.make_hot_observable({ 1510 on.next(150, 1), 1511 on.next(215, 2), 1512 on.next(225, 4), 1513 on.error(230, ex) 1514 }); 1515 1516 auto o2 = sc.make_hot_observable({ 1517 on.next(150, 1), 1518 on.next(235, 6), 1519 on.next(240, 7), 1520 on.completed(250) 1521 }); 1522 1523 WHEN("each int is combined with the latest from the other source"){ 1524 1525 auto res = w.start( __anonc36241a63702() 1526 [&]() { 1527 return o2 1528 .zip( 1529 [](int v2, int v1){ 1530 return v2 + v1; 1531 }, 1532 o1 1533 ) 1534 // forget type to workaround lambda deduction bug on msvc 2013 1535 .as_dynamic(); 1536 } 1537 ); 1538 1539 THEN("the output contains only an error"){ 1540 auto required = rxu::to_vector({ 1541 on.error(230, ex) 1542 }); 1543 auto actual = res.get_observer().messages(); 1544 REQUIRE(required == actual); 1545 } 1546 1547 THEN("there was one subscription and one unsubscription to the o1"){ 1548 auto required = rxu::to_vector({ 1549 on.subscribe(200, 230) 1550 }); 1551 auto actual = o1.subscriptions(); 1552 REQUIRE(required == actual); 1553 } 1554 1555 THEN("there was one subscription and one unsubscription to the o2"){ 1556 auto required = rxu::to_vector({ 1557 on.subscribe(200, 230) 1558 }); 1559 auto actual = o2.subscriptions(); 1560 REQUIRE(required == actual); 1561 } 1562 } 1563 } 1564 } 1565 1566 SCENARIO("zip consecutive ends with error right", "[zip][join][operators]"){ 1567 GIVEN("2 hot observables of ints."){ 1568 auto sc = rxsc::make_test(); 1569 auto w = sc.create_worker(); 1570 const rxsc::test::messages<int> on; 1571 1572 std::runtime_error ex("zip on_error from source"); 1573 1574 auto o1 = sc.make_hot_observable({ 1575 on.next(150, 1), 1576 on.next(215, 2), 1577 on.next(225, 4), 1578 on.completed(250) 1579 }); 1580 1581 auto o2 = sc.make_hot_observable({ 1582 on.next(150, 1), 1583 on.next(235, 6), 1584 on.next(240, 7), 1585 on.error(245, ex) 1586 }); 1587 1588 WHEN("each int is combined with the latest from the other source"){ 1589 1590 auto res = w.start( __anonc36241a63902() 1591 [&]() { 1592 return o2 1593 .zip( 1594 [](int v2, int v1){ 1595 return v2 + v1; 1596 }, 1597 o1 1598 ) 1599 // forget type to workaround lambda deduction bug on msvc 2013 1600 .as_dynamic(); 1601 } 1602 ); 1603 1604 THEN("the output contains combined ints followed by an error"){ 1605 auto required = rxu::to_vector({ 1606 on.next(235, 2 + 6), 1607 on.next(240, 4 + 7), 1608 on.error(245, ex) 1609 }); 1610 auto actual = res.get_observer().messages(); 1611 REQUIRE(required == actual); 1612 } 1613 1614 THEN("there was one subscription and one unsubscription to the o1"){ 1615 auto required = rxu::to_vector({ 1616 on.subscribe(200, 245) 1617 }); 1618 auto actual = o1.subscriptions(); 1619 REQUIRE(required == actual); 1620 } 1621 1622 THEN("there was one subscription and one unsubscription to the o2"){ 1623 auto required = rxu::to_vector({ 1624 on.subscribe(200, 245) 1625 }); 1626 auto actual = o2.subscriptions(); 1627 REQUIRE(required == actual); 1628 } 1629 } 1630 } 1631 } 1632 1633 SCENARIO("zip next+error/error", "[zip][join][operators]"){ 1634 GIVEN("2 hot observables of ints."){ 1635 auto sc = rxsc::make_test(); 1636 auto w = sc.create_worker(); 1637 const rxsc::test::messages<int> on; 1638 1639 std::runtime_error ex1("zip on_error from source 1"); 1640 std::runtime_error ex2("zip on_error from source 2"); 1641 1642 auto err1 = sc.make_hot_observable({ 1643 on.next(150, 1), 1644 on.next(210, 2), 1645 on.error(220, ex1) 1646 }); 1647 1648 auto err2 = sc.make_hot_observable({ 1649 on.next(150, 1), 1650 on.error(230, ex2) 1651 }); 1652 1653 WHEN("each int is combined with the latest from the other source"){ 1654 1655 auto res = w.start( __anonc36241a63b02() 1656 [&]() { 1657 return err1 1658 .zip( 1659 [](int v2, int v1){ 1660 return v2 + v1; 1661 }, 1662 err2 1663 ) 1664 // forget type to workaround lambda deduction bug on msvc 2013 1665 .as_dynamic(); 1666 } 1667 ); 1668 1669 THEN("the output contains only error message"){ 1670 auto required = rxu::to_vector({ 1671 on.error(220, ex1) 1672 }); 1673 auto actual = res.get_observer().messages(); 1674 REQUIRE(required == actual); 1675 } 1676 1677 THEN("there was one subscription and one unsubscription to the err1"){ 1678 auto required = rxu::to_vector({ 1679 on.subscribe(200, 220) 1680 }); 1681 auto actual = err1.subscriptions(); 1682 REQUIRE(required == actual); 1683 } 1684 1685 THEN("there was one subscription and one unsubscription to the err2"){ 1686 auto required = rxu::to_vector({ 1687 on.subscribe(200, 220) 1688 }); 1689 auto actual = err2.subscriptions(); 1690 REQUIRE(required == actual); 1691 } 1692 } 1693 } 1694 } 1695 1696 SCENARIO("zip error/next+error", "[zip][join][operators]"){ 1697 GIVEN("2 hot observables of ints."){ 1698 auto sc = rxsc::make_test(); 1699 auto w = sc.create_worker(); 1700 const rxsc::test::messages<int> on; 1701 1702 std::runtime_error ex1("zip on_error from source 1"); 1703 std::runtime_error ex2("zip on_error from source 2"); 1704 1705 auto err1 = sc.make_hot_observable({ 1706 on.next(150, 1), 1707 on.error(230, ex1) 1708 }); 1709 1710 auto err2 = sc.make_hot_observable({ 1711 on.next(150, 1), 1712 on.next(210, 2), 1713 on.error(220, ex2) 1714 }); 1715 1716 WHEN("each int is combined with the latest from the other source"){ 1717 1718 auto res = w.start( __anonc36241a63d02() 1719 [&]() { 1720 return err1 1721 .zip( 1722 [](int v2, int v1){ 1723 return v2 + v1; 1724 }, 1725 err2 1726 ) 1727 // forget type to workaround lambda deduction bug on msvc 2013 1728 .as_dynamic(); 1729 } 1730 ); 1731 1732 THEN("the output contains only error message"){ 1733 auto required = rxu::to_vector({ 1734 on.error(220, ex2) 1735 }); 1736 auto actual = res.get_observer().messages(); 1737 REQUIRE(required == actual); 1738 } 1739 1740 THEN("there was one subscription and one unsubscription to the err1"){ 1741 auto required = rxu::to_vector({ 1742 on.subscribe(200, 220) 1743 }); 1744 auto actual = err1.subscriptions(); 1745 REQUIRE(required == actual); 1746 } 1747 1748 THEN("there was one subscription and one unsubscription to the err2"){ 1749 auto required = rxu::to_vector({ 1750 on.subscribe(200, 220) 1751 }); 1752 auto actual = err2.subscriptions(); 1753 REQUIRE(required == actual); 1754 } 1755 } 1756 } 1757 } 1758 1759 SCENARIO("zip error after completed left", "[zip][join][operators]"){ 1760 GIVEN("2 hot observables of ints."){ 1761 auto sc = rxsc::make_test(); 1762 auto w = sc.create_worker(); 1763 const rxsc::test::messages<int> on; 1764 1765 std::runtime_error ex("zip on_error from source"); 1766 1767 auto ret = sc.make_hot_observable({ 1768 on.next(150, 1), 1769 on.next(210, 2), 1770 on.completed(215) 1771 }); 1772 1773 auto err = sc.make_hot_observable({ 1774 on.next(150, 1), 1775 on.error(220, ex) 1776 }); 1777 1778 WHEN("each int is combined with the latest from the other source"){ 1779 1780 auto res = w.start( __anonc36241a63f02() 1781 [&]() { 1782 return ret 1783 .zip( 1784 [](int v2, int v1){ 1785 return v2 + v1; 1786 }, 1787 err 1788 ) 1789 // forget type to workaround lambda deduction bug on msvc 2013 1790 .as_dynamic(); 1791 } 1792 ); 1793 1794 THEN("the output contains only error message"){ 1795 auto required = rxu::to_vector({ 1796 on.error(220, ex) 1797 }); 1798 auto actual = res.get_observer().messages(); 1799 REQUIRE(required == actual); 1800 } 1801 1802 THEN("there was one subscription and one unsubscription to the ret"){ 1803 auto required = rxu::to_vector({ 1804 on.subscribe(200, 215) 1805 }); 1806 auto actual = ret.subscriptions(); 1807 REQUIRE(required == actual); 1808 } 1809 1810 THEN("there was one subscription and one unsubscription to the err"){ 1811 auto required = rxu::to_vector({ 1812 on.subscribe(200, 220) 1813 }); 1814 auto actual = err.subscriptions(); 1815 REQUIRE(required == actual); 1816 } 1817 } 1818 } 1819 } 1820 1821 SCENARIO("zip error after completed right", "[zip][join][operators]"){ 1822 GIVEN("2 hot observables of ints."){ 1823 auto sc = rxsc::make_test(); 1824 auto w = sc.create_worker(); 1825 const rxsc::test::messages<int> on; 1826 1827 std::runtime_error ex("zip on_error from source"); 1828 1829 auto err = sc.make_hot_observable({ 1830 on.next(150, 1), 1831 on.error(220, ex) 1832 }); 1833 1834 auto ret = sc.make_hot_observable({ 1835 on.next(150, 1), 1836 on.next(210, 2), 1837 on.completed(215) 1838 }); 1839 1840 WHEN("each int is combined with the latest from the other source"){ 1841 1842 auto res = w.start( __anonc36241a64102() 1843 [&]() { 1844 return err 1845 .zip( 1846 [](int v2, int v1){ 1847 return v2 + v1; 1848 }, 1849 ret 1850 ) 1851 // forget type to workaround lambda deduction bug on msvc 2013 1852 .as_dynamic(); 1853 } 1854 ); 1855 1856 THEN("the output contains only error message"){ 1857 auto required = rxu::to_vector({ 1858 on.error(220, ex) 1859 }); 1860 auto actual = res.get_observer().messages(); 1861 REQUIRE(required == actual); 1862 } 1863 1864 THEN("there was one subscription and one unsubscription to the ret"){ 1865 auto required = rxu::to_vector({ 1866 on.subscribe(200, 215) 1867 }); 1868 auto actual = ret.subscriptions(); 1869 REQUIRE(required == actual); 1870 } 1871 1872 THEN("there was one subscription and one unsubscription to the err"){ 1873 auto required = rxu::to_vector({ 1874 on.subscribe(200, 220) 1875 }); 1876 auto actual = err.subscriptions(); 1877 REQUIRE(required == actual); 1878 } 1879 } 1880 } 1881 } 1882