1 #include "../test.h" 2 #include <rxcpp/operators/rx-take_last.hpp> 3 4 SCENARIO("take last 0", "[take_last][operators]"){ 5 GIVEN("a source"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 auto xs = sc.make_hot_observable({ 11 on.next(150, 1), 12 on.next(210, 2), 13 on.next(220, 3), 14 on.next(230, 4), 15 on.next(240, 5), 16 on.completed(250) 17 }); 18 19 WHEN("0 last values are taken"){ 20 21 auto res = w.start( __anon1000c90e0102() 22 [xs]() { 23 return xs 24 | rxo::take_last(0) 25 // forget type to workaround lambda deduction bug on msvc 2013 26 | rxo::as_dynamic(); 27 } 28 ); 29 30 THEN("the output only contains the completion event"){ 31 auto required = rxu::to_vector({ 32 on.completed(250) 33 }); 34 auto actual = res.get_observer().messages(); 35 REQUIRE(required == actual); 36 } 37 38 THEN("there was 1 subscription/unsubscription to the source"){ 39 auto required = rxu::to_vector({ 40 on.subscribe(200, 250) 41 }); 42 auto actual = xs.subscriptions(); 43 REQUIRE(required == actual); 44 } 45 46 } 47 } 48 } 49 50 SCENARIO("take last 1", "[take_last][operators]"){ 51 GIVEN("a source"){ 52 auto sc = rxsc::make_test(); 53 auto w = sc.create_worker(); 54 const rxsc::test::messages<int> on; 55 56 auto xs = sc.make_hot_observable({ 57 on.next(150, 1), 58 on.next(210, 2), 59 on.next(220, 3), 60 on.next(230, 4), 61 on.next(240, 5), 62 on.completed(250) 63 }); 64 65 WHEN("1 last value is taken"){ 66 67 auto res = w.start( __anon1000c90e0202() 68 [xs]() { 69 return xs 70 .take_last(1) 71 // forget type to workaround lambda deduction bug on msvc 2013 72 .as_dynamic(); 73 } 74 ); 75 76 THEN("the output only contains items sent while subscribed"){ 77 auto required = rxu::to_vector({ 78 on.next(250, 5), 79 on.completed(250) 80 }); 81 auto actual = res.get_observer().messages(); 82 REQUIRE(required == actual); 83 } 84 85 THEN("there was 1 subscription/unsubscription to the source"){ 86 auto required = rxu::to_vector({ 87 on.subscribe(200, 250) 88 }); 89 auto actual = xs.subscriptions(); 90 REQUIRE(required == actual); 91 } 92 93 } 94 } 95 } 96 97 SCENARIO("take last 2", "[take_last][operators]"){ 98 GIVEN("a source"){ 99 auto sc = rxsc::make_test(); 100 auto w = sc.create_worker(); 101 const rxsc::test::messages<int> on; 102 103 auto xs = sc.make_hot_observable({ 104 on.next(150, 1), 105 on.next(210, 2), 106 on.next(220, 3), 107 on.next(230, 4), 108 on.next(240, 5), 109 on.completed(250) 110 }); 111 112 WHEN("2 last values are taken"){ 113 114 auto res = w.start( __anon1000c90e0302() 115 [xs]() { 116 return xs 117 .take_last(2) 118 // forget type to workaround lambda deduction bug on msvc 2013 119 .as_dynamic(); 120 } 121 ); 122 123 THEN("the output only contains items sent while subscribed"){ 124 auto required = rxu::to_vector({ 125 on.next(250, 4), 126 on.next(250, 5), 127 on.completed(250) 128 }); 129 auto actual = res.get_observer().messages(); 130 REQUIRE(required == actual); 131 } 132 133 THEN("there was 1 subscription/unsubscription to the source"){ 134 auto required = rxu::to_vector({ 135 on.subscribe(200, 250) 136 }); 137 auto actual = xs.subscriptions(); 138 REQUIRE(required == actual); 139 } 140 141 } 142 } 143 } 144 145 SCENARIO("take last 10, complete before all elements are taken", "[take_last][operators]"){ 146 GIVEN("a source"){ 147 auto sc = rxsc::make_test(); 148 auto w = sc.create_worker(); 149 const rxsc::test::messages<int> on; 150 151 auto xs = sc.make_hot_observable({ 152 on.next(150, 1), 153 on.next(210, 2), 154 on.next(220, 3), 155 on.next(230, 4), 156 on.next(240, 5), 157 on.completed(250) 158 }); 159 160 WHEN("10 last values are taken"){ 161 162 auto res = w.start( __anon1000c90e0402() 163 [xs]() { 164 return xs 165 .take_last(10) 166 // forget type to workaround lambda deduction bug on msvc 2013 167 .as_dynamic(); 168 } 169 ); 170 171 THEN("the output only contains items sent while subscribed"){ 172 auto required = rxu::to_vector({ 173 on.next(250, 2), 174 on.next(250, 3), 175 on.next(250, 4), 176 on.next(250, 5), 177 on.completed(250) 178 }); 179 auto actual = res.get_observer().messages(); 180 REQUIRE(required == actual); 181 } 182 183 THEN("there was 1 subscription/unsubscription to the source"){ 184 auto required = rxu::to_vector({ 185 on.subscribe(200, 250) 186 }); 187 auto actual = xs.subscriptions(); 188 REQUIRE(required == actual); 189 } 190 191 } 192 } 193 } 194 195 SCENARIO("no items to take_last", "[take_last][operators]"){ 196 GIVEN("a source"){ 197 auto sc = rxsc::make_test(); 198 auto so = rx::synchronize_in_one_worker(sc); 199 auto w = sc.create_worker(); 200 const rxsc::test::messages<int> on; 201 202 auto xs = sc.make_hot_observable({ 203 on.next(150, 1) 204 }); 205 206 WHEN("2 last values are taken"){ 207 208 auto res = w.start( __anon1000c90e0502() 209 [so, xs]() { 210 return xs 211 .take_last(2) 212 // forget type to workaround lambda deduction bug on msvc 2013 213 .as_dynamic(); 214 } 215 ); 216 217 THEN("the output is empty"){ 218 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 219 auto actual = res.get_observer().messages(); 220 REQUIRE(required == actual); 221 } 222 223 THEN("there was 1 subscription/unsubscription to the source"){ 224 auto required = rxu::to_vector({ 225 on.subscribe(200, 1000) 226 }); 227 auto actual = xs.subscriptions(); 228 REQUIRE(required == actual); 229 } 230 } 231 } 232 } 233 234 SCENARIO("take_last, source observable emits an error", "[take_last][operators]"){ 235 GIVEN("a source"){ 236 auto sc = rxsc::make_test(); 237 auto so = rx::synchronize_in_one_worker(sc); 238 auto w = sc.create_worker(); 239 const rxsc::test::messages<int> on; 240 241 std::runtime_error ex("on_error from source"); 242 243 auto xs = sc.make_hot_observable({ 244 on.next(150, 1), 245 on.error(250, ex) 246 }); 247 248 WHEN("2 last values are taken"){ 249 250 auto res = w.start( __anon1000c90e0602() 251 [so, xs]() { 252 return xs 253 .take_last(2) 254 // forget type to workaround lambda deduction bug on msvc 2013 255 .as_dynamic(); 256 } 257 ); 258 259 THEN("the output contains only an error message"){ 260 auto required = rxu::to_vector({ 261 on.error(250, ex) 262 }); 263 auto actual = res.get_observer().messages(); 264 REQUIRE(required == actual); 265 } 266 267 THEN("there was 1 subscription/unsubscription to the source"){ 268 auto required = rxu::to_vector({ 269 on.subscribe(200, 250) 270 }); 271 auto actual = xs.subscriptions(); 272 REQUIRE(required == actual); 273 } 274 275 } 276 } 277 } 278