1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 SCENARIO("amb sample"){ 7 printf("//! [amb sample]\n"); __anon880b2ba10102(int) 8 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); __anon880b2ba10202(int) 9 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); __anon880b2ba10302(int) 10 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); 11 auto values = o1.amb(o2, o3); 12 values. 13 subscribe( __anon880b2ba10402(int v)14 [](int v){printf("OnNext: %d\n", v);}, __anon880b2ba10502()15 [](){printf("OnCompleted\n");}); 16 printf("//! [amb sample]\n"); 17 } 18 19 SCENARIO("implicit amb sample"){ 20 printf("//! [implicit amb sample]\n"); __anon880b2ba10602(int) 21 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;}); __anon880b2ba10702(int) 22 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {return 2;}); __anon880b2ba10802(int) 23 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) {return 3;}); 24 auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); 25 auto values = base.amb(); 26 values. 27 subscribe( __anon880b2ba10902(int v)28 [](int v){printf("OnNext: %d\n", v);}, __anon880b2ba10a02()29 [](){printf("OnCompleted\n");}); 30 printf("//! [implicit amb sample]\n"); 31 } 32 33 #include "main.hpp" 34 35 SCENARIO("threaded amb sample"){ 36 printf("//! [threaded amb sample]\n"); 37 printf("[thread %s] Start task\n", get_pid().c_str()); __anon880b2ba10b02(int) 38 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { 39 printf("[thread %s] Timer1 fired\n", get_pid().c_str()); 40 return 1; 41 }); __anon880b2ba10c02(int) 42 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { 43 printf("[thread %s] Timer2 fired\n", get_pid().c_str()); 44 return 2; 45 }); __anon880b2ba10d02(int) 46 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { 47 printf("[thread %s] Timer3 fired\n", get_pid().c_str()); 48 return 3; 49 }); 50 auto values = o1.amb(rxcpp::observe_on_new_thread(), o2, o3); 51 values. 52 as_blocking(). 53 subscribe( __anon880b2ba10e02(int v)54 [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, __anon880b2ba10f02()55 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 56 printf("[thread %s] Finish task\n", get_pid().c_str()); 57 printf("//! [threaded amb sample]\n"); 58 } 59 60 SCENARIO("threaded implicit amb sample"){ 61 printf("//! [threaded implicit amb sample]\n"); 62 printf("[thread %s] Start task\n", get_pid().c_str()); __anon880b2ba11002(int) 63 auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) { 64 printf("[thread %s] Timer1 fired\n", get_pid().c_str()); 65 return 1; 66 }); __anon880b2ba11102(int) 67 auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) { 68 printf("[thread %s] Timer2 fired\n", get_pid().c_str()); 69 return 2; 70 }); __anon880b2ba11202(int) 71 auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(5)).map([](int) { 72 printf("[thread %s] Timer3 fired\n", get_pid().c_str()); 73 return 3; 74 }); 75 auto base = rxcpp::observable<>::from(o1.as_dynamic(), o2, o3); 76 auto values = base.amb(rxcpp::observe_on_new_thread()); 77 values. 78 as_blocking(). 79 subscribe( __anon880b2ba11302(int v)80 [](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);}, __anon880b2ba11402()81 [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); 82 printf("[thread %s] Finish task\n", get_pid().c_str()); 83 printf("//! [threaded implicit amb sample]\n"); 84 } 85