1 #include "../test.h" 2 3 #include <rxcpp/rx-coroutine.hpp> 4 5 #ifdef _RESUMABLE_FUNCTIONS_SUPPORTED 6 7 SCENARIO("coroutine completes", "[coroutine]"){ 8 GIVEN("a source") { 9 auto sc = rxsc::make_test(); 10 auto w = sc.create_worker(); 11 const rxsc::test::messages<int> on; 12 13 auto xs = sc.make_hot_observable({ 14 on.next(110, 1), 15 on.next(210, 2), 16 on.next(310, 10), 17 on.completed(350) 18 }); 19 20 WHEN("for co_await"){ 21 22 std::vector<typename rxsc::test::messages<int>::recorded_type> messages; 23 24 w.advance_to(rxsc::test::subscribed_time); 25 __anon5129b6260102() 26 auto d = [&]() -> std::future<void> { 27 RXCPP_TRY { 28 for co_await (auto n : xs | rxo::as_dynamic()) { 29 messages.push_back(on.next(w.clock(), n)); 30 } 31 messages.push_back(on.completed(w.clock())); 32 } RXCPP_CATCH(...) { 33 messages.push_back(on.error(w.clock(), rxu::current_exception())); 34 } 35 }(); 36 37 w.advance_to(rxsc::test::unsubscribed_time); 38 39 THEN("the function completed"){ 40 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready); 41 } 42 43 THEN("the output only contains true"){ 44 auto required = rxu::to_vector({ 45 on.next(210, 2), 46 on.next(310, 10), 47 on.completed(350) 48 }); 49 auto actual = messages; 50 REQUIRE(required == actual); 51 } 52 53 THEN("there was 1 subscription/unsubscription to the source"){ 54 auto required = rxu::to_vector({ 55 on.subscribe(200, 350) 56 }); 57 auto actual = xs.subscriptions(); 58 REQUIRE(required == actual); 59 } 60 61 } 62 } 63 } 64 65 SCENARIO("coroutine errors", "[coroutine]"){ 66 GIVEN("a source") { 67 auto sc = rxsc::make_test(); 68 auto w = sc.create_worker(); 69 const rxsc::test::messages<int> on; 70 71 std::runtime_error ex("error in source"); 72 73 auto xs = sc.make_hot_observable({ 74 on.next(110, 1), 75 on.next(210, 2), 76 on.error(310, ex), 77 on.next(310, 10), 78 on.completed(350) 79 }); 80 81 WHEN("for co_await"){ 82 83 std::vector<typename rxsc::test::messages<int>::recorded_type> messages; 84 85 w.advance_to(rxsc::test::subscribed_time); 86 __anon5129b6260202() 87 auto d = [&]() -> std::future<void> { 88 RXCPP_TRY { 89 for co_await (auto n : xs | rxo::as_dynamic()) { 90 messages.push_back(on.next(w.clock(), n)); 91 } 92 messages.push_back(on.completed(w.clock())); 93 } RXCPP_CATCH(...) { 94 messages.push_back(on.error(w.clock(), rxu::current_exception())); 95 } 96 }(); 97 98 w.advance_to(rxsc::test::unsubscribed_time); 99 100 THEN("the function completed"){ 101 REQUIRE(d.wait_for(std::chrono::seconds(0)) == std::future_status::ready); 102 } 103 104 THEN("the output only contains true"){ 105 auto required = rxu::to_vector({ 106 on.next(210, 2), 107 on.error(310, ex) 108 }); 109 auto actual = messages; 110 REQUIRE(required == actual); 111 } 112 113 THEN("there was 1 subscription/unsubscription to the source"){ 114 auto required = rxu::to_vector({ 115 on.subscribe(200, 310) 116 }); 117 auto actual = xs.subscriptions(); 118 REQUIRE(required == actual); 119 } 120 121 } 122 } 123 } 124 125 #endif 126