• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "rxcpp/rx.hpp"
2 
3 #include "rxcpp/rx-test.hpp"
4 #include "catch.hpp"
5 
6 SCENARIO("buffer count sample"){
7     printf("//! [buffer count sample]\n");
8     auto values = rxcpp::observable<>::range(1, 5).buffer(2);
9     values.
10         subscribe(
__anon4592fcc90102(std::vector<int> v)11             [](std::vector<int> v){
12                 printf("OnNext:");
13                 std::for_each(v.begin(), v.end(), [](int a){
14                     printf(" %d", a);
15                 });
16                 printf("\n");
17             },
__anon4592fcc90302()18             [](){printf("OnCompleted\n");});
19     printf("//! [buffer count sample]\n");
20 }
21 
22 SCENARIO("buffer count+skip sample"){
23     printf("//! [buffer count+skip sample]\n");
24     auto values = rxcpp::observable<>::range(1, 7).buffer(2, 3);
25     values.
26         subscribe(
__anon4592fcc90402(std::vector<int> v)27             [](std::vector<int> v){
28                 printf("OnNext:");
29                 std::for_each(v.begin(), v.end(), [](int a){
30                     printf(" %d", a);
31                 });
32                 printf("\n");
33             },
__anon4592fcc90602()34             [](){printf("OnCompleted\n");});
35     printf("//! [buffer count+skip sample]\n");
36 }
37 
38 #include "main.hpp"
39 
40 SCENARIO("buffer period+skip+coordination sample"){
41     printf("//! [buffer period+skip+coordination sample]\n");
42     printf("[thread %s] Start task\n", get_pid().c_str());
43     auto period = std::chrono::milliseconds(4);
44     auto skip = std::chrono::milliseconds(6);
45     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
__anon4592fcc90702(long v)46         map([](long v){
47             printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
48             return v;
49         }).
50         take(7).
51         buffer_with_time(period, skip, rxcpp::observe_on_new_thread());
52     values.
53         as_blocking().
54         subscribe(
__anon4592fcc90802(std::vector<long> v)55             [](std::vector<long> v){
56                 printf("[thread %s] OnNext:", get_pid().c_str());
57                 std::for_each(v.begin(), v.end(), [](long a){
58                     printf(" %ld", a);
59                 });
60                 printf("\n");
61             },
__anon4592fcc90a02()62             [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
63     printf("[thread %s] Finish task\n", get_pid().c_str());
64     printf("//! [buffer period+skip+coordination sample]\n");
65 }
66 
67 SCENARIO("buffer period+skip sample"){
68     printf("//! [buffer period+skip sample]\n");
69     auto period = std::chrono::milliseconds(4);
70     auto skip = std::chrono::milliseconds(6);
71     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
72         take(7).
73         buffer_with_time(period, skip);
74     values.
75         subscribe(
__anon4592fcc90b02(std::vector<long> v)76             [](std::vector<long> v){
77                 printf("OnNext:");
78                 std::for_each(v.begin(), v.end(), [](long a){
79                     printf(" %ld", a);
80                 });
81                 printf("\n");
82             },
__anon4592fcc90d02()83             [](){printf("OnCompleted\n");});
84     printf("//! [buffer period+skip sample]\n");
85 }
86 
87 SCENARIO("buffer period+skip overlapping sample"){
88     printf("//! [buffer period+skip overlapping sample]\n");
89     auto period = std::chrono::milliseconds(6);
90     auto skip = std::chrono::milliseconds(4);
91     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
92         take(7).
93         buffer_with_time(period, skip);
94     values.
95         subscribe(
__anon4592fcc90e02(std::vector<long> v)96             [](std::vector<long> v){
97                 printf("OnNext:");
98                 std::for_each(v.begin(), v.end(), [](long a){
99                     printf(" %ld", a);
100                 });
101                 printf("\n");
102             },
__anon4592fcc91002()103             [](){printf("OnCompleted\n");});
104     printf("//! [buffer period+skip overlapping sample]\n");
105 }
106 
107 SCENARIO("buffer period+skip empty sample"){
108     printf("//! [buffer period+skip empty sample]\n");
109     auto period = std::chrono::milliseconds(2);
110     auto skip = std::chrono::milliseconds(4);
111     auto values = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).
112         buffer_with_time(period, skip);
113     values.
114         subscribe(
__anon4592fcc91102(std::vector<long> v)115             [](std::vector<long> v){
116                 printf("OnNext:");
117                 std::for_each(v.begin(), v.end(), [](long a){
118                     printf(" %ld", a);
119                 });
120                 printf("\n");
121             },
__anon4592fcc91302()122             [](){printf("OnCompleted\n");});
123     printf("//! [buffer period+skip empty sample]\n");
124 }
125 
126 SCENARIO("buffer period+coordination sample"){
127     printf("//! [buffer period+coordination sample]\n");
128     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
129         take(7).
130         buffer_with_time(std::chrono::milliseconds(4), rxcpp::observe_on_new_thread());
131     values.
132         as_blocking().
133         subscribe(
__anon4592fcc91402(std::vector<long> v)134             [](std::vector<long> v){
135                 printf("OnNext:");
136                 std::for_each(v.begin(), v.end(), [](long a){
137                     printf(" %ld", a);
138                 });
139                 printf("\n");
140             },
__anon4592fcc91602()141             [](){printf("OnCompleted\n");});
142     printf("//! [buffer period+coordination sample]\n");
143 }
144 
145 SCENARIO("buffer period sample"){
146     printf("//! [buffer period sample]\n");
147     auto values = rxcpp::observable<>::interval(std::chrono::steady_clock::now() + std::chrono::milliseconds(1), std::chrono::milliseconds(2)).
148         take(7).
149         buffer_with_time(std::chrono::milliseconds(4));
150     values.
151         subscribe(
__anon4592fcc91702(std::vector<long> v)152             [](std::vector<long> v){
153                 printf("OnNext:");
154                 std::for_each(v.begin(), v.end(), [](long a){
155                     printf(" %ld", a);
156                 });
157                 printf("\n");
158             },
__anon4592fcc91902()159             [](){printf("OnCompleted\n");});
160     printf("//! [buffer period sample]\n");
161 }
162 
163 SCENARIO("buffer period+count+coordination sample"){
164     printf("//! [buffer period+count+coordination sample]\n");
165     auto int1 = rxcpp::observable<>::range(1L, 3L);
166     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
167     auto values = int1.
168         concat(int2).
169         buffer_with_time_or_count(std::chrono::milliseconds(20), 2, rxcpp::observe_on_event_loop());
170     values.
171         as_blocking().
172         subscribe(
__anon4592fcc91a02(std::vector<long> v)173             [](std::vector<long> v){
174                 printf("OnNext:");
175                 std::for_each(v.begin(), v.end(), [](long a){
176                     printf(" %ld", a);
177                 });
178                 printf("\n");
179             },
__anon4592fcc91c02()180             [](){printf("OnCompleted\n");});
181     printf("//! [buffer period+count+coordination sample]\n");
182 }
183 
184 SCENARIO("buffer period+count sample"){
185     printf("//! [buffer period+count sample]\n");
186     auto int1 = rxcpp::observable<>::range(1L, 3L);
187     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
188     auto values = int1.
189         concat(int2).
190         buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
191     values.
192         subscribe(
__anon4592fcc91d02(std::vector<long> v)193             [](std::vector<long> v){
194                 printf("OnNext:");
195                 std::for_each(v.begin(), v.end(), [](long a){
196                     printf(" %ld", a);
197                 });
198                 printf("\n");
199             },
__anon4592fcc91f02()200             [](){printf("OnCompleted\n");});
201     printf("//! [buffer period+count sample]\n");
202 }
203