1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-filter.hpp> 4 #include <rxcpp/operators/rx-map.hpp> 5 #include <rxcpp/operators/rx-take.hpp> 6 #include <rxcpp/operators/rx-concat_map.hpp> 7 #include <rxcpp/operators/rx-observe_on.hpp> 8 9 static const int static_tripletCount = 100; 10 11 SCENARIO("concat_transform pythagorian ranges", "[!hide][range][concat_transform][pythagorian][perf]"){ 12 const int& tripletCount = static_tripletCount; 13 GIVEN("some ranges"){ 14 WHEN("generating pythagorian triplets"){ 15 using namespace std::chrono; 16 typedef steady_clock clock; 17 18 auto sc = rxsc::make_immediate(); 19 //auto sc = rxsc::make_current_thread(); 20 auto so = rx::identity_one_worker(sc); 21 22 int c = 0; 23 int ct = 0; 24 int n = 1; 25 auto start = clock::now(); 26 auto triples = 27 rxs::range(1, so) 28 .concat_transform( __anon58f26a080102(int z)29 [&c, so](int z){ 30 return rxs::range(1, z, 1, so) 31 .concat_transform( 32 [&c, so, z](int x){ 33 return rxs::range(x, z, 1, so) 34 .filter([&c, z, x](int y){++c; return x*x + y*y == z*z;}) 35 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 36 // forget type to workaround lambda deduction bug on msvc 2013 37 .as_dynamic();}, 38 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}) 39 // forget type to workaround lambda deduction bug on msvc 2013 40 .as_dynamic();}, __anon58f26a080602(int , std::tuple<int,int,int> triplet)41 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}); 42 triples 43 .take(tripletCount) 44 .subscribe( __anon58f26a080702(int ,int ,int )45 rxu::apply_to([&ct](int /*x*/,int /*y*/,int /*z*/){++ct;}), __anon58f26a080802(rxu::error_ptr)46 [](rxu::error_ptr){abort();}); 47 auto finish = clock::now(); 48 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 49 duration_cast<milliseconds>(start.time_since_epoch()); 50 std::cout << "concat pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 51 52 } 53 } 54 } 55 56 SCENARIO("synchronize concat_transform pythagorian ranges", "[!hide][range][concat_transform][synchronize][pythagorian][perf]"){ 57 const int& tripletCount = static_tripletCount; 58 GIVEN("some ranges"){ 59 WHEN("generating pythagorian triplets"){ 60 using namespace std::chrono; 61 typedef steady_clock clock; 62 63 auto so = rx::synchronize_event_loop(); 64 65 int c = 0; 66 int n = 1; 67 auto start = clock::now(); 68 auto triples = 69 rxs::range(1, so) 70 .concat_transform( __anon58f26a080902(int z)71 [&c, so](int z){ 72 return rxs::range(1, z, 1, so) 73 .concat_transform( 74 [&c, so, z](int x){ 75 return rxs::range(x, z, 1, so) 76 .filter([&c, z, x](int y){ 77 ++c; 78 if (x*x + y*y == z*z) { 79 return true;} 80 else { 81 return false;}}) 82 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 83 // forget type to workaround lambda deduction bug on msvc 2013 84 .as_dynamic();}, 85 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 86 so) 87 // forget type to workaround lambda deduction bug on msvc 2013 88 .as_dynamic();}, __anon58f26a080e02(int , std::tuple<int,int,int> triplet)89 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 90 so); 91 int ct = triples 92 .take(tripletCount) 93 .as_blocking() 94 .count(); 95 96 auto finish = clock::now(); 97 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 98 duration_cast<milliseconds>(start.time_since_epoch()); 99 std::cout << "concat sync pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 100 } 101 } 102 } 103 104 SCENARIO("observe_on concat_transform pythagorian ranges", "[!hide][range][concat_transform][observe_on][pythagorian][perf]"){ 105 const int& tripletCount = static_tripletCount; 106 GIVEN("some ranges"){ 107 WHEN("generating pythagorian triplets"){ 108 using namespace std::chrono; 109 typedef steady_clock clock; 110 111 auto so = rx::observe_on_event_loop(); 112 113 int c = 0; 114 int n = 1; 115 auto start = clock::now(); 116 auto triples = 117 rxs::range(1, so) 118 .concat_transform( __anon58f26a080f02(int z)119 [&c, so](int z){ 120 return rxs::range(1, z, 1, so) 121 .concat_transform( 122 [&c, so, z](int x){ 123 return rxs::range(x, z, 1, so) 124 .filter([&c, z, x](int y){ 125 ++c; 126 if (x*x + y*y == z*z) { 127 return true;} 128 else { 129 return false;}}) 130 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 131 // forget type to workaround lambda deduction bug on msvc 2013 132 .as_dynamic();}, 133 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 134 so) 135 // forget type to workaround lambda deduction bug on msvc 2013 136 .as_dynamic();}, __anon58f26a081402(int , std::tuple<int,int,int> triplet)137 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 138 so); 139 140 int ct = triples 141 .take(tripletCount) 142 .as_blocking() 143 .count(); 144 145 auto finish = clock::now(); 146 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 147 duration_cast<milliseconds>(start.time_since_epoch()); 148 std::cout << "concat observe_on pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 149 } 150 } 151 } 152 153 SCENARIO("serialize concat_transform pythagorian ranges", "[!hide][range][concat_transform][serialize][pythagorian][perf]"){ 154 const int& tripletCount = static_tripletCount; 155 GIVEN("some ranges"){ 156 WHEN("generating pythagorian triplets"){ 157 using namespace std::chrono; 158 typedef steady_clock clock; 159 160 auto so = rx::serialize_event_loop(); 161 162 int c = 0; 163 int n = 1; 164 auto start = clock::now(); 165 auto triples = 166 rxs::range(1, so) 167 .concat_transform( __anon58f26a081502(int z)168 [&c, so](int z){ 169 return rxs::range(1, z, 1, so) 170 .concat_transform( 171 [&c, so, z](int x){ 172 return rxs::range(x, z, 1, so) 173 .filter([&c, z, x](int y){ 174 ++c; 175 if (x*x + y*y == z*z) { 176 return true;} 177 else { 178 return false;}}) 179 .transform([z, x](int y){return std::make_tuple(x, y, z);}) 180 // forget type to workaround lambda deduction bug on msvc 2013 181 .as_dynamic();}, 182 [](int /*x*/, std::tuple<int,int,int> triplet){return triplet;}, 183 so) 184 // forget type to workaround lambda deduction bug on msvc 2013 185 .as_dynamic();}, __anon58f26a081a02(int , std::tuple<int,int,int> triplet)186 [](int /*z*/, std::tuple<int,int,int> triplet){return triplet;}, 187 so); 188 189 int ct = triples 190 .take(tripletCount) 191 .as_blocking() 192 .count(); 193 194 auto finish = clock::now(); 195 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 196 duration_cast<milliseconds>(start.time_since_epoch()); 197 std::cout << "concat serial pythagorian range : " << n << " subscribed, " << c << " filtered to, " << ct << " triplets, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 198 } 199 } 200 } 201 202 SCENARIO("concat_map completes", "[concat_map][transform][map][operators]"){ 203 GIVEN("two cold observables. one of ints. one of strings."){ 204 auto sc = rxsc::make_test(); 205 auto w = sc.create_worker(); 206 const rxsc::test::messages<int> i_on; 207 const rxsc::test::messages<std::string> s_on; 208 209 auto xs = sc.make_cold_observable({ 210 i_on.next(100, 4), 211 i_on.next(200, 2), 212 i_on.completed(500) 213 }); 214 215 auto ys = sc.make_cold_observable({ 216 s_on.next(50, "foo"), 217 s_on.next(100, "bar"), 218 s_on.next(150, "baz"), 219 s_on.next(200, "qux"), 220 s_on.completed(250) 221 }); 222 223 WHEN("each int is mapped to the strings"){ 224 225 auto res = w.start( __anon58f26a081b02() 226 [&]() { 227 return xs 228 | rxo::concat_map( 229 [&](int){ 230 return ys;}, 231 [](int, std::string s){ 232 return s;}) 233 // forget type to workaround lambda deduction bug on msvc 2013 234 | rxo::as_dynamic(); 235 } 236 ); 237 238 THEN("the output contains strings repeated for each int"){ 239 auto required = rxu::to_vector({ 240 s_on.next(350, "foo"), 241 s_on.next(400, "bar"), 242 s_on.next(450, "baz"), 243 s_on.next(500, "qux"), 244 s_on.next(600, "foo"), 245 s_on.next(650, "bar"), 246 s_on.next(700, "baz"), 247 s_on.next(750, "qux"), 248 s_on.completed(800) 249 }); 250 auto actual = res.get_observer().messages(); 251 REQUIRE(required == actual); 252 } 253 254 THEN("there was one subscription and one unsubscription to the ints"){ 255 auto required = rxu::to_vector({ 256 i_on.subscribe(200, 700) 257 }); 258 auto actual = xs.subscriptions(); 259 REQUIRE(required == actual); 260 } 261 262 THEN("there were 2 subscription and unsubscription to the strings"){ 263 auto required = rxu::to_vector({ 264 s_on.subscribe(300, 550), 265 s_on.subscribe(550, 800) 266 }); 267 auto actual = ys.subscriptions(); 268 REQUIRE(required == actual); 269 } 270 } 271 } 272 } 273 274 SCENARIO("concat_transform completes", "[concat_transform][transform][map][operators]"){ 275 GIVEN("two cold observables. one of ints. one of strings."){ 276 auto sc = rxsc::make_test(); 277 auto w = sc.create_worker(); 278 const rxsc::test::messages<int> i_on; 279 const rxsc::test::messages<std::string> s_on; 280 281 auto xs = sc.make_cold_observable({ 282 i_on.next(100, 4), 283 i_on.next(200, 2), 284 i_on.completed(500) 285 }); 286 287 auto ys = sc.make_cold_observable({ 288 s_on.next(50, "foo"), 289 s_on.next(100, "bar"), 290 s_on.next(150, "baz"), 291 s_on.next(200, "qux"), 292 s_on.completed(250) 293 }); 294 295 WHEN("each int is mapped to the strings"){ 296 297 auto res = w.start( __anon58f26a081e02() 298 [&]() { 299 return xs 300 | rxo::concat_transform( 301 [&](int){ 302 return ys;}, 303 [](int, std::string s){ 304 return s;}) 305 // forget type to workaround lambda deduction bug on msvc 2013 306 | rxo::as_dynamic(); 307 } 308 ); 309 310 THEN("the output contains strings repeated for each int"){ 311 auto required = rxu::to_vector({ 312 s_on.next(350, "foo"), 313 s_on.next(400, "bar"), 314 s_on.next(450, "baz"), 315 s_on.next(500, "qux"), 316 s_on.next(600, "foo"), 317 s_on.next(650, "bar"), 318 s_on.next(700, "baz"), 319 s_on.next(750, "qux"), 320 s_on.completed(800) 321 }); 322 auto actual = res.get_observer().messages(); 323 REQUIRE(required == actual); 324 } 325 326 THEN("there was one subscription and one unsubscription to the ints"){ 327 auto required = rxu::to_vector({ 328 i_on.subscribe(200, 700) 329 }); 330 auto actual = xs.subscriptions(); 331 REQUIRE(required == actual); 332 } 333 334 THEN("there were 2 subscription and unsubscription to the strings"){ 335 auto required = rxu::to_vector({ 336 s_on.subscribe(300, 550), 337 s_on.subscribe(550, 800) 338 }); 339 auto actual = ys.subscriptions(); 340 REQUIRE(required == actual); 341 } 342 } 343 344 WHEN("each int is mapped to the strings with coordinator"){ 345 346 auto res = w.start( __anon58f26a082102() 347 [&]() { 348 return xs 349 .concat_transform( 350 [&](int){ 351 return ys;}, 352 [](int, std::string s){ 353 return s;}, 354 rx::identity_current_thread()) 355 // forget type to workaround lambda deduction bug on msvc 2013 356 .as_dynamic(); 357 } 358 ); 359 360 THEN("the output contains strings repeated for each int"){ 361 auto required = rxu::to_vector({ 362 s_on.next(350, "foo"), 363 s_on.next(400, "bar"), 364 s_on.next(450, "baz"), 365 s_on.next(500, "qux"), 366 s_on.next(600, "foo"), 367 s_on.next(650, "bar"), 368 s_on.next(700, "baz"), 369 s_on.next(750, "qux"), 370 s_on.completed(800) 371 }); 372 auto actual = res.get_observer().messages(); 373 REQUIRE(required == actual); 374 } 375 376 THEN("there was one subscription and one unsubscription to the ints"){ 377 auto required = rxu::to_vector({ 378 i_on.subscribe(200, 700) 379 }); 380 auto actual = xs.subscriptions(); 381 REQUIRE(required == actual); 382 } 383 384 THEN("there were 2 subscription and unsubscription to the strings"){ 385 auto required = rxu::to_vector({ 386 s_on.subscribe(300, 550), 387 s_on.subscribe(550, 800) 388 }); 389 auto actual = ys.subscriptions(); 390 REQUIRE(required == actual); 391 } 392 } 393 } 394 } 395 396 SCENARIO("concat_transform, no result selector, no coordination", "[concat_transform][transform][map][operators]"){ 397 GIVEN("two cold observables. one of ints. one of strings."){ 398 auto sc = rxsc::make_test(); 399 auto w = sc.create_worker(); 400 const rxsc::test::messages<int> i_on; 401 const rxsc::test::messages<std::string> s_on; 402 403 auto xs = sc.make_cold_observable({ 404 i_on.next(100, 4), 405 i_on.next(200, 2), 406 i_on.completed(500) 407 }); 408 409 auto ys = sc.make_cold_observable({ 410 s_on.next(50, "foo"), 411 s_on.next(100, "bar"), 412 s_on.next(150, "baz"), 413 s_on.next(200, "qux"), 414 s_on.completed(250) 415 }); 416 417 WHEN("each int is mapped to the strings"){ 418 419 auto res = w.start( __anon58f26a082402() 420 [&]() { 421 return xs 422 .concat_transform( 423 [&](int){ 424 return ys;}) 425 // forget type to workaround lambda deduction bug on msvc 2013 426 .as_dynamic(); 427 } 428 ); 429 430 THEN("the output contains strings repeated for each int"){ 431 auto required = rxu::to_vector({ 432 s_on.next(350, "foo"), 433 s_on.next(400, "bar"), 434 s_on.next(450, "baz"), 435 s_on.next(500, "qux"), 436 s_on.next(600, "foo"), 437 s_on.next(650, "bar"), 438 s_on.next(700, "baz"), 439 s_on.next(750, "qux"), 440 s_on.completed(800) 441 }); 442 auto actual = res.get_observer().messages(); 443 REQUIRE(required == actual); 444 } 445 446 THEN("there was one subscription and one unsubscription to the ints"){ 447 auto required = rxu::to_vector({ 448 i_on.subscribe(200, 700) 449 }); 450 auto actual = xs.subscriptions(); 451 REQUIRE(required == actual); 452 } 453 454 THEN("there were 2 subscription and unsubscription to the strings"){ 455 auto required = rxu::to_vector({ 456 s_on.subscribe(300, 550), 457 s_on.subscribe(550, 800) 458 }); 459 auto actual = ys.subscriptions(); 460 REQUIRE(required == actual); 461 } 462 } 463 } 464 } 465 466 SCENARIO("concat_transform, no result selector, with coordination", "[concat_transform][transform][map][operators]"){ 467 GIVEN("two cold observables. one of ints. one of strings."){ 468 auto sc = rxsc::make_test(); 469 auto w = sc.create_worker(); 470 const rxsc::test::messages<int> i_on; 471 const rxsc::test::messages<std::string> s_on; 472 473 auto xs = sc.make_cold_observable({ 474 i_on.next(100, 4), 475 i_on.next(200, 2), 476 i_on.completed(500) 477 }); 478 479 auto ys = sc.make_cold_observable({ 480 s_on.next(50, "foo"), 481 s_on.next(100, "bar"), 482 s_on.next(150, "baz"), 483 s_on.next(200, "qux"), 484 s_on.completed(250) 485 }); 486 487 WHEN("each int is mapped to the strings"){ 488 489 auto res = w.start( __anon58f26a082602() 490 [&]() { 491 return xs 492 .concat_transform( 493 [&](int){ 494 return ys;}, 495 rx::identity_current_thread()) 496 // forget type to workaround lambda deduction bug on msvc 2013 497 .as_dynamic(); 498 } 499 ); 500 501 THEN("the output contains strings repeated for each int"){ 502 auto required = rxu::to_vector({ 503 s_on.next(350, "foo"), 504 s_on.next(400, "bar"), 505 s_on.next(450, "baz"), 506 s_on.next(500, "qux"), 507 s_on.next(600, "foo"), 508 s_on.next(650, "bar"), 509 s_on.next(700, "baz"), 510 s_on.next(750, "qux"), 511 s_on.completed(800) 512 }); 513 auto actual = res.get_observer().messages(); 514 REQUIRE(required == actual); 515 } 516 517 THEN("there was one subscription and one unsubscription to the ints"){ 518 auto required = rxu::to_vector({ 519 i_on.subscribe(200, 700) 520 }); 521 auto actual = xs.subscriptions(); 522 REQUIRE(required == actual); 523 } 524 525 THEN("there were 2 subscription and unsubscription to the strings"){ 526 auto required = rxu::to_vector({ 527 s_on.subscribe(300, 550), 528 s_on.subscribe(550, 800) 529 }); 530 auto actual = ys.subscriptions(); 531 REQUIRE(required == actual); 532 } 533 } 534 } 535 } 536