1 #include "../test.h" 2 #include <rxcpp/operators/rx-concat.hpp> 3 #include <rxcpp/operators/rx-reduce.hpp> 4 #include <rxcpp/operators/rx-observe_on.hpp> 5 6 const int static_onnextcalls = 1000000; 7 8 SCENARIO("synchronize concat ranges", "[!hide][range][synchronize][concat][perf]"){ 9 const int& onnextcalls = static_onnextcalls; 10 GIVEN("some ranges"){ 11 WHEN("generating ints"){ 12 using namespace std::chrono; 13 typedef steady_clock clock; 14 15 auto so = rx::synchronize_event_loop(); 16 17 int n = 1; 18 auto sectionCount = onnextcalls / 3; 19 auto start = clock::now(); 20 auto c = rxs::range(0, sectionCount - 1, 1, so) 21 .concat( 22 so, 23 rxs::range(sectionCount, sectionCount * 2 - 1, 1, so), 24 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 25 .as_blocking() 26 .count(); 27 28 auto finish = clock::now(); 29 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 30 duration_cast<milliseconds>(start.time_since_epoch()); 31 std::cout << "concat sync ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 32 } 33 } 34 } 35 36 SCENARIO("observe_on concat ranges", "[!hide][range][observe_on][concat][perf]"){ 37 const int& onnextcalls = static_onnextcalls; 38 GIVEN("some ranges"){ 39 WHEN("generating ints"){ 40 using namespace std::chrono; 41 typedef steady_clock clock; 42 43 auto so = rx::observe_on_event_loop(); 44 45 int n = 1; 46 auto sectionCount = onnextcalls / 3; 47 auto start = clock::now(); 48 int c = rxs::range(0, sectionCount - 1, 1, so) 49 .concat( 50 so, 51 rxs::range(sectionCount, sectionCount * 2 - 1, 1, so), 52 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 53 .as_blocking() 54 .count(); 55 56 auto finish = clock::now(); 57 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 58 duration_cast<milliseconds>(start.time_since_epoch()); 59 std::cout << "concat observe_on ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 60 } 61 } 62 } 63 64 SCENARIO("serialize concat ranges", "[!hide][range][serialize][concat][perf]"){ 65 const int& onnextcalls = static_onnextcalls; 66 GIVEN("some ranges"){ 67 WHEN("generating ints"){ 68 using namespace std::chrono; 69 typedef steady_clock clock; 70 71 auto so = rx::serialize_event_loop(); 72 73 int n = 1; 74 auto sectionCount = onnextcalls / 3; 75 auto start = clock::now(); 76 int c = rxs::range(0, sectionCount - 1, 1, so) 77 .concat( 78 so, 79 rxs::range(sectionCount, sectionCount * 2 - 1, 1, so), 80 rxs::range(sectionCount * 2, onnextcalls - 1, 1, so)) 81 .as_blocking() 82 .count(); 83 84 auto finish = clock::now(); 85 auto msElapsed = duration_cast<milliseconds>(finish.time_since_epoch()) - 86 duration_cast<milliseconds>(start.time_since_epoch()); 87 std::cout << "concat serial ranges : " << n << " subscribed, " << c << " emitted, " << msElapsed.count() << "ms elapsed " << std::endl; 88 } 89 } 90 } 91 92 93 SCENARIO("concat completes", "[concat][join][operators]"){ 94 GIVEN("1 hot observable with 3 cold observables of ints."){ 95 auto sc = rxsc::make_test(); 96 auto w = sc.create_worker(); 97 const rxsc::test::messages<int> on; 98 const rxsc::test::messages<rx::observable<int>> o_on; 99 100 auto ys1 = sc.make_cold_observable({ 101 on.next(10, 101), 102 on.next(20, 102), 103 on.next(110, 103), 104 on.next(120, 104), 105 on.next(210, 105), 106 on.next(220, 106), 107 on.completed(230) 108 }); 109 110 auto ys2 = sc.make_cold_observable({ 111 on.next(10, 201), 112 on.next(20, 202), 113 on.next(30, 203), 114 on.next(40, 204), 115 on.completed(50) 116 }); 117 118 auto ys3 = sc.make_cold_observable({ 119 on.next(10, 301), 120 on.next(20, 302), 121 on.next(30, 303), 122 on.next(40, 304), 123 on.next(120, 305), 124 on.completed(150) 125 }); 126 127 auto xs = sc.make_hot_observable({ 128 o_on.next(300, ys1), 129 o_on.next(400, ys2), 130 o_on.next(500, ys3), 131 o_on.completed(600) 132 }); 133 134 WHEN("each int is merged"){ 135 136 auto res = w.start( __anon34dfc1eb0102() 137 [&]() { 138 return xs 139 | rxo::concat() 140 // forget type to workaround lambda deduction bug on msvc 2013 141 | rxo::as_dynamic(); 142 } 143 ); 144 145 THEN("the output contains merged ints"){ 146 auto required = rxu::to_vector({ 147 on.next(310, 101), 148 on.next(320, 102), 149 on.next(410, 103), 150 on.next(420, 104), 151 on.next(510, 105), 152 on.next(520, 106), 153 on.next(540, 201), 154 on.next(550, 202), 155 on.next(560, 203), 156 on.next(570, 204), 157 on.next(590, 301), 158 on.next(600, 302), 159 on.next(610, 303), 160 on.next(620, 304), 161 on.next(700, 305), 162 on.completed(730) 163 }); 164 auto actual = res.get_observer().messages(); 165 REQUIRE(required == actual); 166 } 167 168 THEN("there was one subscription and one unsubscription to the ints"){ 169 auto required = rxu::to_vector({ 170 on.subscribe(200, 600) 171 }); 172 auto actual = xs.subscriptions(); 173 REQUIRE(required == actual); 174 } 175 176 THEN("there was one subscription and one unsubscription to the ys1"){ 177 auto required = rxu::to_vector({ 178 on.subscribe(300, 530) 179 }); 180 auto actual = ys1.subscriptions(); 181 REQUIRE(required == actual); 182 } 183 184 THEN("there was one subscription and one unsubscription to the ys2"){ 185 auto required = rxu::to_vector({ 186 on.subscribe(530, 580) 187 }); 188 auto actual = ys2.subscriptions(); 189 REQUIRE(required == actual); 190 } 191 192 THEN("there was one subscription and one unsubscription to the ys3"){ 193 auto required = rxu::to_vector({ 194 on.subscribe(580, 730) 195 }); 196 auto actual = ys3.subscriptions(); 197 REQUIRE(required == actual); 198 } 199 } 200 } 201 } 202