1 #include "../test.h" 2 #include "rxcpp/operators/rx-combine_latest.hpp" 3 #include "rxcpp/operators/rx-map.hpp" 4 #include "rxcpp/operators/rx-take.hpp" 5 #include "rxcpp/operators/rx-observe_on.hpp" 6 #include "rxcpp/operators/rx-publish.hpp" 7 #include "rxcpp/operators/rx-ref_count.hpp" 8 9 #include <sstream> 10 11 SCENARIO("observe subscription", "[!hide]"){ 12 GIVEN("observable of ints"){ 13 WHEN("subscribe"){ 14 auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>(); 15 __anon442cb92f0102(rxcpp::subscriber<int> out)16 auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){ 17 auto it = observers->insert(observers->end(), out); 18 it->add([=](){ 19 observers->erase(it); 20 }); 21 }); 22 23 } 24 } 25 } 26 27 static const int static_subscriptions = 10000; 28 29 SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){ 30 const int& subscriptions = static_subscriptions; 31 GIVEN("a for loop"){ 32 WHEN("subscribe 100K times"){ 33 using namespace std::chrono; 34 typedef steady_clock clock; 35 36 auto sc = rxsc::make_current_thread(); 37 auto w = sc.create_worker(); 38 int runs = 10; 39 __anon442cb92f0302(const rxsc::schedulable& self) 40 auto loop = [&](const rxsc::schedulable& self) { 41 int c = 0; 42 int n = 1; 43 auto start = clock::now(); 44 for (int i = 0; i < subscriptions; i++) { 45 rx::observable<>::just(1) 46 .map([](int i) { 47 std::stringstream serializer; 48 serializer << i; 49 return serializer.str(); 50 }) 51 .map([](const std::string& s) { 52 int i; 53 std::stringstream(s) >> i; 54 return i; 55 }) 56 .subscribe([&](int){ 57 ++c; 58 }); 59 } 60 auto finish = clock::now(); 61 auto msElapsed = duration_cast<milliseconds>(finish-start); 62 std::cout << "loop subscribe map : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 63 64 if (--runs > 0) { 65 self(); 66 } 67 }; 68 69 w.schedule(loop); 70 } 71 } 72 } 73 74 SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){ 75 const int& subscriptions = static_subscriptions; 76 GIVEN("a for loop"){ 77 WHEN("subscribe 100K times"){ 78 using namespace std::chrono; 79 typedef steady_clock clock; 80 81 auto sc = rxsc::make_current_thread(); 82 auto w = sc.create_worker(); 83 int runs = 10; 84 __anon442cb92f0702(const rxsc::schedulable& self) 85 auto loop = [&](const rxsc::schedulable& self) { 86 int c = 0; 87 int n = 1; 88 auto start = clock::now(); 89 for (int i = 0; i < subscriptions; i++) { 90 rx::observable<>::just(1) 91 .combine_latest([](int i, int j) { 92 return i + j; 93 }, rx::observable<>::just(2)) 94 .subscribe([&](int){ 95 ++c; 96 }); 97 } 98 auto finish = clock::now(); 99 auto msElapsed = duration_cast<milliseconds>(finish-start); 100 std::cout << "loop subscribe combine_latest : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 101 102 if (--runs > 0) { 103 self(); 104 } 105 }; 106 107 w.schedule(loop); 108 } 109 } 110 } 111 112 SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){ 113 GIVEN("range"){ 114 WHEN("synchronized"){ 115 using namespace std::chrono; 116 typedef steady_clock clock; 117 118 auto sc = rxsc::make_current_thread(); 119 auto w = sc.create_worker(); 120 121 auto es = rx::synchronize_event_loop(); 122 123 const int values = 10000; 124 125 int runs = 10; 126 __anon442cb92f0a02(const rxsc::schedulable& self) 127 auto loop = [&](const rxsc::schedulable& self) { 128 std::atomic<int> c(0); 129 int n = 1; 130 auto liftrequirecompletion = [&](rx::subscriber<int> dest){ 131 auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest)); 132 std::get<2>(*completionstate).add([=](){ 133 if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) { 134 abort(); 135 } 136 }); 137 // VS2013 deduction issue requires dynamic (type-forgetting) 138 return rx::make_subscriber<int>( 139 std::get<2>(*completionstate), 140 [=](int n){ 141 ++std::get<1>(*completionstate); 142 std::get<2>(*completionstate).on_next(n); 143 }, 144 [=](rxu::error_ptr){ 145 abort(); 146 }, 147 [=](){ 148 if (std::get<1>(*completionstate) != values) { 149 abort(); 150 } 151 std::get<0>(*completionstate) = true; 152 std::get<2>(*completionstate).on_completed(); 153 }).as_dynamic(); 154 }; 155 auto start = clock::now(); 156 auto ew = es.create_coordinator().get_worker(); 157 std::atomic<int> v(0); 158 auto s0 = rxs::range(1, es) 159 .take(values) 160 .lift<int>(liftrequirecompletion) 161 .as_dynamic() 162 .publish_synchronized(es) 163 .ref_count() 164 .lift<int>(liftrequirecompletion) 165 .subscribe( 166 rx::make_observer_dynamic<int>( 167 [&](int){ 168 ++v; 169 }, 170 [&](){ 171 ++c; 172 })); 173 auto s1 = rxs::range(values + 1, es) 174 .take(values) 175 .lift<int>(liftrequirecompletion) 176 .as_dynamic() 177 .publish_synchronized(es) 178 .ref_count() 179 .lift<int>(liftrequirecompletion) 180 .subscribe( 181 rx::make_observer_dynamic<int>( 182 [&](int){ 183 ++v; 184 }, 185 [&](){ 186 ++c; 187 })); 188 auto s2 = rxs::range((values * 2) + 1, es) 189 .take(values) 190 .lift<int>(liftrequirecompletion) 191 .as_dynamic() 192 .publish_synchronized(es) 193 .ref_count() 194 .lift<int>(liftrequirecompletion) 195 .subscribe( 196 rx::make_observer_dynamic<int>( 197 [&](int){ 198 ++v; 199 }, 200 [&](){ 201 ++c; 202 })); 203 while(v != values * 3 || c != 3); 204 s0.unsubscribe(); 205 s1.unsubscribe(); 206 s2.unsubscribe(); 207 auto finish = clock::now(); 208 auto msElapsed = duration_cast<milliseconds>(finish-start); 209 std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 210 211 if (--runs > 0) { 212 self(); 213 } 214 }; 215 216 w.schedule(loop); 217 } 218 } 219 } 220 221 SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){ 222 GIVEN("range"){ 223 WHEN("observed on"){ 224 using namespace std::chrono; 225 typedef steady_clock clock; 226 227 auto sc = rxsc::make_current_thread(); 228 auto w = sc.create_worker(); 229 230 auto es = rx::observe_on_event_loop(); 231 232 const int values = 10000; 233 234 int runs = 10; 235 __anon442cb92f1602(const rxsc::schedulable& self) 236 auto loop = [&](const rxsc::schedulable& self) { 237 std::atomic<int> c(0); 238 int n = 1; 239 auto liftrequirecompletion = [&](rx::subscriber<int> dest){ 240 auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest)); 241 std::get<2>(*completionstate).add([=](){ 242 if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) { 243 abort(); 244 } 245 }); 246 // VS2013 deduction issue requires dynamic (type-forgetting) 247 return rx::make_subscriber<int>( 248 std::get<2>(*completionstate), 249 [=](int n){ 250 ++std::get<1>(*completionstate); 251 std::get<2>(*completionstate).on_next(n); 252 }, 253 [=](rxu::error_ptr){ 254 abort(); 255 }, 256 [=](){ 257 if (std::get<1>(*completionstate) != values) { 258 abort(); 259 } 260 std::get<0>(*completionstate) = true; 261 std::get<2>(*completionstate).on_completed(); 262 }).as_dynamic(); 263 }; 264 auto start = clock::now(); 265 auto ew = es.create_coordinator().get_worker(); 266 std::atomic<int> v(0); 267 auto s0 = rxs::range(1, es) 268 .take(values) 269 .lift<int>(liftrequirecompletion) 270 .as_dynamic() 271 .observe_on(es) 272 .lift<int>(liftrequirecompletion) 273 .subscribe( 274 rx::make_observer_dynamic<int>( 275 [&](int){ 276 ++v; 277 }, 278 [&](){ 279 ++c; 280 })); 281 auto s1 = rxs::range(values + 1, es) 282 .take(values) 283 .lift<int>(liftrequirecompletion) 284 .as_dynamic() 285 .observe_on(es) 286 .lift<int>(liftrequirecompletion) 287 .subscribe( 288 rx::make_observer_dynamic<int>( 289 [&](int){ 290 ++v; 291 }, 292 [&](){ 293 ++c; 294 })); 295 auto s2 = rxs::range((values * 2) + 1, es) 296 .take(values) 297 .lift<int>(liftrequirecompletion) 298 .as_dynamic() 299 .observe_on(es) 300 .lift<int>(liftrequirecompletion) 301 .subscribe( 302 rx::make_observer_dynamic<int>( 303 [&](int){ 304 ++v; 305 }, 306 [&](){ 307 ++c; 308 })); 309 while(v != values * 3 || c != 3); 310 s0.unsubscribe(); 311 s1.unsubscribe(); 312 s2.unsubscribe(); 313 auto finish = clock::now(); 314 auto msElapsed = duration_cast<milliseconds>(finish-start); 315 std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; 316 317 if (--runs > 0) { 318 self(); 319 } 320 }; 321 322 w.schedule(loop); 323 } 324 } 325 } 326 327 SCENARIO("subscription traits", "[subscription][traits]"){ 328 GIVEN("given some subscription types"){ 329 auto es = rx::make_subscription(); 330 rx::composite_subscription cs; 331 WHEN("tested"){ 332 THEN("is_subscription value is true for empty subscription"){ 333 REQUIRE(rx::is_subscription<decltype(es)>::value); 334 } 335 THEN("is_subscription value is true for composite_subscription"){ 336 REQUIRE(rx::is_subscription<decltype(cs)>::value); 337 } 338 } 339 } 340 } 341 342 SCENARIO("non-subscription traits", "[subscription][traits]"){ 343 GIVEN("given some non-subscription types"){ __anon442cb92f2202()344 auto l = [](){}; 345 int i = 0; 346 void* v = nullptr; 347 WHEN("tested"){ 348 THEN("is_subscription value is false for lambda"){ 349 l(); 350 REQUIRE(!rx::is_subscription<decltype(l)>::value); 351 } 352 THEN("is_subscription value is false for int"){ 353 i = 0; 354 REQUIRE(!rx::is_subscription<decltype(i)>::value); 355 } 356 THEN("is_subscription value is false for void*"){ 357 v = nullptr; 358 REQUIRE(!rx::is_subscription<decltype(v)>::value); 359 } 360 THEN("is_subscription value is false for void"){ 361 REQUIRE(!rx::is_subscription<void>::value); 362 } 363 } 364 } 365 } 366 367 SCENARIO("subscription static", "[subscription]"){ 368 GIVEN("given a subscription"){ 369 int i=0; __anon442cb92f2302()370 auto s = rx::make_subscription([&i](){++i;}); 371 WHEN("not used"){ 372 THEN("is subscribed"){ 373 REQUIRE(s.is_subscribed()); 374 } 375 THEN("i is 0"){ 376 REQUIRE(i == 0); 377 } 378 } 379 WHEN("used"){ 380 THEN("is not subscribed when unsubscribed once"){ 381 s.unsubscribe(); 382 REQUIRE(!s.is_subscribed()); 383 } 384 THEN("is not subscribed when unsubscribed twice"){ 385 s.unsubscribe(); 386 s.unsubscribe(); 387 REQUIRE(!s.is_subscribed()); 388 } 389 THEN("i is 1 when unsubscribed once"){ 390 s.unsubscribe(); 391 REQUIRE(i == 1); 392 } 393 THEN("i is 1 when unsubscribed twice"){ 394 s.unsubscribe(); 395 s.unsubscribe(); 396 REQUIRE(i == 1); 397 } 398 } 399 } 400 } 401 402 SCENARIO("subscription empty", "[subscription]"){ 403 GIVEN("given an empty subscription"){ 404 auto s = rx::make_subscription(); 405 WHEN("not used"){ 406 THEN("is not subscribed"){ 407 REQUIRE(!s.is_subscribed()); 408 } 409 } 410 WHEN("used"){ 411 THEN("is not subscribed when unsubscribed once"){ 412 s.unsubscribe(); 413 REQUIRE(!s.is_subscribed()); 414 } 415 THEN("is not subscribed when unsubscribed twice"){ 416 s.unsubscribe(); 417 s.unsubscribe(); 418 REQUIRE(!s.is_subscribed()); 419 } 420 } 421 } 422 } 423 424 SCENARIO("subscription composite", "[subscription]"){ 425 GIVEN("given a subscription"){ 426 int i=0; 427 rx::composite_subscription s; 428 s.add(rx::make_subscription()); __anon442cb92f2402()429 s.add(rx::make_subscription([&i](){++i;})); __anon442cb92f2502()430 s.add([&i](){++i;}); 431 WHEN("not used"){ 432 THEN("is subscribed"){ 433 REQUIRE(s.is_subscribed()); 434 } 435 THEN("i is 0"){ 436 REQUIRE(i == 0); 437 } 438 } 439 WHEN("used"){ 440 THEN("is not subscribed when unsubscribed once"){ 441 s.unsubscribe(); 442 REQUIRE(!s.is_subscribed()); 443 } 444 THEN("is not subscribed when unsubscribed twice"){ 445 s.unsubscribe(); 446 s.unsubscribe(); 447 REQUIRE(!s.is_subscribed()); 448 } 449 THEN("i is 2 when unsubscribed once"){ 450 s.unsubscribe(); 451 REQUIRE(i == 2); 452 } 453 THEN("i is 2 when unsubscribed twice"){ 454 s.unsubscribe(); 455 s.unsubscribe(); 456 REQUIRE(i == 2); 457 } 458 } 459 } 460 } 461 462