• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "rxcpp/rx.hpp"
2 
3 #include "rxcpp/rx-test.hpp"
4 #include "catch.hpp"
5 
6 #include "main.hpp"
7 
8 SCENARIO("replay sample"){
9     printf("//! [replay sample]\n");
10     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
11         take(5).
12         replay();
13 
14     // Subscribe from the beginning
15     values.subscribe(
__anond4f2187e0102(long v)16         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond4f2187e0202()17         [](){printf("[1] OnCompleted\n");});
18 
19     // Start emitting
20     values.connect();
21 
22     // Wait before subscribing
__anond4f2187e0302(long)23     rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
24         values.as_blocking().subscribe(
25             [](long v){printf("[2] OnNext: %ld\n", v);},
26             [](){printf("[2] OnCompleted\n");});
27     });
28     printf("//! [replay sample]\n");
29 }
30 
31 SCENARIO("threaded replay sample"){
32     printf("//! [threaded replay sample]\n");
33     printf("[thread %s] Start task\n", get_pid().c_str());
34     auto coordination = rxcpp::serialize_new_thread();
35     auto worker = coordination.create_coordinator().get_worker();
36     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
37         take(5).
38         replay(coordination);
39 
40     // Subscribe from the beginning
__anond4f2187e0602(const rxcpp::schedulers::schedulable&)41     worker.schedule([&](const rxcpp::schedulers::schedulable&){
42         values.subscribe(
43             [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
44             [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
45     });
46 
47     // Wait before subscribing
__anond4f2187e0902(const rxcpp::schedulers::schedulable&)48     worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
49         values.subscribe(
50             [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
51             [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
52     });
53 
54     // Start emitting
__anond4f2187e0c02(const rxcpp::schedulers::schedulable&)55     worker.schedule([&](const rxcpp::schedulers::schedulable&){
56         values.connect();
57     });
58 
59     // Add blocking subscription to see results
60     values.as_blocking().subscribe();
61     printf("[thread %s] Finish task\n", get_pid().c_str());
62     printf("//! [threaded replay sample]\n");
63 }
64 
65 SCENARIO("replay count sample"){
66     printf("//! [replay count sample]\n");
67     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
68         take(5).
69         replay(2);
70 
71     // Subscribe from the beginning
72     values.subscribe(
__anond4f2187e0d02(long v)73         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond4f2187e0e02()74         [](){printf("[1] OnCompleted\n");});
75 
76     // Start emitting
77     values.connect();
78 
79     // Wait before subscribing
__anond4f2187e0f02(long)80     rxcpp::observable<>::timer(std::chrono::milliseconds(125)).subscribe([&](long){
81         values.as_blocking().subscribe(
82             [](long v){printf("[2] OnNext: %ld\n", v);},
83             [](){printf("[2] OnCompleted\n");});
84     });
85     printf("//! [replay count sample]\n");
86 }
87 
88 SCENARIO("threaded replay count sample"){
89     printf("//! [threaded replay count sample]\n");
90     printf("[thread %s] Start task\n", get_pid().c_str());
91     auto coordination = rxcpp::serialize_new_thread();
92     auto worker = coordination.create_coordinator().get_worker();
93     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
94         take(5).
95         replay(2, coordination);
96 
97     // Subscribe from the beginning
__anond4f2187e1202(const rxcpp::schedulers::schedulable&)98     worker.schedule([&](const rxcpp::schedulers::schedulable&){
99         values.subscribe(
100             [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
101             [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
102     });
103 
104     // Wait before subscribing
__anond4f2187e1502(const rxcpp::schedulers::schedulable&)105     worker.schedule(coordination.now() + std::chrono::milliseconds(125), [&](const rxcpp::schedulers::schedulable&){
106         values.subscribe(
107             [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
108             [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
109     });
110 
111     // Start emitting
__anond4f2187e1802(const rxcpp::schedulers::schedulable&)112     worker.schedule([&](const rxcpp::schedulers::schedulable&){
113         values.connect();
114     });
115 
116     // Add blocking subscription to see results
117     values.as_blocking().subscribe();
118     printf("[thread %s] Finish task\n", get_pid().c_str());
119     printf("//! [threaded replay count sample]\n");
120 }
121 
122 SCENARIO("replay period sample"){
123     printf("//! [replay period sample]\n");
124     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
125         take(5).
126         replay(std::chrono::milliseconds(125));
127 
128     // Subscribe from the beginning
129     values.subscribe(
__anond4f2187e1902(long v)130         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond4f2187e1a02()131         [](){printf("[1] OnCompleted\n");});
132 
133     // Start emitting
134     values.connect();
135 
136     // Wait before subscribing
__anond4f2187e1b02(long)137     rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
138         values.as_blocking().subscribe(
139             [](long v){printf("[2] OnNext: %ld\n", v);},
140             [](){printf("[2] OnCompleted\n");});
141     });
142     printf("//! [replay period sample]\n");
143 }
144 
145 SCENARIO("threaded replay period sample"){
146     printf("//! [threaded replay period sample]\n");
147     printf("[thread %s] Start task\n", get_pid().c_str());
148     auto coordination = rxcpp::serialize_new_thread();
149     auto worker = coordination.create_coordinator().get_worker();
150     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
151         take(5).
152         replay(std::chrono::milliseconds(125), coordination);
153 
154     // Subscribe from the beginning
__anond4f2187e1e02(const rxcpp::schedulers::schedulable&)155     worker.schedule([&](const rxcpp::schedulers::schedulable&){
156         values.subscribe(
157             [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
158             [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
159     });
160 
161     // Wait before subscribing
__anond4f2187e2102(const rxcpp::schedulers::schedulable&)162     worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
163         values.subscribe(
164             [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
165             [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
166     });
167 
168     // Start emitting
__anond4f2187e2402(const rxcpp::schedulers::schedulable&)169     worker.schedule([&](const rxcpp::schedulers::schedulable&){
170         values.connect();
171     });
172 
173     // Add blocking subscription to see results
174     values.as_blocking().subscribe();
175     printf("[thread %s] Finish task\n", get_pid().c_str());
176     printf("//! [threaded replay period sample]\n");
177 }
178 
179 SCENARIO("replay count+period sample"){
180     printf("//! [replay count+period sample]\n");
181     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
182         take(5).
183         replay(2, std::chrono::milliseconds(125));
184 
185     // Subscribe from the beginning
186     values.subscribe(
__anond4f2187e2502(long v)187         [](long v){printf("[1] OnNext: %ld\n", v);},
__anond4f2187e2602()188         [](){printf("[1] OnCompleted\n");});
189 
190     // Start emitting
191     values.connect();
192 
193     // Wait before subscribing
__anond4f2187e2702(long)194     rxcpp::observable<>::timer(std::chrono::milliseconds(175)).subscribe([&](long){
195         values.as_blocking().subscribe(
196             [](long v){printf("[2] OnNext: %ld\n", v);},
197             [](){printf("[2] OnCompleted\n");});
198     });
199     printf("//! [replay count+period sample]\n");
200 }
201 
202 SCENARIO("threaded replay count+period sample"){
203     printf("//! [threaded replay count+period sample]\n");
204     printf("[thread %s] Start task\n", get_pid().c_str());
205     auto coordination = rxcpp::serialize_new_thread();
206     auto worker = coordination.create_coordinator().get_worker();
207     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
208         take(5).
209         replay(2, std::chrono::milliseconds(125), coordination);
210 
211     // Subscribe from the beginning
__anond4f2187e2a02(const rxcpp::schedulers::schedulable&)212     worker.schedule([&](const rxcpp::schedulers::schedulable&){
213         values.subscribe(
214             [](long v){printf("[thread %s][1] OnNext: %ld\n", get_pid().c_str(), v);},
215             [](){printf("[thread %s][1] OnCompleted\n", get_pid().c_str());});
216     });
217 
218     // Wait before subscribing
__anond4f2187e2d02(const rxcpp::schedulers::schedulable&)219     worker.schedule(coordination.now() + std::chrono::milliseconds(175), [&](const rxcpp::schedulers::schedulable&){
220         values.subscribe(
221             [](long v){printf("[thread %s][2] OnNext: %ld\n", get_pid().c_str(), v);},
222             [](){printf("[thread %s][2] OnCompleted\n", get_pid().c_str());});
223     });
224 
225     // Start emitting
__anond4f2187e3002(const rxcpp::schedulers::schedulable&)226     worker.schedule([&](const rxcpp::schedulers::schedulable&){
227         values.connect();
228     });
229 
230     // Add blocking subscription to see results
231     values.as_blocking().subscribe();
232     printf("[thread %s] Finish task\n", get_pid().c_str());
233     printf("//! [threaded replay count+period sample]\n");
234 }
235