1 #include "../test.h" 2 #include <rxcpp/operators/rx-reduce.hpp> 3 #include <rxcpp/operators/rx-map.hpp> 4 #include <rxcpp/operators/rx-subscribe_on.hpp> 5 #include <rxcpp/operators/rx-observe_on.hpp> 6 7 #include <sstream> 8 9 static const int static_subscriptions = 50000; 10 11 SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){ 12 const int& subscriptions = static_subscriptions; 13 GIVEN("a for loop"){ 14 WHEN("subscribe 50K times"){ 15 using namespace std::chrono; 16 typedef steady_clock clock; 17 18 int runs = 10; 19 20 for (;runs > 0; --runs) { 21 22 int c = 0; 23 int n = 1; 24 auto start = clock::now(); 25 for (int i = 0; i < subscriptions; ++i) { 26 c += rx::observable<>::just(1) __anonef2514930102(int i) 27 .map([](int i) { 28 std::stringstream serializer; 29 serializer << i; 30 return serializer.str(); 31 }) __anonef2514930202(const std::string& s) 32 .map([](const std::string& s) { 33 int i; 34 std::stringstream(s) >> i; 35 return i; 36 }) 37 .subscribe_on(rx::observe_on_event_loop()) 38 .observe_on(rx::observe_on_event_loop()) 39 .as_blocking() 40 .count(); 41 } 42 auto finish = clock::now(); 43 auto msElapsed = duration_cast<milliseconds>(finish-start); 44 REQUIRE(subscriptions == c); 45 std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 46 } 47 } 48 } 49 } 50 51 SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){ 52 const int& subscriptions = static_subscriptions; 53 GIVEN("a for loop"){ 54 WHEN("subscribe 50K times"){ 55 using namespace std::chrono; 56 typedef steady_clock clock; 57 58 int runs = 10; 59 60 for (;runs > 0; --runs) { 61 62 int c = 0; 63 int n = 1; 64 auto start = clock::now(); 65 66 for (int i = 0; i < subscriptions; ++i) { 67 c += rx::observable<>:: 68 just(1). __anonef2514930302(int i) 69 map([](int i) { 70 std::stringstream serializer; 71 serializer << i; 72 return serializer.str(); 73 }). __anonef2514930402(const std::string& s) 74 map([](const std::string& s) { 75 int i; 76 std::stringstream(s) >> i; 77 return i; 78 }). 79 subscribe_on(rx::observe_on_event_loop()). 80 as_blocking(). 81 count(); 82 } 83 auto finish = clock::now(); 84 auto msElapsed = duration_cast<milliseconds>(finish-start); 85 REQUIRE(subscriptions == c); 86 std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 87 } 88 } 89 } 90 } 91 92 SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){ 93 GIVEN("a source"){ 94 auto sc = rxsc::make_test(); 95 auto so = rx::synchronize_in_one_worker(sc); 96 auto w = sc.create_worker(); 97 const rxsc::test::messages<int> on; 98 99 auto xs = sc.make_hot_observable({ 100 on.next(150, 1), 101 on.next(210, 2), 102 on.next(240, 3), 103 on.completed(300) 104 }); 105 106 WHEN("subscribe_on is specified"){ 107 108 auto res = w.start( __anonef2514930502() 109 [so, xs]() { 110 return xs 111 .subscribe_on(so); 112 } 113 ); 114 115 THEN("the output contains items sent while subscribed"){ 116 auto required = rxu::to_vector({ 117 on.next(210, 2), 118 on.next(240, 3), 119 on.completed(300) 120 }); 121 auto actual = res.get_observer().messages(); 122 REQUIRE(required == actual); 123 } 124 125 THEN("there was 1 subscription/unsubscription to the source"){ 126 auto required = rxu::to_vector({ 127 on.subscribe(201, 300) 128 }); 129 auto actual = xs.subscriptions(); 130 REQUIRE(required == actual); 131 } 132 133 } 134 } 135 } 136 137 SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){ 138 GIVEN("a source"){ 139 auto sc = rxsc::make_test(); 140 auto so = rx::synchronize_in_one_worker(sc); 141 auto w = sc.create_worker(); 142 const rxsc::test::messages<int> on; 143 144 auto xs = sc.make_hot_observable({ 145 on.next(150, 1), 146 on.next(210, 2), 147 on.next(240, 3), 148 on.completed(300) 149 }); 150 151 WHEN("subscribe_on is specified"){ 152 153 auto res = w.start( __anonef2514930602() 154 [so, xs]() { 155 return xs 156 | rxo::subscribe_on(so); 157 } 158 ); 159 160 THEN("the output contains items sent while subscribed"){ 161 auto required = rxu::to_vector({ 162 on.next(210, 2), 163 on.next(240, 3), 164 on.completed(300) 165 }); 166 auto actual = res.get_observer().messages(); 167 REQUIRE(required == actual); 168 } 169 170 THEN("there was 1 subscription/unsubscription to the source"){ 171 auto required = rxu::to_vector({ 172 on.subscribe(201, 300) 173 }); 174 auto actual = xs.subscriptions(); 175 REQUIRE(required == actual); 176 } 177 178 } 179 } 180 } 181