1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-map.hpp> 4 #include <rxcpp/operators/rx-merge.hpp> 5 #include <rxcpp/operators/rx-window.hpp> 6 #include <rxcpp/operators/rx-window_time.hpp> 7 #include <rxcpp/operators/rx-window_time_count.hpp> 8 9 SCENARIO("window count, basic", "[window][operators]"){ 10 GIVEN("1 hot observable of ints."){ 11 auto sc = rxsc::make_test(); 12 auto w = sc.create_worker(); 13 const rxsc::test::messages<int> on; 14 const rxsc::test::messages<rx::observable<int>> o_on; 15 16 auto xs = sc.make_hot_observable({ 17 on.next(100, 1), 18 on.next(210, 2), 19 on.next(240, 3), 20 on.next(280, 4), 21 on.next(320, 5), 22 on.next(350, 6), 23 on.next(380, 7), 24 on.next(420, 8), 25 on.next(470, 9), 26 on.completed(600) 27 }); 28 29 WHEN("group each int with the next 2 ints"){ 30 auto res = w.start( __anon38a1318b0102() 31 [&]() { 32 return xs 33 .window(3, 2) 34 .merge() 35 // forget type to workaround lambda deduction bug on msvc 2013 36 .as_dynamic(); 37 } 38 ); 39 40 THEN("the output contains merged groups of ints"){ 41 auto required = rxu::to_vector({ 42 on.next(210, 2), 43 on.next(240, 3), 44 on.next(280, 4), 45 on.next(280, 4), 46 on.next(320, 5), 47 on.next(350, 6), 48 on.next(350, 6), 49 on.next(380, 7), 50 on.next(420, 8), 51 on.next(420, 8), 52 on.next(470, 9), 53 on.completed(600) 54 }); 55 auto actual = res.get_observer().messages(); 56 REQUIRE(required == actual); 57 } 58 59 THEN("there was one subscription and one unsubscription to the observable"){ 60 auto required = rxu::to_vector({ 61 o_on.subscribe(200, 600) 62 }); 63 auto actual = xs.subscriptions(); 64 REQUIRE(required == actual); 65 } 66 } 67 } 68 } 69 70 SCENARIO("window count, inner timings", "[window][operators]"){ 71 GIVEN("1 hot observable of ints."){ 72 auto sc = rxsc::make_test(); 73 auto w = sc.create_worker(); 74 const rxsc::test::messages<int> on; 75 const rxsc::test::messages<rx::observable<int>> o_on; 76 77 auto xs = sc.make_hot_observable({ 78 on.next(100, 1), 79 on.next(210, 2), 80 on.next(240, 3), 81 on.next(280, 4), 82 on.next(320, 5), 83 on.next(350, 6), 84 on.next(380, 7), 85 on.next(420, 8), 86 on.next(470, 9), 87 on.completed(600) 88 }); 89 90 WHEN("group each int with the next 2 ints"){ 91 auto res = rxcpp::observable<rxcpp::observable<int>>(); 92 auto windows = std::vector<rxcpp::observable<int>>(); 93 auto observers = std::vector<rxt::testable_observer<int>>(); 94 95 w.schedule_absolute( 96 rxsc::test::created_time, __anon38a1318b0202(const rxsc::schedulable&) 97 [&](const rxsc::schedulable&) { 98 res = xs 99 | rxo::window(3, 2); 100 } 101 ); 102 103 w.schedule_absolute( 104 rxsc::test::subscribed_time, __anon38a1318b0302(const rxsc::schedulable&) 105 [&](const rxsc::schedulable&) { 106 res.subscribe( 107 // on_next 108 [&](rx::observable<int> window) { 109 auto result = w.make_subscriber<int>(); 110 windows.push_back(window); 111 observers.push_back(result.get_observer()); 112 window.subscribe(result); 113 } 114 ); 115 } 116 ); 117 118 w.start(); 119 120 THEN("the output contains 5 windows"){ 121 REQUIRE(5 == observers.size()); 122 } 123 124 THEN("the 1st output window contains ints"){ 125 auto required = rxu::to_vector({ 126 on.next(210, 2), 127 on.next(240, 3), 128 on.next(280, 4), 129 on.completed(280) 130 }); 131 auto actual = observers[0].messages(); 132 REQUIRE(required == actual); 133 } 134 135 THEN("the 2nd output window contains ints"){ 136 auto required = rxu::to_vector({ 137 on.next(280, 4), 138 on.next(320, 5), 139 on.next(350, 6), 140 on.completed(350) 141 }); 142 auto actual = observers[1].messages(); 143 REQUIRE(required == actual); 144 } 145 146 THEN("the 3rd output window contains ints"){ 147 auto required = rxu::to_vector({ 148 on.next(350, 6), 149 on.next(380, 7), 150 on.next(420, 8), 151 on.completed(420) 152 }); 153 auto actual = observers[2].messages(); 154 REQUIRE(required == actual); 155 } 156 157 THEN("the 4th output window contains ints"){ 158 auto required = rxu::to_vector({ 159 on.next(420, 8), 160 on.next(470, 9), 161 on.completed(600) 162 }); 163 auto actual = observers[3].messages(); 164 REQUIRE(required == actual); 165 } 166 167 THEN("the 5th output window only contains complete message"){ 168 auto required = rxu::to_vector({ 169 on.completed(600) 170 }); 171 auto actual = observers[4].messages(); 172 REQUIRE(required == actual); 173 } 174 175 THEN("there was one subscription and one unsubscription to the observable"){ 176 auto required = rxu::to_vector({ 177 o_on.subscribe(200, 600) 178 }); 179 auto actual = xs.subscriptions(); 180 REQUIRE(required == actual); 181 } 182 } 183 } 184 } 185 186 SCENARIO("window count, dispose", "[window][operators]"){ 187 GIVEN("1 hot observable of ints."){ 188 auto sc = rxsc::make_test(); 189 auto w = sc.create_worker(); 190 const rxsc::test::messages<int> on; 191 const rxsc::test::messages<rx::observable<int>> o_on; 192 193 auto xs = sc.make_hot_observable({ 194 on.next(100, 1), 195 on.next(210, 2), 196 on.next(240, 3), 197 on.next(280, 4), 198 on.next(320, 5), 199 on.next(350, 6), 200 on.next(380, 7), 201 on.next(420, 8), 202 on.next(470, 9), 203 on.completed(600) 204 }); 205 206 WHEN("group each int with the next 2 ints"){ 207 auto res = w.start( __anon38a1318b0502() 208 [&]() { 209 return xs 210 .window(3, 2) 211 .merge() 212 // forget type to workaround lambda deduction bug on msvc 2013 213 .as_dynamic(); 214 }, 215 370 216 ); 217 218 THEN("the output contains merged groups of ints"){ 219 auto required = rxu::to_vector({ 220 on.next(210, 2), 221 on.next(240, 3), 222 on.next(280, 4), 223 on.next(280, 4), 224 on.next(320, 5), 225 on.next(350, 6), 226 on.next(350, 6) 227 }); 228 auto actual = res.get_observer().messages(); 229 REQUIRE(required == actual); 230 } 231 232 THEN("there was one subscription and one unsubscription to the observable"){ 233 auto required = rxu::to_vector({ 234 o_on.subscribe(200, 370) 235 }); 236 auto actual = xs.subscriptions(); 237 REQUIRE(required == actual); 238 } 239 } 240 } 241 } 242 243 SCENARIO("window count, error", "[window][operators]"){ 244 GIVEN("1 hot observable of ints."){ 245 auto sc = rxsc::make_test(); 246 auto w = sc.create_worker(); 247 const rxsc::test::messages<int> on; 248 const rxsc::test::messages<rx::observable<int>> o_on; 249 250 std::runtime_error ex("window on_error from source"); 251 252 auto xs = sc.make_hot_observable({ 253 on.next(100, 1), 254 on.next(210, 2), 255 on.next(240, 3), 256 on.next(280, 4), 257 on.next(320, 5), 258 on.next(350, 6), 259 on.next(380, 7), 260 on.next(420, 8), 261 on.next(470, 9), 262 on.error(600, ex) 263 }); 264 265 WHEN("group each int with the next 2 ints"){ 266 auto res = w.start( __anon38a1318b0602() 267 [&]() { 268 return xs 269 .window(3, 2) 270 .merge() 271 // forget type to workaround lambda deduction bug on msvc 2013 272 .as_dynamic(); 273 } 274 ); 275 276 THEN("the output contains merged groups of ints"){ 277 auto required = rxu::to_vector({ 278 on.next(210, 2), 279 on.next(240, 3), 280 on.next(280, 4), 281 on.next(280, 4), 282 on.next(320, 5), 283 on.next(350, 6), 284 on.next(350, 6), 285 on.next(380, 7), 286 on.next(420, 8), 287 on.next(420, 8), 288 on.next(470, 9), 289 on.error(600, ex) 290 }); 291 auto actual = res.get_observer().messages(); 292 REQUIRE(required == actual); 293 } 294 295 THEN("there was one subscription and one unsubscription to the observable"){ 296 auto required = rxu::to_vector({ 297 o_on.subscribe(200, 600) 298 }); 299 auto actual = xs.subscriptions(); 300 REQUIRE(required == actual); 301 } 302 } 303 } 304 } 305 306 SCENARIO("window with time, basic", "[window_with_time][operators]"){ 307 GIVEN("1 hot observable of ints."){ 308 auto sc = rxsc::make_test(); 309 auto so = rx::synchronize_in_one_worker(sc); 310 auto w = sc.create_worker(); 311 const rxsc::test::messages<int> on; 312 const rxsc::test::messages<rx::observable<int>> o_on; 313 314 auto xs = sc.make_hot_observable({ 315 on.next(150, 1), 316 on.next(210, 2), 317 on.next(240, 3), 318 on.next(270, 4), 319 on.next(320, 5), 320 on.next(360, 6), 321 on.next(390, 7), 322 on.next(410, 8), 323 on.next(460, 9), 324 on.next(470, 10), 325 on.completed(490) 326 }); 327 328 WHEN("group ints by 100 time units"){ 329 using namespace std::chrono; 330 331 auto res = w.start( __anon38a1318b0702() 332 [&]() { 333 return xs 334 | rxo::window_with_time(milliseconds(100), milliseconds(50), so) 335 | rxo::merge() 336 // forget type to workaround lambda deduction bug on msvc 2013 337 | rxo::as_dynamic(); 338 } 339 ); 340 341 THEN("the output contains merged groups of ints"){ 342 auto required = rxu::to_vector({ 343 on.next(211, 2), 344 on.next(241, 3), 345 on.next(271, 4), 346 on.next(271, 4), 347 on.next(321, 5), 348 on.next(321, 5), 349 on.next(361, 6), 350 on.next(361, 6), 351 on.next(391, 7), 352 on.next(391, 7), 353 on.next(411, 8), 354 on.next(411, 8), 355 on.next(461, 9), 356 on.next(461, 9), 357 on.next(471, 10), 358 on.next(471, 10), 359 on.completed(491) 360 }); 361 auto actual = res.get_observer().messages(); 362 REQUIRE(required == actual); 363 } 364 365 THEN("there was one subscription and one unsubscription to the observable"){ 366 auto required = rxu::to_vector({ 367 o_on.subscribe(200, 490) 368 }); 369 auto actual = xs.subscriptions(); 370 REQUIRE(required == actual); 371 } 372 } 373 } 374 } 375 376 SCENARIO("window with time, basic same", "[window_with_time][operators]"){ 377 GIVEN("1 hot observable of ints."){ 378 auto sc = rxsc::make_test(); 379 auto so = rx::synchronize_in_one_worker(sc); 380 auto w = sc.create_worker(); 381 const rxsc::test::messages<int> on; 382 const rxsc::test::messages<rx::observable<int>> o_on; 383 384 auto xs = sc.make_hot_observable({ 385 on.next(150, 1), 386 on.next(210, 2), 387 on.next(240, 3), 388 on.next(270, 4), 389 on.next(320, 5), 390 on.next(360, 6), 391 on.next(390, 7), 392 on.next(410, 8), 393 on.next(460, 9), 394 on.next(470, 10), 395 on.completed(490) 396 }); 397 398 WHEN("group each int with the next 2 ints"){ 399 using namespace std::chrono; 400 401 auto res = w.start( __anon38a1318b0802() 402 [&]() { 403 return xs 404 .window_with_time(milliseconds(100), so) 405 .merge() 406 // forget type to workaround lambda deduction bug on msvc 2013 407 .as_dynamic(); 408 } 409 ); 410 411 THEN("the output contains merged groups of ints"){ 412 auto required = rxu::to_vector({ 413 on.next(211, 2), 414 on.next(241, 3), 415 on.next(271, 4), 416 on.next(321, 5), 417 on.next(361, 6), 418 on.next(391, 7), 419 on.next(411, 8), 420 on.next(461, 9), 421 on.next(471, 10), 422 on.completed(491) 423 }); 424 auto actual = res.get_observer().messages(); 425 REQUIRE(required == actual); 426 } 427 428 THEN("there was one subscription and one unsubscription to the observable"){ 429 auto required = rxu::to_vector({ 430 o_on.subscribe(200, 490) 431 }); 432 auto actual = xs.subscriptions(); 433 REQUIRE(required == actual); 434 } 435 } 436 } 437 } 438 439 SCENARIO("window with time, basic 1", "[window_with_time][operators]"){ 440 GIVEN("1 hot observable of ints."){ 441 auto sc = rxsc::make_test(); 442 auto so = rx::synchronize_in_one_worker(sc); 443 auto w = sc.create_worker(); 444 const rxsc::test::messages<int> on; 445 const rxsc::test::messages<rx::observable<int>> o_on; 446 447 auto xs = sc.make_hot_observable({ 448 on.next(100, 1), 449 on.next(210, 2), 450 on.next(240, 3), 451 on.next(280, 4), 452 on.next(320, 5), 453 on.next(350, 6), 454 on.next(380, 7), 455 on.next(420, 8), 456 on.next(470, 9), 457 on.completed(600) 458 }); 459 460 WHEN("group each int with the next 2 ints"){ 461 using namespace std::chrono; 462 463 auto res = w.start( __anon38a1318b0902() 464 [&]() { 465 return xs 466 .window_with_time(milliseconds(100), milliseconds(70), so) 467 .merge() 468 // forget type to workaround lambda deduction bug on msvc 2013 469 .as_dynamic(); 470 } 471 ); 472 473 THEN("the output contains merged groups of ints"){ 474 auto required = rxu::to_vector({ 475 on.next(211, 2), 476 on.next(241, 3), 477 on.next(281, 4), 478 on.next(281, 4), 479 on.next(321, 5), 480 on.next(351, 6), 481 on.next(351, 6), 482 on.next(381, 7), 483 on.next(421, 8), 484 on.next(421, 8), 485 on.next(471, 9), 486 on.completed(601) 487 }); 488 auto actual = res.get_observer().messages(); 489 REQUIRE(required == actual); 490 } 491 492 THEN("there was one subscription and one unsubscription to the observable"){ 493 auto required = rxu::to_vector({ 494 o_on.subscribe(200, 600) 495 }); 496 auto actual = xs.subscriptions(); 497 REQUIRE(required == actual); 498 } 499 } 500 } 501 } 502 503 SCENARIO("window with time, basic 2", "[window_with_time][operators]"){ 504 GIVEN("1 hot observable of ints."){ 505 auto sc = rxsc::make_test(); 506 auto so = rx::synchronize_in_one_worker(sc); 507 auto w = sc.create_worker(); 508 const rxsc::test::messages<int> on; 509 const rxsc::test::messages<rx::observable<int>> o_on; 510 511 auto xs = sc.make_hot_observable({ 512 on.next(100, 1), 513 on.next(210, 2), 514 on.next(240, 3), 515 on.next(280, 4), 516 on.next(320, 5), 517 on.next(350, 6), 518 on.next(380, 7), 519 on.next(420, 8), 520 on.next(470, 9), 521 on.completed(600) 522 }); 523 524 WHEN("group each int with the next 2 ints"){ 525 using namespace std::chrono; 526 527 auto res = w.start( __anon38a1318b0a02() 528 [&]() { 529 return xs 530 .window_with_time(milliseconds(70), milliseconds(100), so) 531 .merge() 532 // forget type to workaround lambda deduction bug on msvc 2013 533 .as_dynamic(); 534 } 535 ); 536 537 THEN("the output contains merged groups of ints"){ 538 auto required = rxu::to_vector({ 539 on.next(211, 2), 540 on.next(241, 3), 541 on.next(321, 5), 542 on.next(351, 6), 543 on.next(421, 8), 544 on.next(471, 9), 545 on.completed(601) 546 }); 547 auto actual = res.get_observer().messages(); 548 REQUIRE(required == actual); 549 } 550 551 THEN("there was one subscription and one unsubscription to the observable"){ 552 auto required = rxu::to_vector({ 553 o_on.subscribe(200, 600) 554 }); 555 auto actual = xs.subscriptions(); 556 REQUIRE(required == actual); 557 } 558 } 559 } 560 } 561 562 SCENARIO("window with time, error", "[window_with_time][operators]"){ 563 GIVEN("1 hot observable of ints."){ 564 auto sc = rxsc::make_test(); 565 auto so = rx::synchronize_in_one_worker(sc); 566 auto w = sc.create_worker(); 567 const rxsc::test::messages<int> on; 568 const rxsc::test::messages<rx::observable<int>> o_on; 569 570 std::runtime_error ex("window_with_time on_error from source"); 571 572 auto xs = sc.make_hot_observable({ 573 on.next(100, 1), 574 on.next(210, 2), 575 on.next(240, 3), 576 on.next(280, 4), 577 on.next(320, 5), 578 on.next(350, 6), 579 on.next(380, 7), 580 on.next(420, 8), 581 on.next(470, 9), 582 on.error(600, ex) 583 }); 584 585 WHEN("group each int with the next 2 ints"){ 586 using namespace std::chrono; 587 588 auto res = w.start( __anon38a1318b0b02() 589 [&]() { 590 return xs 591 .window_with_time(milliseconds(100), milliseconds(70), so) 592 .merge() 593 // forget type to workaround lambda deduction bug on msvc 2013 594 .as_dynamic(); 595 } 596 ); 597 598 THEN("the output contains merged groups of ints"){ 599 auto required = rxu::to_vector({ 600 on.next(211, 2), 601 on.next(241, 3), 602 on.next(281, 4), 603 on.next(281, 4), 604 on.next(321, 5), 605 on.next(351, 6), 606 on.next(351, 6), 607 on.next(381, 7), 608 on.next(421, 8), 609 on.next(421, 8), 610 on.next(471, 9), 611 on.error(601, ex) 612 }); 613 auto actual = res.get_observer().messages(); 614 REQUIRE(required == actual); 615 } 616 617 THEN("there was one subscription and one unsubscription to the observable"){ 618 auto required = rxu::to_vector({ 619 o_on.subscribe(200, 600) 620 }); 621 auto actual = xs.subscriptions(); 622 REQUIRE(required == actual); 623 } 624 } 625 } 626 } 627 628 SCENARIO("window with time, disposed", "[window_with_time][operators]"){ 629 GIVEN("1 hot observable of ints."){ 630 auto sc = rxsc::make_test(); 631 auto so = rx::synchronize_in_one_worker(sc); 632 auto w = sc.create_worker(); 633 const rxsc::test::messages<int> on; 634 const rxsc::test::messages<rx::observable<int>> o_on; 635 636 auto xs = sc.make_hot_observable({ 637 on.next(100, 1), 638 on.next(210, 2), 639 on.next(240, 3), 640 on.next(280, 4), 641 on.next(320, 5), 642 on.next(350, 6), 643 on.next(380, 7), 644 on.next(420, 8), 645 on.next(470, 9), 646 on.completed(600) 647 }); 648 649 WHEN("group each int with the next 2 ints"){ 650 using namespace std::chrono; 651 652 auto res = w.start( __anon38a1318b0c02() 653 [&]() { 654 return xs 655 .window_with_time(milliseconds(100), milliseconds(70), so) 656 .merge() 657 // forget type to workaround lambda deduction bug on msvc 2013 658 .as_dynamic(); 659 }, 660 370 661 ); 662 663 THEN("the output contains merged groups of ints"){ 664 auto required = rxu::to_vector({ 665 on.next(211, 2), 666 on.next(241, 3), 667 on.next(281, 4), 668 on.next(281, 4), 669 on.next(321, 5), 670 on.next(351, 6), 671 on.next(351, 6), 672 }); 673 auto actual = res.get_observer().messages(); 674 REQUIRE(required == actual); 675 } 676 677 THEN("there was one subscription and one unsubscription to the observable"){ 678 auto required = rxu::to_vector({ 679 o_on.subscribe(200, 371) 680 }); 681 auto actual = xs.subscriptions(); 682 REQUIRE(required == actual); 683 } 684 } 685 } 686 } 687 688 SCENARIO("window with time, basic same 1", "[window_with_time][operators]"){ 689 GIVEN("1 hot observable of ints."){ 690 auto sc = rxsc::make_test(); 691 auto so = rx::synchronize_in_one_worker(sc); 692 auto w = sc.create_worker(); 693 const rxsc::test::messages<int> on; 694 const rxsc::test::messages<rx::observable<int>> o_on; 695 696 auto xs = sc.make_hot_observable({ 697 on.next(100, 1), 698 on.next(210, 2), 699 on.next(240, 3), 700 on.next(280, 4), 701 on.next(320, 5), 702 on.next(350, 6), 703 on.next(380, 7), 704 on.next(420, 8), 705 on.next(470, 9), 706 on.completed(600) 707 }); 708 709 WHEN("group each int with the next 2 ints"){ 710 using namespace std::chrono; 711 712 auto res = w.start( __anon38a1318b0d02() 713 [&]() { 714 return xs 715 .window_with_time(milliseconds(70), so) 716 .merge() 717 // forget type to workaround lambda deduction bug on msvc 2013 718 .as_dynamic(); 719 } 720 ); 721 722 THEN("the output contains merged groups of ints"){ 723 auto required = rxu::to_vector({ 724 on.next(211, 2), 725 on.next(241, 3), 726 on.next(281, 4), 727 on.next(321, 5), 728 on.next(351, 6), 729 on.next(381, 7), 730 on.next(421, 8), 731 on.next(471, 9), 732 on.completed(601) 733 }); 734 auto actual = res.get_observer().messages(); 735 REQUIRE(required == actual); 736 } 737 738 THEN("there was one subscription and one unsubscription to the observable"){ 739 auto required = rxu::to_vector({ 740 o_on.subscribe(200, 600) 741 }); 742 auto actual = xs.subscriptions(); 743 REQUIRE(required == actual); 744 } 745 } 746 } 747 } 748 749 SCENARIO("window with time or count, basic", "[window_with_time_or_count][operators]"){ 750 GIVEN("1 hot observable of ints."){ 751 auto sc = rxsc::make_test(); 752 auto so = rx::synchronize_in_one_worker(sc); 753 auto w = sc.create_worker(); 754 const rxsc::test::messages<int> on; 755 const rxsc::test::messages<rx::observable<int>> o_on; 756 757 auto xs = sc.make_hot_observable({ 758 on.next(205, 1), 759 on.next(210, 2), 760 on.next(240, 3), 761 on.next(280, 4), 762 on.next(320, 5), 763 on.next(350, 6), 764 on.next(370, 7), 765 on.next(420, 8), 766 on.next(470, 9), 767 on.completed(600) 768 }); 769 770 WHEN("group each int with the next 2 ints"){ 771 using namespace std::chrono; 772 773 auto res = w.start( __anon38a1318b0e02() 774 [&]() { 775 return xs 776 | rxo::window_with_time_or_count(milliseconds(70), 3, so) 777 | rxo::merge() 778 // forget type to workaround lambda deduction bug on msvc 2013 779 | rxo::as_dynamic(); 780 } 781 ); 782 783 THEN("the output contains merged groups of ints"){ 784 auto required = rxu::to_vector({ 785 on.next(206, 1), 786 on.next(211, 2), 787 on.next(241, 3), 788 on.next(281, 4), 789 on.next(321, 5), 790 on.next(351, 6), 791 on.next(371, 7), 792 on.next(421, 8), 793 on.next(471, 9), 794 on.completed(601) 795 }); 796 auto actual = res.get_observer().messages(); 797 REQUIRE(required == actual); 798 } 799 800 THEN("there was one subscription and one unsubscription to the observable"){ 801 auto required = rxu::to_vector({ 802 o_on.subscribe(200, 600) 803 }); 804 auto actual = xs.subscriptions(); 805 REQUIRE(required == actual); 806 } 807 } 808 } 809 } 810 811 SCENARIO("window with time or count, error", "[window_with_time_or_count][operators]"){ 812 GIVEN("1 hot observable of ints."){ 813 auto sc = rxsc::make_test(); 814 auto so = rx::synchronize_in_one_worker(sc); 815 auto w = sc.create_worker(); 816 const rxsc::test::messages<int> on; 817 const rxsc::test::messages<rx::observable<int>> o_on; 818 819 std::runtime_error ex("window_with_time on_error from source"); 820 821 auto xs = sc.make_hot_observable({ 822 on.next(205, 1), 823 on.next(210, 2), 824 on.next(240, 3), 825 on.next(280, 4), 826 on.next(320, 5), 827 on.next(350, 6), 828 on.next(370, 7), 829 on.next(420, 8), 830 on.next(470, 9), 831 on.error(600, ex) 832 }); 833 834 WHEN("group each int with the next 2 ints"){ 835 using namespace std::chrono; 836 837 auto res = w.start( __anon38a1318b0f02() 838 [&]() { 839 return xs 840 .window_with_time_or_count(milliseconds(70), 3, so) 841 .merge() 842 // forget type to workaround lambda deduction bug on msvc 2013 843 .as_dynamic(); 844 } 845 ); 846 847 THEN("the output contains merged groups of ints"){ 848 auto required = rxu::to_vector({ 849 on.next(206, 1), 850 on.next(211, 2), 851 on.next(241, 3), 852 on.next(281, 4), 853 on.next(321, 5), 854 on.next(351, 6), 855 on.next(371, 7), 856 on.next(421, 8), 857 on.next(471, 9), 858 on.error(601, ex) 859 }); 860 auto actual = res.get_observer().messages(); 861 REQUIRE(required == actual); 862 } 863 864 THEN("there was one subscription and one unsubscription to the observable"){ 865 auto required = rxu::to_vector({ 866 o_on.subscribe(200, 600) 867 }); 868 auto actual = xs.subscriptions(); 869 REQUIRE(required == actual); 870 } 871 } 872 } 873 } 874 875 SCENARIO("window with time or count, disposed", "[window_with_time_or_count][operators]"){ 876 GIVEN("1 hot observable of ints."){ 877 auto sc = rxsc::make_test(); 878 auto so = rx::synchronize_in_one_worker(sc); 879 auto w = sc.create_worker(); 880 const rxsc::test::messages<int> on; 881 const rxsc::test::messages<rx::observable<int>> o_on; 882 883 auto xs = sc.make_hot_observable({ 884 on.next(205, 1), 885 on.next(210, 2), 886 on.next(240, 3), 887 on.next(280, 4), 888 on.next(320, 5), 889 on.next(350, 6), 890 on.next(370, 7), 891 on.next(420, 8), 892 on.next(470, 9), 893 on.completed(600) 894 }); 895 896 WHEN("group each int with the next 2 ints"){ 897 using namespace std::chrono; 898 899 auto res = w.start( __anon38a1318b1002() 900 [&]() { 901 return xs 902 .window_with_time_or_count(milliseconds(70), 3, so) 903 .merge() 904 // forget type to workaround lambda deduction bug on msvc 2013 905 .as_dynamic(); 906 }, 907 372 908 ); 909 910 THEN("the output contains merged groups of ints"){ 911 auto required = rxu::to_vector({ 912 on.next(206, 1), 913 on.next(211, 2), 914 on.next(241, 3), 915 on.next(281, 4), 916 on.next(321, 5), 917 on.next(351, 6), 918 on.next(371, 7) 919 }); 920 auto actual = res.get_observer().messages(); 921 REQUIRE(required == actual); 922 } 923 924 THEN("there was one subscription and one unsubscription to the observable"){ 925 auto required = rxu::to_vector({ 926 o_on.subscribe(200, 373) 927 }); 928 auto actual = xs.subscriptions(); 929 REQUIRE(required == actual); 930 } 931 } 932 } 933 } 934 935 SCENARIO("window with time or count, only time triggered", "[window_with_time_or_count][operators]"){ 936 GIVEN("1 hot observable of ints."){ 937 auto sc = rxsc::make_test(); 938 auto so = rx::synchronize_in_one_worker(sc); 939 auto w = sc.create_worker(); 940 const rxsc::test::messages<int> on; 941 const rxsc::test::messages<rx::observable<int>> o_on; 942 943 auto xs = sc.make_hot_observable({ 944 on.next(205, 1), 945 on.next(305, 2), 946 on.next(505, 3), 947 on.next(605, 4), 948 on.next(610, 5), 949 on.completed(850) 950 }); 951 952 WHEN("group each int with the next 2 ints"){ 953 using namespace std::chrono; 954 955 auto res = w.start( __anon38a1318b1102() 956 [&]() { 957 return xs 958 .window_with_time_or_count(milliseconds(100), 3, so) 959 .merge() 960 // forget type to workaround lambda deduction bug on msvc 2013 961 .as_dynamic(); 962 } 963 ); 964 965 THEN("the output contains merged groups of ints"){ 966 auto required = rxu::to_vector({ 967 on.next(206, 1), 968 on.next(306, 2), 969 on.next(506, 3), 970 on.next(606, 4), 971 on.next(611, 5), 972 on.completed(851) 973 }); 974 auto actual = res.get_observer().messages(); 975 REQUIRE(required == actual); 976 } 977 978 THEN("there was one subscription and one unsubscription to the observable"){ 979 auto required = rxu::to_vector({ 980 o_on.subscribe(200, 850) 981 }); 982 auto actual = xs.subscriptions(); 983 REQUIRE(required == actual); 984 } 985 } 986 } 987 } 988 989 SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){ 990 GIVEN("1 hot observable of ints."){ 991 auto sc = rxsc::make_test(); 992 auto so = rx::synchronize_in_one_worker(sc); 993 auto w = sc.create_worker(); 994 const rxsc::test::messages<int> on; 995 const rxsc::test::messages<rx::observable<int>> o_on; 996 997 auto xs = sc.make_hot_observable({ 998 on.next(205, 1), 999 on.next(305, 2), 1000 on.next(505, 3), 1001 on.next(605, 4), 1002 on.next(610, 5), 1003 on.completed(850) 1004 }); 1005 1006 WHEN("group each int with the next 2 ints"){ 1007 using namespace std::chrono; 1008 1009 auto res = w.start( __anon38a1318b1202() 1010 [&]() { 1011 return xs 1012 .window_with_time_or_count(milliseconds(370), 2, so) 1013 .map([](rx::observable<int> w){ 1014 return w.count(); 1015 }) 1016 .merge() 1017 // forget type to workaround lambda deduction bug on msvc 2013 1018 .as_dynamic(); 1019 } 1020 ); 1021 1022 THEN("the output contains merged groups of ints"){ 1023 auto required = rxu::to_vector({ 1024 on.next(306, 2), 1025 on.next(606, 2), 1026 on.next(851, 1), 1027 on.completed(851) 1028 }); 1029 auto actual = res.get_observer().messages(); 1030 REQUIRE(required == actual); 1031 } 1032 1033 THEN("there was one subscription and one unsubscription to the observable"){ 1034 auto required = rxu::to_vector({ 1035 o_on.subscribe(200, 850) 1036 }); 1037 auto actual = xs.subscriptions(); 1038 REQUIRE(required == actual); 1039 } 1040 } 1041 } 1042 } 1043