// Sample scenarios for rxcpp::observable<>::range, written as Catch SCENARIO test cases.
#include <cstdio>
#include <string>
#include <thread>
#include <tuple>

#include "rxcpp/rx.hpp"

#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

6 SCENARIO("range sample"){
7     printf("//! [range sample]\n");
8     auto values1 = rxcpp::observable<>::range(1, 5);
9     values1.
10         subscribe(
__anon8162dade0102(int v)11             [](int v){printf("OnNext: %d\n", v);},
__anon8162dade0202()12             [](){printf("OnCompleted\n");});
13     printf("//! [range sample]\n");
14 }
15 
#include "main.hpp"

18 SCENARIO("threaded range sample"){
19     printf("//! [threaded range sample]\n");
20     printf("[thread %s] Start task\n", get_pid().c_str());
21     auto values = rxcpp::observable<>::range(1, 3, rxcpp::observe_on_new_thread());
22     auto s = values.
__anon8162dade0302(int v) 23         map([](int v) { return std::make_tuple(get_pid(), v);});
24     s.
25         as_blocking().
26         subscribe(
27             rxcpp::util::apply_to(
__anon8162dade0402(const std::string pid, int v) 28                 [](const std::string pid, int v) {
29                     printf("[thread %s] OnNext: %d\n", pid.c_str(), v);
30                 }),
__anon8162dade0502()31             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
32     printf("[thread %s] Finish task\n", get_pid().c_str());
33     printf("//! [threaded range sample]\n");
34 }
35 
36 SCENARIO("subscribe_on range sample"){
37     printf("//! [subscribe_on range sample]\n");
38     printf("[thread %s] Start task\n", get_pid().c_str());
39     auto values = rxcpp::observable<>::range(1, 3);
40     auto s = values.
41         subscribe_on(rxcpp::observe_on_new_thread()).
__anon8162dade0602(int v) 42         map([](int v) { return std::make_tuple(get_pid(), v);});
43     s.
44         as_blocking().
45         subscribe(
46             rxcpp::util::apply_to(
__anon8162dade0702(const std::string pid, int v) 47                 [](const std::string pid, int v) {
48                     printf("[thread %s] OnNext: %d\n", pid.c_str(), v);
49                 }),
__anon8162dade0802()50             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
51     printf("[thread %s] Finish task\n", get_pid().c_str());
52     printf("//! [subscribe_on range sample]\n");
53 }
54 
55 
56 SCENARIO("range concat sample"){
57     printf("//! [range concat sample]\n");
58 
59     auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
60 
61     auto s1 = values.
62         take(3).
__anon8162dade0902(int v) 63         map([](int v) { return std::make_tuple("1:", v);});
64 
65     auto s2 = values.
66         take(3).
__anon8162dade0a02(int v) 67         map([](int v) { return std::make_tuple("2:", v);});
68 
69     s1.
70         concat(s2).
71         subscribe(rxcpp::util::apply_to(
__anon8162dade0b02(const char* s, int p) 72             [](const char* s, int p) {
73                 printf("%s %d\n", s, p);
74             }));
75     printf("//! [range concat sample]\n");
76 }
77 
78 SCENARIO("range merge sample"){
79     printf("//! [range merge sample]\n");
80 
81     auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
82 
83     auto s1 = values.
__anon8162dade0c02(int v) 84         map([](int v) { return std::make_tuple("1:", v);});
85 
86     auto s2 = values.
__anon8162dade0d02(int v) 87         map([](int v) { return std::make_tuple("2:", v);});
88 
89     s1.
90         merge(s2).
91         take(6).
92         as_blocking().
93         subscribe(rxcpp::util::apply_to(
__anon8162dade0e02(const char* s, int p) 94             [](const char* s, int p) {
95                 printf("%s %d\n", s, p);
96             }));
97     printf("//! [range merge sample]\n");
98 }
99 
100 SCENARIO("threaded range concat sample"){
101     printf("//! [threaded range concat sample]\n");
102     auto threads = rxcpp::observe_on_event_loop();
103 
104     auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
105 
106     auto s1 = values.
107         subscribe_on(threads).
108         take(3).
__anon8162dade0f02(int v) 109         map([](int v) { std::this_thread::yield(); return std::make_tuple("1:", v);});
110 
111     auto s2 = values.
112         subscribe_on(threads).
113         take(3).
__anon8162dade1002(int v) 114         map([](int v) { std::this_thread::yield(); return std::make_tuple("2:", v);});
115 
116     s1.
117         concat(s2).
118         observe_on(threads).
119         as_blocking().
120         subscribe(rxcpp::util::apply_to(
__anon8162dade1102(const char* s, int p) 121             [](const char* s, int p) {
122                 printf("%s %d\n", s, p);
123             }));
124     printf("//! [threaded range concat sample]\n");
125 }
126 
127 SCENARIO("threaded range merge sample"){
128     printf("//! [threaded range merge sample]\n");
129     auto threads = rxcpp::observe_on_event_loop();
130 
131     auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
132 
133     auto s1 = values.
134         subscribe_on(threads).
__anon8162dade1202(int v) 135         map([](int v) { std::this_thread::yield(); return std::make_tuple("1:", v);});
136 
137     auto s2 = values.
138         subscribe_on(threads).
__anon8162dade1302(int v) 139         map([](int v) { std::this_thread::yield(); return std::make_tuple("2:", v);});
140 
141     s1.
142         merge(s2).
143         take(6).
144         observe_on(threads).
145         as_blocking().
146         subscribe(rxcpp::util::apply_to(
__anon8162dade1402(const char* s, int p) 147             [](const char* s, int p) {
148                 printf("%s %d\n", s, p);
149             }));
150     printf("//! [threaded range merge sample]\n");
151 }
152 
153