1 #include "../test.h" 2 #include "rxcpp/operators/rx-amb.hpp" 3 4 SCENARIO("amb never 3", "[amb][join][operators]"){ 5 GIVEN("1 cold observable with 3 hot observables of ints."){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 const rxsc::test::messages<rx::observable<int>> o_on; 10 11 auto ys1 = sc.make_hot_observable({ 12 on.next(100, 1) 13 }); 14 15 auto ys2 = sc.make_hot_observable({ 16 on.next(110, 2) 17 }); 18 19 auto ys3 = sc.make_hot_observable({ 20 on.next(120, 3) 21 }); 22 23 auto xs = sc.make_cold_observable({ 24 o_on.next(100, ys1), 25 o_on.next(100, ys2), 26 o_on.next(100, ys3), 27 o_on.completed(200) 28 }); 29 30 WHEN("the first observable is selected to produce ints"){ 31 32 auto res = w.start( __anon371989c60102() 33 [&]() { 34 return xs 35 | rxo::amb() 36 // forget type to workaround lambda deduction bug on msvc 2013 37 | rxo::as_dynamic(); 38 } 39 ); 40 41 THEN("the output is empty"){ 42 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 43 auto actual = res.get_observer().messages(); 44 REQUIRE(required == actual); 45 } 46 47 THEN("there was one subscription and one unsubscription to the xs"){ 48 auto required = rxu::to_vector({ 49 on.subscribe(200, 400) 50 }); 51 auto actual = xs.subscriptions(); 52 REQUIRE(required == actual); 53 } 54 55 THEN("there was one subscription and one unsubscription to the ys1"){ 56 auto required = rxu::to_vector({ 57 on.subscribe(300, 1000) 58 }); 59 auto actual = ys1.subscriptions(); 60 REQUIRE(required == actual); 61 } 62 63 THEN("there was one subscription and one unsubscription to the ys2"){ 64 auto required = rxu::to_vector({ 65 on.subscribe(300, 1000) 66 }); 67 auto actual = ys2.subscriptions(); 68 REQUIRE(required == actual); 69 } 70 71 THEN("there was one subscription and one unsubscription to the ys3"){ 72 auto required = rxu::to_vector({ 73 on.subscribe(300, 1000) 74 }); 75 auto actual = ys3.subscriptions(); 76 REQUIRE(required == actual); 77 } 78 } 79 } 80 } 81 82 SCENARIO("amb never empty", "[amb][join][operators]"){ 83 GIVEN("1 cold observable with 2 hot observables of ints."){ 84 auto sc = rxsc::make_test(); 85 auto w = sc.create_worker(); 86 const rxsc::test::messages<int> on; 87 const rxsc::test::messages<rx::observable<int>> o_on; 88 89 auto ys1 = sc.make_hot_observable({ 90 on.next(100, 1) 91 }); 92 93 auto ys2 = sc.make_hot_observable({ 94 on.next(110, 2), 95 on.completed(400) 96 }); 97 98 auto xs = sc.make_cold_observable({ 99 o_on.next(100, ys1), 100 o_on.next(100, ys2), 101 o_on.completed(150) 102 }); 103 104 WHEN("the first observable is selected to produce ints"){ 105 106 auto res = w.start( __anon371989c60202() 107 [&]() { 108 return xs 109 .amb() 110 // forget type to workaround lambda deduction bug on msvc 2013 111 .as_dynamic(); 112 } 113 ); 114 115 THEN("the output contains only complete message"){ 116 auto required = rxu::to_vector({ 117 on.completed(400) 118 }); 119 auto actual = res.get_observer().messages(); 120 REQUIRE(required == actual); 121 } 122 123 THEN("there was one subscription and one unsubscription to the xs"){ 124 auto required = rxu::to_vector({ 125 on.subscribe(200, 350) 126 }); 127 auto actual = xs.subscriptions(); 128 REQUIRE(required == actual); 129 } 130 131 THEN("there was one subscription and one unsubscription to the ys1"){ 132 auto required = rxu::to_vector({ 133 on.subscribe(300, 400) 134 }); 135 auto actual = ys1.subscriptions(); 136 REQUIRE(required == actual); 137 } 138 139 THEN("there was one subscription and one unsubscription to the ys2"){ 140 auto required = rxu::to_vector({ 141 on.subscribe(300, 400) 142 }); 143 auto actual = ys2.subscriptions(); 144 REQUIRE(required == actual); 145 } 146 } 147 } 148 } 149 150 SCENARIO("amb completes", "[amb][join][operators]"){ 151 GIVEN("1 cold observable with 3 cold observables of ints."){ 152 auto sc = rxsc::make_test(); 153 auto w = sc.create_worker(); 154 const rxsc::test::messages<int> on; 155 const rxsc::test::messages<rx::observable<int>> o_on; 156 157 auto ys1 = sc.make_cold_observable({ 158 on.next(10, 101), 159 on.next(110, 102), 160 on.next(210, 103), 161 on.completed(310) 162 }); 163 164 auto ys2 = sc.make_cold_observable({ 165 on.next(20, 201), 166 on.next(120, 202), 167 on.next(220, 203), 168 on.completed(320) 169 }); 170 171 auto ys3 = sc.make_cold_observable({ 172 on.next(30, 301), 173 on.next(130, 302), 174 on.next(230, 303), 175 on.completed(330) 176 }); 177 178 auto xs = sc.make_cold_observable({ 179 o_on.next(100, ys1), 180 o_on.next(100, ys2), 181 o_on.next(100, ys3), 182 o_on.completed(100) 183 }); 184 185 WHEN("the first observable is selected to produce ints"){ 186 187 auto res = w.start( __anon371989c60302() 188 [&]() { 189 return xs 190 .amb() 191 // forget type to workaround lambda deduction bug on msvc 2013 192 .as_dynamic(); 193 } 194 ); 195 196 THEN("the output contains ints from the first observable"){ 197 auto required = rxu::to_vector({ 198 on.next(310, 101), 199 on.next(410, 102), 200 on.next(510, 103), 201 on.completed(610) 202 }); 203 auto actual = res.get_observer().messages(); 204 REQUIRE(required == actual); 205 } 206 207 THEN("there was one subscription and one unsubscription to the xs"){ 208 auto required = rxu::to_vector({ 209 on.subscribe(200, 300) 210 }); 211 auto actual = xs.subscriptions(); 212 REQUIRE(required == actual); 213 } 214 215 THEN("there was one subscription and one unsubscription to the ys1"){ 216 auto required = rxu::to_vector({ 217 on.subscribe(300, 610) 218 }); 219 auto actual = ys1.subscriptions(); 220 REQUIRE(required == actual); 221 } 222 223 THEN("there was one subscription and one unsubscription to the ys2"){ 224 auto required = rxu::to_vector({ 225 on.subscribe(300, 310) 226 }); 227 auto actual = ys2.subscriptions(); 228 REQUIRE(required == actual); 229 } 230 231 THEN("there was one subscription and one unsubscription to the ys3"){ 232 auto required = rxu::to_vector({ 233 on.subscribe(300, 310) 234 }); 235 auto actual = ys3.subscriptions(); 236 REQUIRE(required == actual); 237 } 238 } 239 } 240 } 241 242 SCENARIO("amb winner throws", "[amb][join][operators]"){ 243 GIVEN("1 cold observable with 3 cold observables of ints."){ 244 auto sc = rxsc::make_test(); 245 auto w = sc.create_worker(); 246 const rxsc::test::messages<int> on; 247 const rxsc::test::messages<rx::observable<int>> o_on; 248 249 std::runtime_error ex("amb on_error from source"); 250 251 auto ys1 = sc.make_cold_observable({ 252 on.next(10, 101), 253 on.next(110, 102), 254 on.next(210, 103), 255 on.error(310, ex) 256 }); 257 258 auto ys2 = sc.make_cold_observable({ 259 on.next(20, 201), 260 on.next(120, 202), 261 on.next(220, 203), 262 on.completed(320) 263 }); 264 265 auto ys3 = sc.make_cold_observable({ 266 on.next(30, 301), 267 on.next(130, 302), 268 on.next(230, 303), 269 on.completed(330) 270 }); 271 272 auto xs = sc.make_cold_observable({ 273 o_on.next(100, ys1), 274 o_on.next(100, ys2), 275 o_on.next(100, ys3), 276 o_on.completed(100) 277 }); 278 279 WHEN("the first observable is selected to produce ints"){ 280 281 auto res = w.start( __anon371989c60402() 282 [&]() { 283 return xs 284 .amb() 285 // forget type to workaround lambda deduction bug on msvc 2013 286 .as_dynamic(); 287 } 288 ); 289 290 THEN("the output contains ints from the first observable"){ 291 auto required = rxu::to_vector({ 292 on.next(310, 101), 293 on.next(410, 102), 294 on.next(510, 103), 295 on.error(610, ex) 296 }); 297 auto actual = res.get_observer().messages(); 298 REQUIRE(required == actual); 299 } 300 301 THEN("there was one subscription and one unsubscription to the xs"){ 302 auto required = rxu::to_vector({ 303 on.subscribe(200, 300) 304 }); 305 auto actual = xs.subscriptions(); 306 REQUIRE(required == actual); 307 } 308 309 THEN("there was one subscription and one unsubscription to the ys1"){ 310 auto required = rxu::to_vector({ 311 on.subscribe(300, 610) 312 }); 313 auto actual = ys1.subscriptions(); 314 REQUIRE(required == actual); 315 } 316 317 THEN("there was one subscription and one unsubscription to the ys2"){ 318 auto required = rxu::to_vector({ 319 on.subscribe(300, 310) 320 }); 321 auto actual = ys2.subscriptions(); 322 REQUIRE(required == actual); 323 } 324 325 THEN("there was one subscription and one unsubscription to the ys3"){ 326 auto required = rxu::to_vector({ 327 on.subscribe(300, 310) 328 }); 329 auto actual = ys3.subscriptions(); 330 REQUIRE(required == actual); 331 } 332 } 333 } 334 } 335 336 SCENARIO("amb loser throws", "[amb][join][operators]"){ 337 GIVEN("1 cold observable with 3 cold observables of ints."){ 338 auto sc = rxsc::make_test(); 339 auto w = sc.create_worker(); 340 const rxsc::test::messages<int> on; 341 const rxsc::test::messages<rx::observable<int>> o_on; 342 343 std::runtime_error ex("amb on_error from source"); 344 345 auto ys1 = sc.make_cold_observable({ 346 on.next(10, 101), 347 on.next(110, 102), 348 on.next(210, 103), 349 on.completed(310) 350 }); 351 352 auto ys2 = sc.make_cold_observable({ 353 on.error(20, ex) 354 }); 355 356 auto ys3 = sc.make_cold_observable({ 357 on.next(30, 301), 358 on.next(130, 302), 359 on.next(230, 303), 360 on.completed(330) 361 }); 362 363 auto xs = sc.make_cold_observable({ 364 o_on.next(100, ys1), 365 o_on.next(100, ys2), 366 o_on.next(100, ys3), 367 o_on.completed(100) 368 }); 369 370 WHEN("the first observable is selected to produce ints"){ 371 372 auto res = w.start( __anon371989c60502() 373 [&]() { 374 return xs 375 .amb() 376 // forget type to workaround lambda deduction bug on msvc 2013 377 .as_dynamic(); 378 } 379 ); 380 381 THEN("the output contains ints from the first observable"){ 382 auto required = rxu::to_vector({ 383 on.next(310, 101), 384 on.next(410, 102), 385 on.next(510, 103), 386 on.completed(610) 387 }); 388 auto actual = res.get_observer().messages(); 389 REQUIRE(required == actual); 390 } 391 392 THEN("there was one subscription and one unsubscription to the xs"){ 393 auto required = rxu::to_vector({ 394 on.subscribe(200, 300) 395 }); 396 auto actual = xs.subscriptions(); 397 REQUIRE(required == actual); 398 } 399 400 THEN("there was one subscription and one unsubscription to the ys1"){ 401 auto required = rxu::to_vector({ 402 on.subscribe(300, 610) 403 }); 404 auto actual = ys1.subscriptions(); 405 REQUIRE(required == actual); 406 } 407 408 THEN("there was one subscription and one unsubscription to the ys2"){ 409 auto required = rxu::to_vector({ 410 on.subscribe(300, 310) 411 }); 412 auto actual = ys2.subscriptions(); 413 REQUIRE(required == actual); 414 } 415 416 THEN("there was one subscription and one unsubscription to the ys3"){ 417 auto required = rxu::to_vector({ 418 on.subscribe(300, 310) 419 }); 420 auto actual = ys3.subscriptions(); 421 REQUIRE(required == actual); 422 } 423 } 424 } 425 } 426 427 SCENARIO("amb throws before selection", "[amb][join][operators]"){ 428 GIVEN("1 cold observable with 3 cold observables of ints."){ 429 auto sc = rxsc::make_test(); 430 auto w = sc.create_worker(); 431 const rxsc::test::messages<int> on; 432 const rxsc::test::messages<rx::observable<int>> o_on; 433 434 std::runtime_error ex("amb on_error from source"); 435 436 auto ys1 = sc.make_cold_observable({ 437 on.next(110, 1), 438 on.completed(200) 439 }); 440 441 auto ys2 = sc.make_cold_observable({ 442 on.error(50, ex) 443 }); 444 445 auto ys3 = sc.make_cold_observable({ 446 on.next(130, 3), 447 on.completed(300) 448 }); 449 450 auto xs = sc.make_cold_observable({ 451 o_on.next(100, ys1), 452 o_on.next(100, ys2), 453 o_on.next(100, ys3), 454 o_on.completed(100) 455 }); 456 457 WHEN("the first observable is selected to produce ints"){ 458 459 auto res = w.start( __anon371989c60602() 460 [&]() { 461 return xs 462 .amb() 463 // forget type to workaround lambda deduction bug on msvc 2013 464 .as_dynamic(); 465 } 466 ); 467 468 THEN("the output contains only an error"){ 469 auto required = rxu::to_vector({ 470 on.error(350, ex) 471 }); 472 auto actual = res.get_observer().messages(); 473 REQUIRE(required == actual); 474 } 475 476 THEN("there was one subscription and one unsubscription to the xs"){ 477 auto required = rxu::to_vector({ 478 on.subscribe(200, 300) 479 }); 480 auto actual = xs.subscriptions(); 481 REQUIRE(required == actual); 482 } 483 484 THEN("there was one subscription and one unsubscription to the ys1"){ 485 auto required = rxu::to_vector({ 486 on.subscribe(300, 350) 487 }); 488 auto actual = ys1.subscriptions(); 489 REQUIRE(required == actual); 490 } 491 492 THEN("there was one subscription and one unsubscription to the ys2"){ 493 auto required = rxu::to_vector({ 494 on.subscribe(300, 350) 495 }); 496 auto actual = ys2.subscriptions(); 497 REQUIRE(required == actual); 498 } 499 500 THEN("there was one subscription and one unsubscription to the ys3"){ 501 auto required = rxu::to_vector({ 502 on.subscribe(300, 350) 503 }); 504 auto actual = ys3.subscriptions(); 505 REQUIRE(required == actual); 506 } 507 } 508 } 509 } 510 511 SCENARIO("amb throws before selection and emission end", "[amb][join][operators]"){ 512 GIVEN("1 cold observable with 3 cold observables of ints."){ 513 auto sc = rxsc::make_test(); 514 auto w = sc.create_worker(); 515 const rxsc::test::messages<int> on; 516 const rxsc::test::messages<rx::observable<int>> o_on; 517 518 std::runtime_error ex("amb on_error from source"); 519 520 auto ys1 = sc.make_cold_observable({ 521 on.next(110, 1), 522 on.completed(200) 523 }); 524 525 auto ys2 = sc.make_cold_observable({ 526 on.error(50, ex) 527 }); 528 529 auto ys3 = sc.make_cold_observable({ 530 on.next(130, 3), 531 on.completed(300) 532 }); 533 534 auto xs = sc.make_cold_observable({ 535 o_on.next(100, ys1), 536 o_on.next(100, ys2), 537 o_on.next(100, ys3), 538 o_on.completed(500) 539 }); 540 541 WHEN("the first observable is selected to produce ints"){ 542 543 auto res = w.start( __anon371989c60702() 544 [&]() { 545 return xs 546 .amb() 547 // forget type to workaround lambda deduction bug on msvc 2013 548 .as_dynamic(); 549 } 550 ); 551 552 THEN("the output contains only an error"){ 553 auto required = rxu::to_vector({ 554 on.error(350, ex) 555 }); 556 auto actual = res.get_observer().messages(); 557 REQUIRE(required == actual); 558 } 559 560 THEN("there was one subscription and one unsubscription to the xs"){ 561 auto required = rxu::to_vector({ 562 on.subscribe(200, 350) 563 }); 564 auto actual = xs.subscriptions(); 565 REQUIRE(required == actual); 566 } 567 568 THEN("there was one subscription and one unsubscription to the ys1"){ 569 auto required = rxu::to_vector({ 570 on.subscribe(300, 350) 571 }); 572 auto actual = ys1.subscriptions(); 573 REQUIRE(required == actual); 574 } 575 576 THEN("there was one subscription and one unsubscription to the ys2"){ 577 auto required = rxu::to_vector({ 578 on.subscribe(300, 350) 579 }); 580 auto actual = ys2.subscriptions(); 581 REQUIRE(required == actual); 582 } 583 584 THEN("there was one subscription and one unsubscription to the ys3"){ 585 auto required = rxu::to_vector({ 586 on.subscribe(300, 350) 587 }); 588 auto actual = ys3.subscriptions(); 589 REQUIRE(required == actual); 590 } 591 } 592 } 593 } 594 595 SCENARIO("amb loser comes when winner has already emitted", "[amb][join][operators]"){ 596 GIVEN("1 cold observable with 3 cold observables of ints."){ 597 auto sc = rxsc::make_test(); 598 auto w = sc.create_worker(); 599 const rxsc::test::messages<int> on; 600 const rxsc::test::messages<rx::observable<int>> o_on; 601 602 auto ys1 = sc.make_cold_observable({ 603 on.next(10, 101), 604 on.next(110, 102), 605 on.next(210, 103), 606 on.completed(310) 607 }); 608 609 auto ys2 = sc.make_cold_observable({ 610 on.next(20, 201), 611 on.next(120, 202), 612 on.next(220, 203), 613 on.completed(320) 614 }); 615 616 auto ys3 = sc.make_cold_observable({ 617 on.next(30, 301), 618 on.next(130, 302), 619 on.next(230, 303), 620 on.completed(330) 621 }); 622 623 auto xs = sc.make_cold_observable({ 624 o_on.next(100, ys1), 625 o_on.next(100, ys2), 626 o_on.next(200, ys3), 627 o_on.completed(200) 628 }); 629 630 WHEN("the first observable is selected to produce ints"){ 631 632 auto res = w.start( __anon371989c60802() 633 [&]() { 634 return xs 635 .amb() 636 // forget type to workaround lambda deduction bug on msvc 2013 637 .as_dynamic(); 638 } 639 ); 640 641 THEN("the output contains ints from the first observable"){ 642 auto required = rxu::to_vector({ 643 on.next(310, 101), 644 on.next(410, 102), 645 on.next(510, 103), 646 on.completed(610) 647 }); 648 auto actual = res.get_observer().messages(); 649 REQUIRE(required == actual); 650 } 651 652 THEN("there was one subscription and one unsubscription to the xs"){ 653 auto required = rxu::to_vector({ 654 on.subscribe(200, 400) 655 }); 656 auto actual = xs.subscriptions(); 657 REQUIRE(required == actual); 658 } 659 660 THEN("there was one subscription and one unsubscription to the ys1"){ 661 auto required = rxu::to_vector({ 662 on.subscribe(300, 610) 663 }); 664 auto actual = ys1.subscriptions(); 665 REQUIRE(required == actual); 666 } 667 668 THEN("there was one subscription and one unsubscription to the ys2"){ 669 auto required = rxu::to_vector({ 670 on.subscribe(300, 310) 671 }); 672 auto actual = ys2.subscriptions(); 673 REQUIRE(required == actual); 674 } 675 676 THEN("there were no subscriptions to the ys3"){ 677 auto required = std::vector<rxn::subscription>(); 678 auto actual = ys3.subscriptions(); 679 REQUIRE(required == actual); 680 } 681 } 682 } 683 } 684 685 SCENARIO("amb empty list", "[amb][join][operators]"){ 686 GIVEN("1 empty cold observable of observables of ints."){ 687 auto sc = rxsc::make_test(); 688 auto w = sc.create_worker(); 689 const rxsc::test::messages<int> on; 690 const rxsc::test::messages<rx::observable<int>> o_on; 691 692 auto xs = sc.make_cold_observable({ 693 o_on.completed(200) 694 }); 695 696 WHEN("the first observable is selected to produce ints"){ 697 698 auto res = w.start( __anon371989c60902() 699 [&]() { 700 return xs 701 .amb() 702 // forget type to workaround lambda deduction bug on msvc 2013 703 .as_dynamic(); 704 } 705 ); 706 707 THEN("the output contains only comlpete message"){ 708 auto required = rxu::to_vector({ 709 on.completed(400) 710 }); 711 auto actual = res.get_observer().messages(); 712 REQUIRE(required == actual); 713 } 714 715 THEN("there was one subscription and one unsubscription to the xs"){ 716 auto required = rxu::to_vector({ 717 on.subscribe(200, 400) 718 }); 719 auto actual = xs.subscriptions(); 720 REQUIRE(required == actual); 721 } 722 } 723 } 724 } 725 726 SCENARIO("amb source throws before selection", "[amb][join][operators]"){ 727 GIVEN("1 cold observable with 3 cold observables of ints."){ 728 auto sc = rxsc::make_test(); 729 auto w = sc.create_worker(); 730 const rxsc::test::messages<int> on; 731 const rxsc::test::messages<rx::observable<int>> o_on; 732 733 std::runtime_error ex("amb on_error from source"); 734 735 auto ys1 = sc.make_cold_observable({ 736 on.next(10, 101), 737 on.next(110, 102), 738 on.next(210, 103), 739 on.completed(310) 740 }); 741 742 auto ys2 = sc.make_cold_observable({ 743 on.next(20, 201), 744 on.next(120, 202), 745 on.next(220, 203), 746 on.completed(320) 747 }); 748 749 auto ys3 = sc.make_cold_observable({ 750 on.next(30, 301), 751 on.next(130, 302), 752 on.next(230, 303), 753 on.completed(330) 754 }); 755 756 auto xs = sc.make_cold_observable({ 757 o_on.next(100, ys1), 758 o_on.next(100, ys2), 759 o_on.next(100, ys3), 760 o_on.error(100, ex) 761 }); 762 763 WHEN("the first observable is selected to produce ints"){ 764 765 auto res = w.start( __anon371989c60a02() 766 [&]() { 767 return xs 768 .amb() 769 // forget type to workaround lambda deduction bug on msvc 2013 770 .as_dynamic(); 771 } 772 ); 773 774 THEN("the output contains only an error"){ 775 auto required = rxu::to_vector({ 776 on.error(300, ex) 777 }); 778 auto actual = res.get_observer().messages(); 779 REQUIRE(required == actual); 780 } 781 782 THEN("there was one subscription and one unsubscription to the xs"){ 783 auto required = rxu::to_vector({ 784 on.subscribe(200, 300) 785 }); 786 auto actual = xs.subscriptions(); 787 REQUIRE(required == actual); 788 } 789 790 THEN("there was one subscription and one unsubscription to the ys1"){ 791 auto required = rxu::to_vector({ 792 on.subscribe(300, 300) 793 }); 794 auto actual = ys1.subscriptions(); 795 REQUIRE(required == actual); 796 } 797 798 THEN("there was one subscription and one unsubscription to the ys2"){ 799 auto required = rxu::to_vector({ 800 on.subscribe(300, 300) 801 }); 802 auto actual = ys2.subscriptions(); 803 REQUIRE(required == actual); 804 } 805 806 THEN("there was one subscription and one unsubscription to the ys3"){ 807 auto required = rxu::to_vector({ 808 on.subscribe(300, 300) 809 }); 810 auto actual = ys3.subscriptions(); 811 REQUIRE(required == actual); 812 } 813 } 814 } 815 } 816 817 SCENARIO("amb source throws after selection", "[amb][join][operators]"){ 818 GIVEN("1 cold observable with 3 cold observables of ints."){ 819 auto sc = rxsc::make_test(); 820 auto w = sc.create_worker(); 821 const rxsc::test::messages<int> on; 822 const rxsc::test::messages<rx::observable<int>> o_on; 823 824 std::runtime_error ex("amb on_error from source"); 825 826 auto ys1 = sc.make_cold_observable({ 827 on.next(10, 101), 828 on.next(110, 102), 829 on.next(210, 103), 830 on.completed(310) 831 }); 832 833 auto ys2 = sc.make_cold_observable({ 834 on.next(20, 201), 835 on.next(120, 202), 836 on.next(220, 203), 837 on.completed(320) 838 }); 839 840 auto ys3 = sc.make_cold_observable({ 841 on.next(30, 301), 842 on.next(130, 302), 843 on.next(230, 303), 844 on.completed(330) 845 }); 846 847 auto xs = sc.make_cold_observable({ 848 o_on.next(100, ys1), 849 o_on.next(100, ys2), 850 o_on.next(100, ys3), 851 o_on.error(300, ex) 852 }); 853 854 WHEN("the first observable is selected to produce ints"){ 855 856 auto res = w.start( __anon371989c60b02() 857 [&]() { 858 return xs 859 .amb() 860 // forget type to workaround lambda deduction bug on msvc 2013 861 .as_dynamic(); 862 } 863 ); 864 865 THEN("the output contains ints from the first observable"){ 866 auto required = rxu::to_vector({ 867 on.next(310, 101), 868 on.next(410, 102), 869 on.error(500, ex) 870 }); 871 auto actual = res.get_observer().messages(); 872 REQUIRE(required == actual); 873 } 874 875 THEN("there was one subscription and one unsubscription to the xs"){ 876 auto required = rxu::to_vector({ 877 on.subscribe(200, 500) 878 }); 879 auto actual = xs.subscriptions(); 880 REQUIRE(required == actual); 881 } 882 883 THEN("there was one subscription and one unsubscription to the ys1"){ 884 auto required = rxu::to_vector({ 885 on.subscribe(300, 500) 886 }); 887 auto actual = ys1.subscriptions(); 888 REQUIRE(required == actual); 889 } 890 891 THEN("there was one subscription and one unsubscription to the ys2"){ 892 auto required = rxu::to_vector({ 893 on.subscribe(300, 310) 894 }); 895 auto actual = ys2.subscriptions(); 896 REQUIRE(required == actual); 897 } 898 899 THEN("there was one subscription and one unsubscription to the ys3"){ 900 auto required = rxu::to_vector({ 901 on.subscribe(300, 310) 902 }); 903 auto actual = ys3.subscriptions(); 904 REQUIRE(required == actual); 905 } 906 } 907 } 908 } 909 910 SCENARIO("amb never empty, custom coordination", "[amb][join][operators]"){ 911 GIVEN("1 cold observable with 2 hot observables of ints."){ 912 auto sc = rxsc::make_test(); 913 auto so = rx::synchronize_in_one_worker(sc); 914 auto w = sc.create_worker(); 915 const rxsc::test::messages<int> on; 916 const rxsc::test::messages<rx::observable<int>> o_on; 917 918 auto ys1 = sc.make_hot_observable({ 919 on.next(100, 1) 920 }); 921 922 auto ys2 = sc.make_hot_observable({ 923 on.next(110, 2), 924 on.completed(400) 925 }); 926 927 auto xs = sc.make_cold_observable({ 928 o_on.next(100, ys1), 929 o_on.next(100, ys2), 930 o_on.completed(150) 931 }); 932 933 WHEN("the first observable is selected to produce ints"){ 934 935 auto res = w.start( __anon371989c60c02() 936 [&]() { 937 return xs 938 .amb(so) 939 // forget type to workaround lambda deduction bug on msvc 2013 940 .as_dynamic(); 941 } 942 ); 943 944 THEN("the output contains only complete message"){ 945 auto required = rxu::to_vector({ 946 on.completed(401) 947 }); 948 auto actual = res.get_observer().messages(); 949 REQUIRE(required == actual); 950 } 951 } 952 } 953 } 954