1 #include "../test.h" 2 #include "rxcpp/operators/rx-time_interval.hpp" 3 4 using namespace std::chrono; 5 6 SCENARIO("should not emit time intervals if the source never emits any items", "[time_interval][operators]"){ 7 GIVEN("a source"){ 8 typedef rxsc::detail::test_type::clock_type::time_point::duration duration; 9 10 auto sc = rxsc::make_test(); 11 auto w = sc.create_worker(); 12 const rxsc::test::messages<int> on; 13 14 auto xs = sc.make_hot_observable({ 15 on.next(150, 1) 16 }); 17 18 WHEN("time_interval operator is invoked"){ 19 20 auto res = w.start( __anon0f8f85a60102() 21 [xs]() { 22 return xs 23 | rxo::time_interval(); 24 } 25 ); 26 27 THEN("the output is empty"){ 28 auto required = std::vector<rxsc::test::messages<duration>::recorded_type>(); 29 auto actual = res.get_observer().messages(); 30 REQUIRE(required == actual); 31 } 32 33 THEN("there was 1 subscription/unsubscription to the source"){ 34 auto required = rxu::to_vector({ 35 on.subscribe(200, 1000) 36 }); 37 auto actual = xs.subscriptions(); 38 REQUIRE(required == actual); 39 } 40 } 41 } 42 } 43 44 SCENARIO("should not emit time intervals if the source observable is empty", "[time_interval][operators]"){ 45 GIVEN("a source"){ 46 typedef rxsc::detail::test_type::clock_type::time_point::duration duration; 47 48 auto sc = rxsc::make_test(); 49 auto so = rx::synchronize_in_one_worker(sc); 50 auto w = sc.create_worker(); 51 const rxsc::test::messages<int> on; 52 const rxsc::test::messages<duration> on_time_interval; 53 54 auto xs = sc.make_hot_observable({ 55 on.next(150, 1), 56 on.completed(250) 57 }); 58 59 WHEN("time_interval operator is invoked"){ 60 61 auto res = w.start( __anon0f8f85a60202() 62 [so, xs]() { 63 return xs.time_interval(); 64 } 65 ); 66 67 THEN("the output only contains complete message"){ 68 auto required = rxu::to_vector({ 69 on_time_interval.completed(250) 70 }); 71 auto actual = res.get_observer().messages(); 72 REQUIRE(required == actual); 73 } 74 75 THEN("there was 1 subscription/unsubscription to the source"){ 76 auto required = rxu::to_vector({ 77 on.subscribe(200, 250) 78 }); 79 auto actual = xs.subscriptions(); 80 REQUIRE(required == actual); 81 } 82 83 } 84 } 85 } 86 87 SCENARIO("should emit time intervals for every item in the source observable", "[time_interval][operators]"){ 88 GIVEN("a source"){ 89 typedef rxsc::detail::test_type::clock_type clock_type; 90 typedef clock_type::time_point::duration duration; 91 92 auto sc = rxsc::make_test(); 93 auto so = rx::synchronize_in_one_worker(sc); 94 auto w = sc.create_worker(); 95 const rxsc::test::messages<int> on; 96 const rxsc::test::messages<duration> on_time_interval; 97 98 auto xs = sc.make_hot_observable({ 99 on.next(150, 1), 100 on.next(210, 2), 101 on.next(240, 3), 102 on.completed(250) 103 }); 104 105 WHEN("time_interval operator is invoked"){ 106 107 auto res = w.start( __anon0f8f85a60302() 108 [so, xs]() { 109 return xs.time_interval(so); 110 } 111 ); 112 113 THEN("the output contains the emitted items while subscribed"){ 114 auto required = rxu::to_vector({ 115 on_time_interval.next(210, milliseconds(10)), 116 on_time_interval.next(240, milliseconds(30)), 117 on_time_interval.completed(250) 118 }); 119 auto actual = res.get_observer().messages(); 120 REQUIRE(required == actual); 121 } 122 123 THEN("there was 1 subscription/unsubscription to the source"){ 124 auto required = rxu::to_vector({ 125 on.subscribe(200, 250) 126 }); 127 auto actual = xs.subscriptions(); 128 REQUIRE(required == actual); 129 } 130 131 } 132 } 133 } 134 135 SCENARIO("should emit time intervals and an error if there is an error", "[time_interval][operators]"){ 136 GIVEN("a source"){ 137 typedef rxsc::detail::test_type::clock_type clock_type; 138 typedef clock_type::time_point::duration duration; 139 140 auto sc = rxsc::make_test(); 141 auto so = rx::synchronize_in_one_worker(sc); 142 auto w = sc.create_worker(); 143 const rxsc::test::messages<int> on; 144 const rxsc::test::messages<duration> on_time_interval; 145 146 std::runtime_error ex("on_error from source"); 147 148 auto xs = sc.make_hot_observable({ 149 on.next(150, 1), 150 on.next(210, 2), 151 on.next(240, 3), 152 on.error(250, ex) 153 }); 154 155 WHEN("time_interval operator is invoked"){ 156 157 auto res = w.start( __anon0f8f85a60402() 158 [so, xs]() { 159 return xs.time_interval(so); 160 } 161 ); 162 163 THEN("the output contains emitted items and an error"){ 164 auto required = rxu::to_vector({ 165 on_time_interval.next(210, milliseconds(10)), 166 on_time_interval.next(240, milliseconds(30)), 167 on_time_interval.error(250, ex) 168 }); 169 auto actual = res.get_observer().messages(); 170 REQUIRE(required == actual); 171 } 172 173 THEN("there was 1 subscription/unsubscription to the source"){ 174 auto required = rxu::to_vector({ 175 on.subscribe(200, 250) 176 }); 177 auto actual = xs.subscriptions(); 178 REQUIRE(required == actual); 179 } 180 181 } 182 } 183 } 184