1 #include "../test.h" 2 #include <rxcpp/operators/rx-map.hpp> 3 #include <rxcpp/operators/rx-take.hpp> 4 #include <rxcpp/operators/rx-scan.hpp> 5 6 SCENARIO("scan: issue 41", "[scan][operators][issue][!hide]"){ 7 GIVEN("map of scan of interval"){ 8 auto sc = rxsc::make_current_thread(); 9 auto so = rxcpp::synchronize_in_one_worker(sc); 10 auto start = sc.now() + std::chrono::seconds(2); 11 auto period = std::chrono::seconds(1); 12 13 rxcpp::observable<>::interval(start, period, so) __anonde9ebd360102(int a, int i) 14 .scan(0, [] (int a, int i) { return a + i; }) __anonde9ebd360202(int i) 15 .map([] (int i) { return i * i; }) 16 .take(10) __anonde9ebd360302(int i) 17 .subscribe([] (int i) { std::cout << i << std::endl; }); 18 19 } 20 } 21 22 SCENARIO("scan: seed, never", "[scan][operators]"){ 23 GIVEN("a test hot observable of ints"){ 24 auto sc = rxsc::make_test(); 25 auto w = sc.create_worker(); 26 const rxsc::test::messages<int> on; 27 28 int seed = 1; 29 30 auto xs = sc.make_hot_observable({ 31 on.next(150, 1), 32 }); 33 34 WHEN("mapped to ints that are one larger"){ 35 36 auto res = w.start( __anonde9ebd360402() 37 [&]() { 38 return xs 39 | rxo::scan(seed, [](int sum, int x) { 40 return sum + x; 41 }) 42 // forget type to workaround lambda deduction bug on msvc 2013 43 | rxo::as_dynamic(); 44 } 45 ); 46 47 THEN("the output is empty"){ 48 auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); 49 auto actual = res.get_observer().messages(); 50 REQUIRE(required == actual); 51 } 52 53 THEN("there was one subscription and one unsubscription"){ 54 auto required = rxu::to_vector({ 55 on.subscribe(200, 1000) 56 }); 57 auto actual = xs.subscriptions(); 58 REQUIRE(required == actual); 59 } 60 } 61 } 62 } 63 64 SCENARIO("scan: seed, empty", "[scan][operators]"){ 65 GIVEN("a test hot observable of ints"){ 66 auto sc = rxsc::make_test(); 67 auto w = sc.create_worker(); 68 const rxsc::test::messages<int> on; 69 70 int seed = 1; 71 72 auto xs = sc.make_hot_observable({ 73 on.next(150, 1), 74 on.completed(250) 75 }); 76 77 WHEN("mapped to ints that are one larger"){ 78 79 auto res = w.start( __anonde9ebd360602() 80 [&]() { 81 return xs 82 .scan(seed, [](int sum, int x) { 83 return sum + x; 84 }) 85 // forget type to workaround lambda deduction bug on msvc 2013 86 .as_dynamic(); 87 } 88 ); 89 90 THEN("the output stops on completion"){ 91 auto required = rxu::to_vector({ 92 on.completed(250) 93 }); 94 auto actual = res.get_observer().messages(); 95 REQUIRE(required == actual); 96 } 97 98 THEN("there was one subscription and one unsubscription"){ 99 auto required = rxu::to_vector({ 100 on.subscribe(200, 250) 101 }); 102 auto actual = xs.subscriptions(); 103 REQUIRE(required == actual); 104 } 105 } 106 } 107 } 108 109 SCENARIO("scan: seed, return", "[scan][operators]"){ 110 GIVEN("a test hot observable of ints"){ 111 auto sc = rxsc::make_test(); 112 auto w = sc.create_worker(); 113 const rxsc::test::messages<int> on; 114 115 int seed = 1; 116 117 auto xs = sc.make_hot_observable({ 118 on.next(150, 1), 119 on.next(220, 2), 120 on.completed(250) 121 }); 122 123 WHEN("mapped to ints that are one larger"){ 124 125 auto res = w.start( __anonde9ebd360802() 126 [&]() { 127 return xs 128 .scan(seed, [](int sum, int x) { 129 return sum + x; 130 }) 131 // forget type to workaround lambda deduction bug on msvc 2013 132 .as_dynamic(); 133 } 134 ); 135 136 THEN("the output stops on completion"){ 137 auto required = rxu::to_vector({ 138 on.next(220, seed + 2), 139 on.completed(250) 140 }); 141 auto actual = res.get_observer().messages(); 142 REQUIRE(required == actual); 143 } 144 145 THEN("there was one subscription and one unsubscription"){ 146 auto required = rxu::to_vector({ 147 on.subscribe(200, 250) 148 }); 149 auto actual = xs.subscriptions(); 150 REQUIRE(required == actual); 151 } 152 } 153 } 154 } 155 156 SCENARIO("scan: seed, throw", "[scan][operators]"){ 157 GIVEN("a test hot observable of ints"){ 158 auto sc = rxsc::make_test(); 159 auto w = sc.create_worker(); 160 const rxsc::test::messages<int> on; 161 162 int seed = 1; 163 164 std::runtime_error ex("scan on_error from source"); 165 166 auto xs = sc.make_hot_observable({ 167 on.next(150, 1), 168 on.error(250, ex) 169 }); 170 171 WHEN("mapped to ints that are one larger"){ 172 173 auto res = w.start( __anonde9ebd360a02() 174 [&]() { 175 return xs 176 .scan(seed, [](int sum, int x) { 177 return sum + x; 178 }) 179 // forget type to workaround lambda deduction bug on msvc 2013 180 .as_dynamic(); 181 } 182 ); 183 184 THEN("the output stops on error"){ 185 auto required = rxu::to_vector({ 186 on.error(250, ex) 187 }); 188 auto actual = res.get_observer().messages(); 189 REQUIRE(required == actual); 190 } 191 192 THEN("there was one subscription and one unsubscription"){ 193 auto required = rxu::to_vector({ 194 on.subscribe(200, 250) 195 }); 196 auto actual = xs.subscriptions(); 197 REQUIRE(required == actual); 198 } 199 } 200 } 201 } 202 203 SCENARIO("scan: seed, some data", "[scan][operators]"){ 204 GIVEN("a test hot observable of ints"){ 205 auto sc = rxsc::make_test(); 206 auto w = sc.create_worker(); 207 const rxsc::test::messages<int> on; 208 209 int seed = 1; 210 211 auto xs = sc.make_hot_observable({ 212 on.next(150, 1), 213 on.next(210, 2), 214 on.next(220, 3), 215 on.next(230, 4), 216 on.next(240, 5), 217 on.completed(250) 218 }); 219 220 WHEN("mapped to ints that are one larger"){ 221 222 auto res = w.start( __anonde9ebd360c02() 223 [&]() { 224 return xs 225 .scan(seed, [](int sum, int x) { 226 return sum + x; 227 }) 228 // forget type to workaround lambda deduction bug on msvc 2013 229 .as_dynamic(); 230 } 231 ); 232 233 THEN("the output stops on completion"){ 234 auto required = rxu::to_vector({ 235 on.next(210, seed + 2), 236 on.next(220, seed + 2 + 3), 237 on.next(230, seed + 2 + 3 + 4), 238 on.next(240, seed + 2 + 3 + 4 + 5), 239 on.completed(250) 240 }); 241 auto actual = res.get_observer().messages(); 242 REQUIRE(required == actual); 243 } 244 245 THEN("there was one subscription and one unsubscription"){ 246 auto required = rxu::to_vector({ 247 on.subscribe(200, 250) 248 }); 249 auto actual = xs.subscriptions(); 250 REQUIRE(required == actual); 251 } 252 } 253 } 254 } 255 256 SCENARIO("scan: seed, accumulator throws", "[scan][operators][!throws]"){ 257 GIVEN("a test hot observable of ints"){ 258 auto sc = rxsc::make_test(); 259 auto w = sc.create_worker(); 260 const rxsc::test::messages<int> on; 261 262 int seed = 1; 263 264 std::runtime_error ex("scan on_error from source"); 265 266 auto xs = sc.make_hot_observable({ 267 on.next(150, 1), 268 on.next(210, 2), 269 on.next(220, 3), 270 on.next(230, 4), 271 on.next(240, 5), 272 on.completed(250) 273 }); 274 275 WHEN("mapped to ints that are one larger"){ 276 277 auto res = w.start( __anonde9ebd360e02() 278 [&]() { 279 return xs 280 .scan(seed, [&](int sum, int x) { 281 if (x == 4) { 282 rxu::throw_exception(ex); 283 } 284 return sum + x; 285 }) 286 // forget type to workaround lambda deduction bug on msvc 2013 287 .as_dynamic(); 288 } 289 ); 290 291 THEN("the output stops on error"){ 292 auto required = rxu::to_vector({ 293 on.next(210, seed + 2), 294 on.next(220, seed + 2 + 3), 295 on.error(230, ex) 296 }); 297 auto actual = res.get_observer().messages(); 298 REQUIRE(required == actual); 299 } 300 301 THEN("there was one subscription and one unsubscription"){ 302 auto required = rxu::to_vector({ 303 on.subscribe(200, 230) 304 }); 305 auto actual = xs.subscriptions(); 306 REQUIRE(required == actual); 307 } 308 } 309 } 310 } 311