1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("merge sample"){ 7 printf("//! [merge sample]\n"); __anon1fe4ec610102(int) 8 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); __anon1fe4ec610202(int) 9 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); __anon1fe4ec610302(int) 10 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); 11 auto values = o1.merge(o2, o3); 12 values. 13 subscribe( __anon1fe4ec610402(int v)14 [](int v){printf("OnNext: %d\n", v);}, __anon1fe4ec610502()15 [](){printf("OnCompleted\n");}); 16 printf("//! [merge sample]\n"); 17 } 18 19 SCENARIO("implicit merge sample"){ 20 printf("//! [implicit merge sample]\n"); __anon1fe4ec610602(int) 21 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); __anon1fe4ec610702(int) 22 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); __anon1fe4ec610802(int) 23 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); 24 auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); 25 auto values = base.merge(); 26 values. 27 subscribe( __anon1fe4ec610902(int v)28 [](int v){printf("OnNext: %d\n", v);}, __anon1fe4ec610a02()29 [](){printf("OnCompleted\n");}); 30 printf("//! [implicit merge sample]\n"); 31 } 32 33 #include "main.hpp" 34 35 SCENARIO("threaded merge sample"){ 36 printf("//! [threaded merge sample]\n"); 37 printf("[thread %s] Start task\n", get_pid().c_str()); __anon1fe4ec610b02(int) 38 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { 39 printf("[thread %s] Timer1 fired\n", get_pid().c_str()); 40 return 1; 41 }); __anon1fe4ec610c02(int) 42 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { 43 printf("[thread %s] Timer2 fired\n", get_pid().c_str()); 44 return 2; 45 }); __anon1fe4ec610d02(int) 46 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { 47 printf("[thread %s] Timer3 fired\n", get_pid().c_str()); 48 return 3; 49 }); 50 auto values = o1.merge(rxcpp::observe_on_new_thread(), o2, o3); 51 values. 52 as_blocking(). 53 subscribe( __anon1fe4ec610e02(int v)54 [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, __anon1fe4ec610f02()55 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 56 printf("[thread %s] Finish task\n", get_pid().c_str()); 57 printf("//! [threaded merge sample]\n"); 58 } 59 60 SCENARIO("threaded implicit merge sample"){ 61 printf("//! [threaded implicit merge sample]\n"); 62 printf("[thread %s] Start task\n", get_pid().c_str()); __anon1fe4ec611002(int) 63 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { 64 printf("[thread %s] Timer1 fired\n", get_pid().c_str()); 65 return 1; 66 }); __anon1fe4ec611102(int) 67 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).map([](int) { 68 printf("[thread %s] Timer2 fired\n", get_pid().c_str()); 69 return 2; 70 }); __anon1fe4ec611202(int) 71 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) { 72 printf("[thread %s] Timer3 fired\n", get_pid().c_str()); 73 return 3; 74 }); 75 auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); 76 auto values = base.merge(rxcpp::observe_on_new_thread()); 77 values. 78 as_blocking(). 79 subscribe( __anon1fe4ec611302(int v)80 [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, __anon1fe4ec611402()81 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 82 printf("[thread %s] Finish task\n", get_pid().c_str()); 83 printf("//! [threaded implicit merge sample]\n"); 84 } 85