1 #include "../test.h" 2 #include <rxcpp/operators/rx-take.hpp> 3 #include <rxcpp/operators/rx-map.hpp> 4 #include <rxcpp/operators/rx-observe_on.hpp> 5 6 const int static_onnextcalls = 100000; 7 8 SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){ 9 const int& onnextcalls = static_onnextcalls; 10 GIVEN("a range"){ 11 WHEN("multicasting a million ints"){ 12 using namespace std::chrono; 13 typedef steady_clock clock; 14 15 auto el = rx::observe_on_new_thread(); 16 17 for (int n = 0; n < 10; n++) 18 { 19 std::atomic_bool disposed; 20 std::atomic_bool done; 21 auto c = std::make_shared<int>(0); 22 23 rx::composite_subscription cs; __anon2d9421e70102()24 cs.add([&](){ 25 if (!done) {abort();} 26 disposed = true; 27 }); 28 29 auto start = clock::now(); 30 rxs::range<int>(1) 31 .take(onnextcalls) 32 .observe_on(el) 33 .as_blocking() 34 .subscribe( 35 cs, __anon2d9421e70202(int)36 [c](int){ 37 ++(*c); 38 }, __anon2d9421e70302()39 [&](){ 40 done = true; 41 }); 42 auto expected = onnextcalls; 43 REQUIRE(*c == expected); 44 auto finish = clock::now(); 45 auto msElapsed = duration_cast<milliseconds>(finish-start); 46 std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl; 47 } 48 } 49 } 50 } 51 52 SCENARIO("observe_on", "[observe][observe_on]"){ 53 GIVEN("a source"){ 54 auto sc = rxsc::make_test(); 55 auto so = rx::synchronize_in_one_worker(sc); 56 auto w = sc.create_worker(); 57 const rxsc::test::messages<int> on; 58 59 auto xs = sc.make_hot_observable({ 60 on.next(150, 1), 61 on.next(210, 2), 62 on.next(240, 3), 63 on.completed(300) 64 }); 65 66 WHEN("subscribe_on is specified"){ 67 68 auto res = w.start( __anon2d9421e70402() 69 [so, xs]() { 70 return xs 71 .observe_on(so); 72 } 73 ); 74 75 THEN("the output contains items sent while subscribed"){ 76 auto required = rxu::to_vector({ 77 on.next(211, 2), 78 on.next(241, 3), 79 on.completed(301) 80 }); 81 auto actual = res.get_observer().messages(); 82 REQUIRE(required == actual); 83 } 84 85 THEN("there was 1 subscription/unsubscription to the source"){ 86 auto required = rxu::to_vector({ 87 on.subscribe(200, 300) 88 }); 89 auto actual = xs.subscriptions(); 90 REQUIRE(required == actual); 91 } 92 93 } 94 } 95 } 96 97 SCENARIO("stream observe_on", "[observe][observe_on]"){ 98 GIVEN("a source"){ 99 auto sc = rxsc::make_test(); 100 auto so = rx::synchronize_in_one_worker(sc); 101 auto w = sc.create_worker(); 102 const rxsc::test::messages<int> on; 103 104 auto xs = sc.make_hot_observable({ 105 on.next(150, 1), 106 on.next(210, 2), 107 on.next(240, 3), 108 on.completed(300) 109 }); 110 111 WHEN("observe_on is specified"){ 112 113 auto res = w.start( __anon2d9421e70502() 114 [so, xs]() { 115 return xs 116 | rxo::observe_on(so); 117 } 118 ); 119 120 THEN("the output contains items sent while subscribed"){ 121 auto required = rxu::to_vector({ 122 on.next(211, 2), 123 on.next(241, 3), 124 on.completed(301) 125 }); 126 auto actual = res.get_observer().messages(); 127 REQUIRE(required == actual); 128 } 129 130 THEN("there was 1 subscription/unsubscription to the source"){ 131 auto required = rxu::to_vector({ 132 on.subscribe(200, 300) 133 }); 134 auto actual = xs.subscriptions(); 135 REQUIRE(required == actual); 136 } 137 138 } 139 } 140 } 141 142 class nocompare { 143 public: 144 int v; 145 }; 146 147 SCENARIO("observe_on no-comparison", "[observe][observe_on]"){ 148 GIVEN("a source"){ 149 auto sc = rxsc::make_test(); 150 auto so = rx::observe_on_one_worker(sc); 151 auto w = sc.create_worker(); 152 const rxsc::test::messages<nocompare> in; 153 const rxsc::test::messages<int> out; 154 155 auto xs = sc.make_hot_observable({ 156 in.next(150, nocompare{1}), 157 in.next(210, nocompare{2}), 158 in.next(240, nocompare{3}), 159 in.completed(300) 160 }); 161 162 WHEN("observe_on is specified"){ 163 164 auto res = w.start( __anon2d9421e70602() 165 [so, xs]() { 166 return xs 167 | rxo::observe_on(so) 168 | rxo::map([](nocompare v){ return v.v; }) 169 | rxo::as_dynamic(); 170 } 171 ); 172 173 THEN("the output contains items sent while subscribed"){ 174 auto required = rxu::to_vector({ 175 out.next(211, 2), 176 out.next(241, 3), 177 out.completed(301) 178 }); 179 auto actual = res.get_observer().messages(); 180 REQUIRE(required == actual); 181 } 182 183 THEN("there was 1 subscription/unsubscription to the source"){ 184 auto required = rxu::to_vector({ 185 out.subscribe(200, 300) 186 }); 187 auto actual = xs.subscriptions(); 188 REQUIRE(required == actual); 189 } 190 191 } 192 } 193 } 194