1 #include "../test.h" 2 #include <rxcpp/operators/rx-replay.hpp> 3 4 SCENARIO("replay basic", "[replay][multicast][subject][operators]"){ 5 GIVEN("a test hot observable of ints"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto xs = sc.make_hot_observable({ 11 on.next(110, 0), 12 on.next(220, 1), 13 on.next(280, 2), 14 on.next(290, 3), 15 on.next(340, 4), 16 on.next(360, 5), 17 on.next(370, 6), 18 on.next(390, 7), 19 on.next(410, 8), 20 on.next(430, 9), 21 on.next(450, 10), 22 on.next(520, 11), 23 on.next(560, 12), 24 on.completed(600) 25 }); 26 27 auto res = w.make_subscriber<int>(); 28 29 rx::connectable_observable<int> ys; 30 31 WHEN("subscribed and then connected"){ 32 33 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe00102(const rxsc::schedulable&)34 [&ys, &xs](const rxsc::schedulable&){ 35 ys = xs.replay().as_dynamic(); 36 }); 37 38 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe00202(const rxsc::schedulable&)39 [&ys, &res](const rxsc::schedulable&){ 40 ys.subscribe(res); 41 }); 42 43 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe00302(const rxsc::schedulable&)44 [&res](const rxsc::schedulable&){ 45 res.unsubscribe(); 46 }); 47 48 { 49 rx::composite_subscription connection; 50 51 w.schedule_absolute(300, __anon724c1fe00402(const rxsc::schedulable&)52 [connection, &ys](const rxsc::schedulable&){ 53 ys.connect(connection); 54 }); 55 w.schedule_absolute(400, __anon724c1fe00502(const rxsc::schedulable&)56 [connection](const rxsc::schedulable&){ 57 connection.unsubscribe(); 58 }); 59 } 60 61 { 62 rx::composite_subscription connection; 63 64 w.schedule_absolute(500, __anon724c1fe00602(const rxsc::schedulable&)65 [connection, &ys](const rxsc::schedulable&){ 66 ys.connect(connection); 67 }); 68 w.schedule_absolute(550, __anon724c1fe00702(const rxsc::schedulable&)69 [connection](const rxsc::schedulable&){ 70 connection.unsubscribe(); 71 }); 72 } 73 74 { 75 rx::composite_subscription connection; 76 77 w.schedule_absolute(650, __anon724c1fe00802(const rxsc::schedulable&)78 [connection, &ys](const rxsc::schedulable&){ 79 ys.connect(connection); 80 }); 81 w.schedule_absolute(800, __anon724c1fe00902(const rxsc::schedulable&)82 [connection](const rxsc::schedulable&){ 83 connection.unsubscribe(); 84 }); 85 } 86 87 w.start(); 88 89 THEN("the output only contains items sent while subscribed"){ 90 auto required = rxu::to_vector({ 91 on.next(340, 4), 92 on.next(360, 5), 93 on.next(370, 6), 94 on.next(390, 7), 95 on.next(520, 11) 96 }); 97 auto actual = res.get_observer().messages(); 98 REQUIRE(required == actual); 99 } 100 101 THEN("there were 3 subscription/unsubscription"){ 102 auto required = rxu::to_vector({ 103 on.subscribe(300, 400), 104 on.subscribe(500, 550), 105 on.subscribe(650, 800) 106 }); 107 auto actual = xs.subscriptions(); 108 REQUIRE(required == actual); 109 } 110 111 } 112 } 113 } 114 115 SCENARIO("replay error", "[replay][error][multicast][subject][operators]"){ 116 GIVEN("a test hot observable of ints"){ 117 auto sc = rxsc::make_test(); 118 auto w = sc.create_worker(); 119 const rxsc::test::messages<int> on; 120 121 std::runtime_error ex("publish on_error"); 122 123 auto xs = sc.make_hot_observable({ 124 on.next(110, 0), 125 on.next(220, 1), 126 on.next(280, 2), 127 on.next(290, 3), 128 on.next(340, 4), 129 on.next(360, 5), 130 on.next(370, 6), 131 on.next(390, 7), 132 on.next(410, 8), 133 on.next(430, 9), 134 on.next(450, 10), 135 on.next(520, 11), 136 on.next(560, 12), 137 on.error(600, ex) 138 }); 139 140 auto res = w.make_subscriber<int>(); 141 142 rx::connectable_observable<int> ys; 143 144 WHEN("subscribed and then connected"){ 145 146 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe00a02(const rxsc::schedulable&)147 [&ys, &xs](const rxsc::schedulable&){ 148 ys = xs.replay().as_dynamic(); 149 }); 150 151 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe00b02(const rxsc::schedulable&)152 [&ys, &res](const rxsc::schedulable&){ 153 ys.subscribe(res); 154 }); 155 156 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe00c02(const rxsc::schedulable&)157 [&res](const rxsc::schedulable&){ 158 res.unsubscribe(); 159 }); 160 161 { 162 rx::composite_subscription connection; 163 164 w.schedule_absolute(300, __anon724c1fe00d02(const rxsc::schedulable&)165 [connection, &ys](const rxsc::schedulable&){ 166 ys.connect(connection); 167 }); 168 w.schedule_absolute(400, __anon724c1fe00e02(const rxsc::schedulable&)169 [connection](const rxsc::schedulable&){ 170 connection.unsubscribe(); 171 }); 172 } 173 174 { 175 rx::composite_subscription connection; 176 177 w.schedule_absolute(500, __anon724c1fe00f02(const rxsc::schedulable&)178 [connection, &ys](const rxsc::schedulable&){ 179 ys.connect(connection); 180 }); 181 w.schedule_absolute(800, __anon724c1fe01002(const rxsc::schedulable&)182 [connection](const rxsc::schedulable&){ 183 connection.unsubscribe(); 184 }); 185 } 186 187 w.start(); 188 189 THEN("the output only contains items sent while subscribed"){ 190 auto required = rxu::to_vector({ 191 on.next(340, 4), 192 on.next(360, 5), 193 on.next(370, 6), 194 on.next(390, 7), 195 on.next(520, 11), 196 on.next(560, 12), 197 on.error(600, ex) 198 }); 199 auto actual = res.get_observer().messages(); 200 REQUIRE(required == actual); 201 } 202 203 THEN("there were 2 subscription/unsubscription"){ 204 auto required = rxu::to_vector({ 205 on.subscribe(300, 400), 206 on.subscribe(500, 600) 207 }); 208 auto actual = xs.subscriptions(); 209 REQUIRE(required == actual); 210 } 211 212 } 213 } 214 } 215 216 SCENARIO("replay multiple subscriptions", "[replay][multicast][subject][operators]"){ 217 GIVEN("a test hot observable of ints"){ 218 auto sc = rxsc::make_test(); 219 auto w = sc.create_worker(); 220 const rxsc::test::messages<int> on; 221 222 auto xs = sc.make_hot_observable({ 223 on.next(110, 0), 224 on.next(220, 1), 225 on.next(280, 2), 226 on.next(290, 3), 227 on.next(340, 4), 228 on.next(360, 5), 229 on.next(370, 6), 230 on.next(390, 7), 231 on.next(410, 8), 232 on.next(430, 9), 233 on.next(450, 10), 234 on.next(520, 11), 235 on.next(560, 12), 236 on.completed(650) 237 }); 238 239 rx::connectable_observable<int> ys; 240 241 WHEN("subscribed and then connected"){ 242 243 // Create connectable observable 244 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe01102(const rxsc::schedulable&)245 [&ys, &xs](const rxsc::schedulable&){ 246 ys = xs.replay().as_dynamic(); 247 }); 248 249 // Manage connection 250 rx::composite_subscription connection; 251 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe01202(const rxsc::schedulable&)252 [connection, &ys](const rxsc::schedulable&){ 253 ys.connect(connection); 254 }); 255 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe01302(const rxsc::schedulable&)256 [connection](const rxsc::schedulable&){ 257 connection.unsubscribe(); 258 }); 259 260 // Subscribe before the first item emitted 261 auto res1 = w.make_subscriber<int>(); __anon724c1fe01402(const rxsc::schedulable&)262 w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); 263 264 // Subscribe in the middle of emitting 265 auto res2 = w.make_subscriber<int>(); __anon724c1fe01502(const rxsc::schedulable&)266 w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); 267 268 // Subscribe after the last item emitted 269 auto res3 = w.make_subscriber<int>(); __anon724c1fe01602(const rxsc::schedulable&)270 w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); 271 272 w.start(); 273 274 THEN("the output only contains items sent while subscribed"){ 275 auto required = rxu::to_vector({ 276 on.next(220, 1), 277 on.next(280, 2), 278 on.next(290, 3), 279 on.next(340, 4), 280 on.next(360, 5), 281 on.next(370, 6), 282 on.next(390, 7), 283 on.next(410, 8), 284 on.next(430, 9), 285 on.next(450, 10), 286 on.next(520, 11), 287 on.next(560, 12), 288 on.completed(650) 289 }); 290 auto actual = res1.get_observer().messages(); 291 REQUIRE(required == actual); 292 } 293 294 THEN("the output only contains items sent while subscribed"){ 295 auto required = rxu::to_vector({ 296 on.next(400, 1), 297 on.next(400, 2), 298 on.next(400, 3), 299 on.next(400, 4), 300 on.next(400, 5), 301 on.next(400, 6), 302 on.next(400, 7), 303 on.next(410, 8), 304 on.next(430, 9), 305 on.next(450, 10), 306 on.next(520, 11), 307 on.next(560, 12), 308 on.completed(650) 309 }); 310 auto actual = res2.get_observer().messages(); 311 REQUIRE(required == actual); 312 } 313 314 THEN("the output only contains items sent while subscribed"){ 315 auto required = rxu::to_vector({ 316 on.next(600, 1), 317 on.next(600, 2), 318 on.next(600, 3), 319 on.next(600, 4), 320 on.next(600, 5), 321 on.next(600, 6), 322 on.next(600, 7), 323 on.next(600, 8), 324 on.next(600, 9), 325 on.next(600, 10), 326 on.next(600, 11), 327 on.next(600, 12), 328 on.completed(650) 329 }); 330 auto actual = res3.get_observer().messages(); 331 REQUIRE(required == actual); 332 } 333 334 THEN("there was 1 subscription/unsubscription"){ 335 auto required = rxu::to_vector({ 336 on.subscribe(200, 650) 337 }); 338 auto actual = xs.subscriptions(); 339 REQUIRE(required == actual); 340 } 341 342 } 343 } 344 } 345 346 SCENARIO("replay multiple subscriptions with count", "[replay][multicast][subject][operators]"){ 347 GIVEN("a test hot observable of ints"){ 348 auto sc = rxsc::make_test(); 349 auto w = sc.create_worker(); 350 const rxsc::test::messages<int> on; 351 352 auto xs = sc.make_hot_observable({ 353 on.next(110, 0), 354 on.next(220, 1), 355 on.next(280, 2), 356 on.next(290, 3), 357 on.next(340, 4), 358 on.next(360, 5), 359 on.next(370, 6), 360 on.next(390, 7), 361 on.next(410, 8), 362 on.next(430, 9), 363 on.next(450, 10), 364 on.next(520, 11), 365 on.next(560, 12), 366 on.completed(650) 367 }); 368 369 rx::connectable_observable<int> ys; 370 371 WHEN("subscribed and then connected"){ 372 373 // Create connectable observable 374 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe01702(const rxsc::schedulable&)375 [&ys, &xs](const rxsc::schedulable&){ 376 ys = xs.replay(3).as_dynamic(); 377 }); 378 379 // Manage connection 380 rx::composite_subscription connection; 381 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe01802(const rxsc::schedulable&)382 [connection, &ys](const rxsc::schedulable&){ 383 ys.connect(connection); 384 }); 385 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe01902(const rxsc::schedulable&)386 [connection](const rxsc::schedulable&){ 387 connection.unsubscribe(); 388 }); 389 390 // Subscribe before the first item emitted 391 auto res1 = w.make_subscriber<int>(); __anon724c1fe01a02(const rxsc::schedulable&)392 w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); 393 394 // Subscribe in the middle of emitting 395 auto res2 = w.make_subscriber<int>(); __anon724c1fe01b02(const rxsc::schedulable&)396 w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); 397 398 // Subscribe after the last item emitted 399 auto res3 = w.make_subscriber<int>(); __anon724c1fe01c02(const rxsc::schedulable&)400 w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); 401 402 w.start(); 403 404 THEN("the output only contains items sent while subscribed"){ 405 auto required = rxu::to_vector({ 406 on.next(220, 1), 407 on.next(280, 2), 408 on.next(290, 3), 409 on.next(340, 4), 410 on.next(360, 5), 411 on.next(370, 6), 412 on.next(390, 7), 413 on.next(410, 8), 414 on.next(430, 9), 415 on.next(450, 10), 416 on.next(520, 11), 417 on.next(560, 12), 418 on.completed(650) 419 }); 420 auto actual = res1.get_observer().messages(); 421 REQUIRE(required == actual); 422 } 423 424 THEN("the output only contains items sent while subscribed"){ 425 auto required = rxu::to_vector({ 426 on.next(400, 5), 427 on.next(400, 6), 428 on.next(400, 7), 429 on.next(410, 8), 430 on.next(430, 9), 431 on.next(450, 10), 432 on.next(520, 11), 433 on.next(560, 12), 434 on.completed(650) 435 }); 436 auto actual = res2.get_observer().messages(); 437 REQUIRE(required == actual); 438 } 439 440 THEN("the output only contains items sent while subscribed"){ 441 auto required = rxu::to_vector({ 442 on.next(600, 10), 443 on.next(600, 11), 444 on.next(600, 12), 445 on.completed(650) 446 }); 447 auto actual = res3.get_observer().messages(); 448 REQUIRE(required == actual); 449 } 450 451 THEN("there was 1 subscription/unsubscription"){ 452 auto required = rxu::to_vector({ 453 on.subscribe(200, 650) 454 }); 455 auto actual = xs.subscriptions(); 456 REQUIRE(required == actual); 457 } 458 459 } 460 } 461 } 462 463 SCENARIO("replay multiple subscriptions with time", "[replay][multicast][subject][operators]"){ 464 GIVEN("a test hot observable of ints"){ 465 auto sc = rxsc::make_test(); 466 auto w = sc.create_worker(); 467 auto so = rx::identity_one_worker(sc); 468 const rxsc::test::messages<int> on; 469 470 auto xs = sc.make_hot_observable({ 471 on.next(110, 0), 472 on.next(220, 1), 473 on.next(240, 2), 474 on.next(260, 3), 475 on.next(340, 4), 476 on.next(360, 5), 477 on.next(370, 6), 478 on.next(390, 7), 479 on.next(410, 8), 480 on.next(430, 9), 481 on.next(450, 10), 482 on.next(520, 11), 483 on.next(560, 12), 484 on.completed(650) 485 }); 486 487 rx::connectable_observable<int> ys; 488 489 WHEN("subscribed and then connected"){ 490 using namespace std::chrono; 491 492 // Create connectable observable 493 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe01d02(const rxsc::schedulable&)494 [&](const rxsc::schedulable&){ 495 ys = xs.replay(milliseconds(100), so).as_dynamic(); 496 }); 497 498 // Manage connection 499 rx::composite_subscription connection; 500 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe01e02(const rxsc::schedulable&)501 [connection, &ys](const rxsc::schedulable&){ 502 ys.connect(connection); 503 }); 504 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe01f02(const rxsc::schedulable&)505 [connection](const rxsc::schedulable&){ 506 connection.unsubscribe(); 507 }); 508 509 // Subscribe before the first item emitted 510 auto res1 = w.make_subscriber<int>(); __anon724c1fe02002(const rxsc::schedulable&)511 w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); 512 513 // Subscribe in the middle of emitting 514 auto res2 = w.make_subscriber<int>(); __anon724c1fe02102(const rxsc::schedulable&)515 w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); 516 517 // Subscribe after the last item emitted 518 auto res3 = w.make_subscriber<int>(); __anon724c1fe02202(const rxsc::schedulable&)519 w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); 520 521 w.start(); 522 523 THEN("the output only contains items sent while subscribed"){ 524 auto required = rxu::to_vector({ 525 on.next(220, 1), 526 on.next(240, 2), 527 on.next(260, 3), 528 on.next(340, 4), 529 on.next(360, 5), 530 on.next(370, 6), 531 on.next(390, 7), 532 on.next(410, 8), 533 on.next(430, 9), 534 on.next(450, 10), 535 on.next(520, 11), 536 on.next(560, 12), 537 on.completed(650) 538 }); 539 auto actual = res1.get_observer().messages(); 540 REQUIRE(required == actual); 541 } 542 543 THEN("the output only contains items sent while subscribed"){ 544 auto required = rxu::to_vector({ 545 on.next(400, 4), 546 on.next(400, 5), 547 on.next(400, 6), 548 on.next(400, 7), 549 on.next(410, 8), 550 on.next(430, 9), 551 on.next(450, 10), 552 on.next(520, 11), 553 on.next(560, 12), 554 on.completed(650) 555 }); 556 auto actual = res2.get_observer().messages(); 557 REQUIRE(required == actual); 558 } 559 560 THEN("the output only contains items sent while subscribed"){ 561 auto required = rxu::to_vector({ 562 on.next(600, 11), 563 on.next(600, 12), 564 on.completed(650) 565 }); 566 auto actual = res3.get_observer().messages(); 567 REQUIRE(required == actual); 568 } 569 570 THEN("there was 1 subscription/unsubscription"){ 571 auto required = rxu::to_vector({ 572 on.subscribe(200, 650) 573 }); 574 auto actual = xs.subscriptions(); 575 REQUIRE(required == actual); 576 } 577 578 } 579 } 580 } 581 582 SCENARIO("replay multiple subscriptions with count and time", "[replay][multicast][subject][operators]"){ 583 GIVEN("a test hot observable of ints"){ 584 auto sc = rxsc::make_test(); 585 auto w = sc.create_worker(); 586 auto so = rx::identity_one_worker(sc); 587 const rxsc::test::messages<int> on; 588 589 auto xs = sc.make_hot_observable({ 590 on.next(110, 0), 591 on.next(220, 1), 592 on.next(240, 2), 593 on.next(260, 3), 594 on.next(340, 4), 595 on.next(360, 5), 596 on.next(370, 6), 597 on.next(390, 7), 598 on.next(410, 8), 599 on.next(430, 9), 600 on.next(450, 10), 601 on.next(520, 11), 602 on.next(560, 12), 603 on.completed(650) 604 }); 605 606 rx::connectable_observable<int> ys; 607 608 WHEN("subscribed and then connected"){ 609 using namespace std::chrono; 610 611 // Create connectable observable 612 w.schedule_absolute(rxsc::test::created_time, __anon724c1fe02302(const rxsc::schedulable&)613 [&](const rxsc::schedulable&){ 614 ys = xs.replay(3, milliseconds(100), so).as_dynamic(); 615 }); 616 617 // Manage connection 618 rx::composite_subscription connection; 619 w.schedule_absolute(rxsc::test::subscribed_time, __anon724c1fe02402(const rxsc::schedulable&)620 [connection, &ys](const rxsc::schedulable&){ 621 ys.connect(connection); 622 }); 623 w.schedule_absolute(rxsc::test::unsubscribed_time, __anon724c1fe02502(const rxsc::schedulable&)624 [connection](const rxsc::schedulable&){ 625 connection.unsubscribe(); 626 }); 627 628 // Subscribe before the first item emitted 629 auto res1 = w.make_subscriber<int>(); __anon724c1fe02602(const rxsc::schedulable&)630 w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); 631 632 // Subscribe in the middle of emitting 633 auto res2 = w.make_subscriber<int>(); __anon724c1fe02702(const rxsc::schedulable&)634 w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); 635 636 // Subscribe after the last item emitted 637 auto res3 = w.make_subscriber<int>(); __anon724c1fe02802(const rxsc::schedulable&)638 w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); 639 640 w.start(); 641 642 THEN("the output only contains items sent while subscribed"){ 643 auto required = rxu::to_vector({ 644 on.next(220, 1), 645 on.next(240, 2), 646 on.next(260, 3), 647 on.next(340, 4), 648 on.next(360, 5), 649 on.next(370, 6), 650 on.next(390, 7), 651 on.next(410, 8), 652 on.next(430, 9), 653 on.next(450, 10), 654 on.next(520, 11), 655 on.next(560, 12), 656 on.completed(650) 657 }); 658 auto actual = res1.get_observer().messages(); 659 REQUIRE(required == actual); 660 } 661 662 THEN("the output only contains items sent while subscribed"){ 663 auto required = rxu::to_vector({ 664 on.next(400, 5), 665 on.next(400, 6), 666 on.next(400, 7), 667 on.next(410, 8), 668 on.next(430, 9), 669 on.next(450, 10), 670 on.next(520, 11), 671 on.next(560, 12), 672 on.completed(650) 673 }); 674 auto actual = res2.get_observer().messages(); 675 REQUIRE(required == actual); 676 } 677 678 THEN("the output only contains items sent while subscribed"){ 679 auto required = rxu::to_vector({ 680 on.next(600, 11), 681 on.next(600, 12), 682 on.completed(650) 683 }); 684 auto actual = res3.get_observer().messages(); 685 REQUIRE(required == actual); 686 } 687 688 THEN("there was 1 subscription/unsubscription"){ 689 auto required = rxu::to_vector({ 690 on.subscribe(200, 650) 691 }); 692 auto actual = xs.subscriptions(); 693 REQUIRE(required == actual); 694 } 695 696 } 697 } 698 } 699