1 #include "rxcpp/rx.hpp" 2 3 #include "rxcpp/rx-test.hpp" 4 #include "catch.hpp" 5 6 #include "main.hpp" 7 8 SCENARIO("replay sample"){ 9 printf("//! [replay sample]\n"); 10 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 11 take(5). 12 replay(); 13 14 // Subscribe from the beginning 15 values.subscribe( __anond4f2187e0102(long v)16 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond4f2187e0202()17 [](){printf("[1] OnCompleted\n");}); 18 19 // Start emitting 20 values.connect(); 21 22 // Wait before subscribing __anond4f2187e0302(long)23 rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){ 24 values.as_blocking().subscribe( 25 [](long v){printf("[2] OnNext: %ld\n", v);}, 26 [](){printf("[2] OnCompleted\n");}); 27 }); 28 printf("//! [replay sample]\n"); 29 } 30 31 SCENARIO("threaded replay sample"){ 32 printf("//! [threaded replay sample]\n"); 33 printf("[thread %s] Start task\n", get_pid().c_str()); 34 auto coordination = rxcpp::serialize_new_thread(); 35 auto worker = coordination.create_coordinator().get_worker(); 36 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). 37 take(5). 38 replay(coordination); 39 40 // Subscribe from the beginning __anond4f2187e0602(const rxcpp::schedulers::schedulable&)41 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 42 values.subscribe( 43 [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, 44 [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); 45 }); 46 47 // Wait before subscribing __anond4f2187e0902(const rxcpp::schedulers::schedulable&)48 worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){ 49 values.subscribe( 50 [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, 51 [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); 52 }); 53 54 // Start emitting __anond4f2187e0c02(const rxcpp::schedulers::schedulable&)55 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 56 values.connect(); 57 }); 58 59 // Add blocking subscription to see results 60 values.as_blocking().subscribe(); 61 printf("[thread %s] Finish task\n", get_pid().c_str()); 62 printf("//! [threaded replay sample]\n"); 63 } 64 65 SCENARIO("replay count sample"){ 66 printf("//! [replay count sample]\n"); 67 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 68 take(5). 69 replay(2); 70 71 // Subscribe from the beginning 72 values.subscribe( __anond4f2187e0d02(long v)73 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond4f2187e0e02()74 [](){printf("[1] OnCompleted\n");}); 75 76 // Start emitting 77 values.connect(); 78 79 // Wait before subscribing __anond4f2187e0f02(long)80 rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){ 81 values.as_blocking().subscribe( 82 [](long v){printf("[2] OnNext: %ld\n", v);}, 83 [](){printf("[2] OnCompleted\n");}); 84 }); 85 printf("//! [replay count sample]\n"); 86 } 87 88 SCENARIO("threaded replay count sample"){ 89 printf("//! [threaded replay count sample]\n"); 90 printf("[thread %s] Start task\n", get_pid().c_str()); 91 auto coordination = rxcpp::serialize_new_thread(); 92 auto worker = coordination.create_coordinator().get_worker(); 93 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). 94 take(5). 95 replay(2, coordination); 96 97 // Subscribe from the beginning __anond4f2187e1202(const rxcpp::schedulers::schedulable&)98 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 99 values.subscribe( 100 [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, 101 [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); 102 }); 103 104 // Wait before subscribing __anond4f2187e1502(const rxcpp::schedulers::schedulable&)105 worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){ 106 values.subscribe( 107 [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, 108 [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); 109 }); 110 111 // Start emitting __anond4f2187e1802(const rxcpp::schedulers::schedulable&)112 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 113 values.connect(); 114 }); 115 116 // Add blocking subscription to see results 117 values.as_blocking().subscribe(); 118 printf("[thread %s] Finish task\n", get_pid().c_str()); 119 printf("//! [threaded replay count sample]\n"); 120 } 121 122 SCENARIO("replay period sample"){ 123 printf("//! [replay period sample]\n"); 124 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 125 take(5). 126 replay(std::chrono::milliseconds(125)); 127 128 // Subscribe from the beginning 129 values.subscribe( __anond4f2187e1902(long v)130 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond4f2187e1a02()131 [](){printf("[1] OnCompleted\n");}); 132 133 // Start emitting 134 values.connect(); 135 136 // Wait before subscribing __anond4f2187e1b02(long)137 rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){ 138 values.as_blocking().subscribe( 139 [](long v){printf("[2] OnNext: %ld\n", v);}, 140 [](){printf("[2] OnCompleted\n");}); 141 }); 142 printf("//! [replay period sample]\n"); 143 } 144 145 SCENARIO("threaded replay period sample"){ 146 printf("//! [threaded replay period sample]\n"); 147 printf("[thread %s] Start task\n", get_pid().c_str()); 148 auto coordination = rxcpp::serialize_new_thread(); 149 auto worker = coordination.create_coordinator().get_worker(); 150 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). 151 take(5). 152 replay(std::chrono::milliseconds(125), coordination); 153 154 // Subscribe from the beginning __anond4f2187e1e02(const rxcpp::schedulers::schedulable&)155 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 156 values.subscribe( 157 [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, 158 [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); 159 }); 160 161 // Wait before subscribing __anond4f2187e2102(const rxcpp::schedulers::schedulable&)162 worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){ 163 values.subscribe( 164 [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, 165 [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); 166 }); 167 168 // Start emitting __anond4f2187e2402(const rxcpp::schedulers::schedulable&)169 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 170 values.connect(); 171 }); 172 173 // Add blocking subscription to see results 174 values.as_blocking().subscribe(); 175 printf("[thread %s] Finish task\n", get_pid().c_str()); 176 printf("//! [threaded replay period sample]\n"); 177 } 178 179 SCENARIO("replay count+period sample"){ 180 printf("//! [replay count+period sample]\n"); 181 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). 182 take(5). 183 replay(2, std::chrono::milliseconds(125)); 184 185 // Subscribe from the beginning 186 values.subscribe( __anond4f2187e2502(long v)187 [](long v){printf("[1] OnNext: %ld\n", v);}, __anond4f2187e2602()188 [](){printf("[1] OnCompleted\n");}); 189 190 // Start emitting 191 values.connect(); 192 193 // Wait before subscribing __anond4f2187e2702(long)194 rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){ 195 values.as_blocking().subscribe( 196 [](long v){printf("[2] OnNext: %ld\n", v);}, 197 [](){printf("[2] OnCompleted\n");}); 198 }); 199 printf("//! [replay count+period sample]\n"); 200 } 201 202 SCENARIO("threaded replay count+period sample"){ 203 printf("//! [threaded replay count+period sample]\n"); 204 printf("[thread %s] Start task\n", get_pid().c_str()); 205 auto coordination = rxcpp::serialize_new_thread(); 206 auto worker = coordination.create_coordinator().get_worker(); 207 auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). 208 take(5). 209 replay(2, std::chrono::milliseconds(125), coordination); 210 211 // Subscribe from the beginning __anond4f2187e2a02(const rxcpp::schedulers::schedulable&)212 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 213 values.subscribe( 214 [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);}, 215 [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());}); 216 }); 217 218 // Wait before subscribing __anond4f2187e2d02(const rxcpp::schedulers::schedulable&)219 worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){ 220 values.subscribe( 221 [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);}, 222 [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());}); 223 }); 224 225 // Start emitting __anond4f2187e3002(const rxcpp::schedulers::schedulable&)226 worker.schedule([&](const rxcpp::schedulers::schedulable&){ 227 values.connect(); 228 }); 229 230 // Add blocking subscription to see results 231 values.as_blocking().subscribe(); 232 printf("[thread %s] Finish task\n", get_pid().c_str()); 233 printf("//! [threaded replay count+period sample]\n"); 234 } 235