1 #include "../test.h" 2 #include "rxcpp/operators/rx-reduce.hpp" 3 4 SCENARIO("reduce some data with seed", "[reduce][operators]"){ 5 GIVEN("a test hot observable of ints"){ 6 auto sc = rxsc::make_test(); 7 auto w = sc.create_worker(); 8 const rxsc::test::messages<int> on; 9 10 int seed = 42; 11 12 auto xs = sc.make_hot_observable({ 13 on.next(150, 1), 14 on.next(210, 0), 15 on.next(220, 1), 16 on.next(230, 2), 17 on.next(240, 3), 18 on.next(250, 4), 19 on.completed(260) 20 }); 21 22 WHEN("mapped to ints that are one larger"){ 23 24 auto res = w.start( __anonb3a3a90d0102() 25 [&]() { 26 return xs 27 .reduce(seed, 28 [](int sum, int x) { 29 return sum + x; 30 }, 31 [](int sum) { 32 return sum * 5; 33 }) 34 // forget type to workaround lambda deduction bug on msvc 2013 35 .as_dynamic(); 36 } 37 ); 38 39 THEN("the output stops on completion"){ 40 auto required = rxu::to_vector({ 41 on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5), 42 on.completed(260) 43 }); 44 auto actual = res.get_observer().messages(); 45 REQUIRE(required == actual); 46 } 47 48 THEN("there was one subscription and one unsubscription"){ 49 auto required = rxu::to_vector({ 50 on.subscribe(200, 260) 51 }); 52 auto actual = xs.subscriptions(); 53 REQUIRE(required == actual); 54 } 55 } 56 } 57 } 58 59 SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){ 60 GIVEN("a test hot observable of ints"){ 61 auto sc = rxsc::make_test(); 62 auto w = sc.create_worker(); 63 const rxsc::test::messages<int> on; 64 65 int seed = 42; 66 67 auto xs = sc.make_hot_observable({ 68 on.next(150, 1), 69 on.next(210, 0), 70 on.next(220, 1), 71 on.next(230, 2), 72 on.next(240, 3), 73 on.next(250, 4), 74 on.completed(260) 75 }); 76 77 WHEN("mapped to ints that are one larger"){ 78 79 auto res = w.start( __anonb3a3a90d0402() 80 [&]() { 81 return xs 82 .accumulate(seed, 83 [](int sum, int x) { 84 return sum + x; 85 }, 86 [](int sum) { 87 return sum * 5; 88 }) 89 // forget type to workaround lambda deduction bug on msvc 2013 90 .as_dynamic(); 91 } 92 ); 93 94 THEN("the output stops on completion"){ 95 auto required = rxu::to_vector({ 96 on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5), 97 on.completed(260) 98 }); 99 auto actual = res.get_observer().messages(); 100 REQUIRE(required == actual); 101 } 102 103 THEN("there was one subscription and one unsubscription"){ 104 auto required = rxu::to_vector({ 105 on.subscribe(200, 260) 106 }); 107 auto actual = xs.subscriptions(); 108 REQUIRE(required == actual); 109 } 110 } 111 } 112 } 113 114 SCENARIO("average some data", "[reduce][average][operators]"){ 115 GIVEN("a test hot observable of ints"){ 116 auto sc = rxsc::make_test(); 117 auto w = sc.create_worker(); 118 const rxsc::test::messages<int> on; 119 const rxsc::test::messages<double> d_on; 120 121 auto xs = sc.make_hot_observable({ 122 on.next(150, 1), 123 on.next(210, 3), 124 on.next(220, 4), 125 on.next(230, 2), 126 on.completed(250) 127 }); 128 129 WHEN("mapped to ints that are one larger"){ 130 131 auto res = w.start( __anonb3a3a90d0702() 132 [&]() { 133 return xs.average(); 134 } 135 ); 136 137 THEN("the output stops on completion"){ 138 auto required = rxu::to_vector({ 139 d_on.next(250, 3.0), 140 d_on.completed(250) 141 }); 142 auto actual = res.get_observer().messages(); 143 REQUIRE(required == actual); 144 } 145 146 THEN("there was one subscription and one unsubscription"){ 147 auto required = rxu::to_vector({ 148 on.subscribe(200, 250) 149 }); 150 auto actual = xs.subscriptions(); 151 REQUIRE(required == actual); 152 } 153 } 154 } 155 } 156 157 SCENARIO("sum some data", "[reduce][sum][operators]"){ 158 GIVEN("a test hot observable of ints"){ 159 auto sc = rxsc::make_test(); 160 auto w = sc.create_worker(); 161 const rxsc::test::messages<int> on; 162 const rxsc::test::messages<int> d_on; 163 164 auto xs = sc.make_hot_observable({ 165 on.next(150, 1), 166 on.next(210, 3), 167 on.next(220, 4), 168 on.next(230, 2), 169 on.completed(250) 170 }); 171 172 WHEN("sum is calculated"){ 173 174 auto res = w.start( __anonb3a3a90d0802() 175 [&]() { 176 return xs.sum(); 177 } 178 ); 179 180 THEN("the output contains the sum of source values"){ 181 auto required = rxu::to_vector({ 182 d_on.next(250, 9), 183 d_on.completed(250) 184 }); 185 auto actual = res.get_observer().messages(); 186 REQUIRE(required == actual); 187 } 188 189 THEN("there was one subscription and one unsubscription"){ 190 auto required = rxu::to_vector({ 191 on.subscribe(200, 250) 192 }); 193 auto actual = xs.subscriptions(); 194 REQUIRE(required == actual); 195 } 196 } 197 } 198 } 199 200 SCENARIO("max", "[reduce][max][operators]"){ 201 GIVEN("a test hot observable of ints"){ 202 auto sc = rxsc::make_test(); 203 auto w = sc.create_worker(); 204 const rxsc::test::messages<int> on; 205 const rxsc::test::messages<int> d_on; 206 207 auto xs = sc.make_hot_observable({ 208 on.next(150, 1), 209 on.next(210, 3), 210 on.next(220, 4), 211 on.next(230, 2), 212 on.completed(250) 213 }); 214 215 WHEN("max is calculated"){ 216 217 auto res = w.start( __anonb3a3a90d0902() 218 [&]() { 219 return xs.max(); 220 } 221 ); 222 223 THEN("the output contains the max of source values"){ 224 auto required = rxu::to_vector({ 225 d_on.next(250, 4), 226 d_on.completed(250) 227 }); 228 auto actual = res.get_observer().messages(); 229 REQUIRE(required == actual); 230 } 231 232 THEN("there was one subscription and one unsubscription"){ 233 auto required = rxu::to_vector({ 234 on.subscribe(200, 250) 235 }); 236 auto actual = xs.subscriptions(); 237 REQUIRE(required == actual); 238 } 239 } 240 } 241 } 242 243 // Does not work because calling max() on an empty stream throws an exception 244 // which will crash when exceptions are disabled. 245 // 246 // TODO: the max internal implementation should be rewritten not to throw exceptions. 247 SCENARIO("max, empty", "[reduce][max][operators][!throws]"){ 248 GIVEN("a test hot observable of ints"){ 249 auto sc = rxsc::make_test(); 250 auto w = sc.create_worker(); 251 const rxsc::test::messages<int> on; 252 const rxsc::test::messages<int> d_on; 253 254 std::runtime_error ex("max on_error"); 255 256 auto xs = sc.make_hot_observable({ 257 on.next(150, 1), 258 on.completed(250) 259 }); 260 261 WHEN("max is calculated"){ 262 263 auto res = w.start( __anonb3a3a90d0a02() 264 [&]() { 265 return xs.max(); 266 } 267 ); 268 269 THEN("the output contains only error message"){ 270 auto required = rxu::to_vector({ 271 d_on.error(250, ex) 272 }); 273 auto actual = res.get_observer().messages(); 274 REQUIRE(required == actual); 275 } 276 277 THEN("there was one subscription and one unsubscription"){ 278 auto required = rxu::to_vector({ 279 on.subscribe(200, 250) 280 }); 281 auto actual = xs.subscriptions(); 282 REQUIRE(required == actual); 283 } 284 } 285 } 286 } 287 288 SCENARIO("max, error", "[reduce][max][operators]"){ 289 GIVEN("a test hot observable of ints"){ 290 auto sc = rxsc::make_test(); 291 auto w = sc.create_worker(); 292 const rxsc::test::messages<int> on; 293 const rxsc::test::messages<int> d_on; 294 295 std::runtime_error ex("max on_error from source"); 296 297 auto xs = sc.make_hot_observable({ 298 on.next(150, 1), 299 on.error(250, ex) 300 }); 301 302 WHEN("max is calculated"){ 303 304 auto res = w.start( __anonb3a3a90d0b02() 305 [&]() { 306 return xs.max(); 307 } 308 ); 309 310 THEN("the output contains only error message"){ 311 auto required = rxu::to_vector({ 312 d_on.error(250, ex) 313 }); 314 auto actual = res.get_observer().messages(); 315 REQUIRE(required == actual); 316 } 317 318 THEN("there was one subscription and one unsubscription"){ 319 auto required = rxu::to_vector({ 320 on.subscribe(200, 250) 321 }); 322 auto actual = xs.subscriptions(); 323 REQUIRE(required == actual); 324 } 325 } 326 } 327 } 328 329 SCENARIO("min", "[reduce][min][operators]"){ 330 GIVEN("a test hot observable of ints"){ 331 auto sc = rxsc::make_test(); 332 auto w = sc.create_worker(); 333 const rxsc::test::messages<int> on; 334 const rxsc::test::messages<int> d_on; 335 336 auto xs = sc.make_hot_observable({ 337 on.next(150, 1), 338 on.next(210, 3), 339 on.next(220, 4), 340 on.next(230, 2), 341 on.completed(250) 342 }); 343 344 WHEN("min is calculated"){ 345 346 auto res = w.start( __anonb3a3a90d0c02() 347 [&]() { 348 return xs.min(); 349 } 350 ); 351 352 THEN("the output contains the min of source values"){ 353 auto required = rxu::to_vector({ 354 d_on.next(250, 2), 355 d_on.completed(250) 356 }); 357 auto actual = res.get_observer().messages(); 358 REQUIRE(required == actual); 359 } 360 361 THEN("there was one subscription and one unsubscription"){ 362 auto required = rxu::to_vector({ 363 on.subscribe(200, 250) 364 }); 365 auto actual = xs.subscriptions(); 366 REQUIRE(required == actual); 367 } 368 } 369 } 370 } 371 372 // Does not work with exceptions disabled, min will throw when stream is empty 373 // and this crashes immediately. 374 // TODO: min implementation should be rewritten not to throw exceptions. 375 SCENARIO("min, empty", "[reduce][min][operators][!throws]"){ 376 GIVEN("a test hot observable of ints"){ 377 auto sc = rxsc::make_test(); 378 auto w = sc.create_worker(); 379 const rxsc::test::messages<int> on; 380 const rxsc::test::messages<int> d_on; 381 382 std::runtime_error ex("min on_error"); 383 384 auto xs = sc.make_hot_observable({ 385 on.next(150, 1), 386 on.completed(250) 387 }); 388 389 WHEN("min is calculated"){ 390 391 auto res = w.start( __anonb3a3a90d0d02() 392 [&]() { 393 return xs.min(); 394 } 395 ); 396 397 THEN("the output contains only error message"){ 398 auto required = rxu::to_vector({ 399 d_on.error(250, ex) 400 }); 401 auto actual = res.get_observer().messages(); 402 REQUIRE(required == actual); 403 } 404 405 THEN("there was one subscription and one unsubscription"){ 406 auto required = rxu::to_vector({ 407 on.subscribe(200, 250) 408 }); 409 auto actual = xs.subscriptions(); 410 REQUIRE(required == actual); 411 } 412 } 413 } 414 } 415 416 SCENARIO("min, error", "[reduce][min][operators]"){ 417 GIVEN("a test hot observable of ints"){ 418 auto sc = rxsc::make_test(); 419 auto w = sc.create_worker(); 420 const rxsc::test::messages<int> on; 421 const rxsc::test::messages<int> d_on; 422 423 std::runtime_error ex("min on_error from source"); 424 425 auto xs = sc.make_hot_observable({ 426 on.next(150, 1), 427 on.error(250, ex) 428 }); 429 430 WHEN("min is calculated"){ 431 432 auto res = w.start( __anonb3a3a90d0e02() 433 [&]() { 434 return xs.min(); 435 } 436 ); 437 438 THEN("the output contains only error message"){ 439 auto required = rxu::to_vector({ 440 d_on.error(250, ex) 441 }); 442 auto actual = res.get_observer().messages(); 443 REQUIRE(required == actual); 444 } 445 446 THEN("there was one subscription and one unsubscription"){ 447 auto required = rxu::to_vector({ 448 on.subscribe(200, 250) 449 }); 450 auto actual = xs.subscriptions(); 451 REQUIRE(required == actual); 452 } 453 } 454 } 455 } 456