• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-map.hpp>
3 #include <rxcpp/operators/rx-take.hpp>
4 #include <rxcpp/operators/rx-scan.hpp>
5 
6 SCENARIO("scan: issue 41", "[scan][operators][issue][!hide]"){
7     GIVEN("map of scan of interval"){
8         auto sc = rxsc::make_current_thread();
9         auto so = rxcpp::synchronize_in_one_worker(sc);
10         auto start = sc.now() + std::chrono::seconds(2);
11         auto period = std::chrono::seconds(1);
12 
13         rxcpp::observable<>::interval(start, period, so)
__anonde9ebd360102(int a, int i) 14             .scan(0, [] (int a, int i) { return a + i; })
__anonde9ebd360202(int i) 15             .map([] (int i) { return i * i; })
16             .take(10)
__anonde9ebd360302(int i) 17             .subscribe([] (int i) { std::cout << i << std::endl; });
18 
19     }
20 }
21 
22 SCENARIO("scan: seed, never", "[scan][operators]"){
23     GIVEN("a test hot observable of ints"){
24         auto sc = rxsc::make_test();
25         auto w = sc.create_worker();
26         const rxsc::test::messages<int> on;
27 
28         int seed = 1;
29 
30         auto xs = sc.make_hot_observable({
31             on.next(150, 1),
32         });
33 
34         WHEN("mapped to ints that are one larger"){
35 
36             auto res = w.start(
__anonde9ebd360402() 37                 [&]() {
38                     return xs
39                         | rxo::scan(seed, [](int sum, int x) {
40                             return sum + x;
41                         })
42                         // forget type to workaround lambda deduction bug on msvc 2013
43                         | rxo::as_dynamic();
44                 }
45             );
46 
47             THEN("the output is empty"){
48                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
49                 auto actual = res.get_observer().messages();
50                 REQUIRE(required == actual);
51             }
52 
53             THEN("there was one subscription and one unsubscription"){
54                 auto required = rxu::to_vector({
55                     on.subscribe(200, 1000)
56                 });
57                 auto actual = xs.subscriptions();
58                 REQUIRE(required == actual);
59             }
60         }
61     }
62 }
63 
64 SCENARIO("scan: seed, empty", "[scan][operators]"){
65     GIVEN("a test hot observable of ints"){
66         auto sc = rxsc::make_test();
67         auto w = sc.create_worker();
68         const rxsc::test::messages<int> on;
69 
70         int seed = 1;
71 
72         auto xs = sc.make_hot_observable({
73             on.next(150, 1),
74             on.completed(250)
75         });
76 
77         WHEN("mapped to ints that are one larger"){
78 
79             auto res = w.start(
__anonde9ebd360602() 80                 [&]() {
81                     return xs
82                         .scan(seed, [](int sum, int x) {
83                             return sum + x;
84                         })
85                         // forget type to workaround lambda deduction bug on msvc 2013
86                         .as_dynamic();
87                 }
88             );
89 
90             THEN("the output stops on completion"){
91                 auto required = rxu::to_vector({
92                     on.completed(250)
93                 });
94                 auto actual = res.get_observer().messages();
95                 REQUIRE(required == actual);
96             }
97 
98             THEN("there was one subscription and one unsubscription"){
99                 auto required = rxu::to_vector({
100                     on.subscribe(200, 250)
101                 });
102                 auto actual = xs.subscriptions();
103                 REQUIRE(required == actual);
104             }
105         }
106     }
107 }
108 
109 SCENARIO("scan: seed, return", "[scan][operators]"){
110     GIVEN("a test hot observable of ints"){
111         auto sc = rxsc::make_test();
112         auto w = sc.create_worker();
113         const rxsc::test::messages<int> on;
114 
115         int seed = 1;
116 
117         auto xs = sc.make_hot_observable({
118             on.next(150, 1),
119             on.next(220, 2),
120             on.completed(250)
121         });
122 
123         WHEN("mapped to ints that are one larger"){
124 
125             auto res = w.start(
__anonde9ebd360802() 126                 [&]() {
127                     return xs
128                         .scan(seed, [](int sum, int x) {
129                             return sum + x;
130                         })
131                         // forget type to workaround lambda deduction bug on msvc 2013
132                         .as_dynamic();
133                 }
134             );
135 
136             THEN("the output stops on completion"){
137                 auto required = rxu::to_vector({
138                     on.next(220, seed + 2),
139                     on.completed(250)
140                 });
141                 auto actual = res.get_observer().messages();
142                 REQUIRE(required == actual);
143             }
144 
145             THEN("there was one subscription and one unsubscription"){
146                 auto required = rxu::to_vector({
147                     on.subscribe(200, 250)
148                 });
149                 auto actual = xs.subscriptions();
150                 REQUIRE(required == actual);
151             }
152         }
153     }
154 }
155 
156 SCENARIO("scan: seed, throw", "[scan][operators]"){
157     GIVEN("a test hot observable of ints"){
158         auto sc = rxsc::make_test();
159         auto w = sc.create_worker();
160         const rxsc::test::messages<int> on;
161 
162         int seed = 1;
163 
164         std::runtime_error ex("scan on_error from source");
165 
166         auto xs = sc.make_hot_observable({
167             on.next(150, 1),
168             on.error(250, ex)
169         });
170 
171         WHEN("mapped to ints that are one larger"){
172 
173             auto res = w.start(
__anonde9ebd360a02() 174                 [&]() {
175                     return xs
176                         .scan(seed, [](int sum, int x) {
177                             return sum + x;
178                         })
179                         // forget type to workaround lambda deduction bug on msvc 2013
180                         .as_dynamic();
181                 }
182             );
183 
184             THEN("the output stops on error"){
185                 auto required = rxu::to_vector({
186                     on.error(250, ex)
187                 });
188                 auto actual = res.get_observer().messages();
189                 REQUIRE(required == actual);
190             }
191 
192             THEN("there was one subscription and one unsubscription"){
193                 auto required = rxu::to_vector({
194                     on.subscribe(200, 250)
195                 });
196                 auto actual = xs.subscriptions();
197                 REQUIRE(required == actual);
198             }
199         }
200     }
201 }
202 
203 SCENARIO("scan: seed, some data", "[scan][operators]"){
204     GIVEN("a test hot observable of ints"){
205         auto sc = rxsc::make_test();
206         auto w = sc.create_worker();
207         const rxsc::test::messages<int> on;
208 
209         int seed = 1;
210 
211         auto xs = sc.make_hot_observable({
212             on.next(150, 1),
213             on.next(210, 2),
214             on.next(220, 3),
215             on.next(230, 4),
216             on.next(240, 5),
217             on.completed(250)
218         });
219 
220         WHEN("mapped to ints that are one larger"){
221 
222             auto res = w.start(
__anonde9ebd360c02() 223                 [&]() {
224                     return xs
225                         .scan(seed, [](int sum, int x) {
226                             return sum + x;
227                         })
228                         // forget type to workaround lambda deduction bug on msvc 2013
229                         .as_dynamic();
230                 }
231             );
232 
233             THEN("the output stops on completion"){
234                 auto required = rxu::to_vector({
235                     on.next(210, seed + 2),
236                     on.next(220, seed + 2 + 3),
237                     on.next(230, seed + 2 + 3 + 4),
238                     on.next(240, seed + 2 + 3 + 4 + 5),
239                     on.completed(250)
240                 });
241                 auto actual = res.get_observer().messages();
242                 REQUIRE(required == actual);
243             }
244 
245             THEN("there was one subscription and one unsubscription"){
246                 auto required = rxu::to_vector({
247                     on.subscribe(200, 250)
248                 });
249                 auto actual = xs.subscriptions();
250                 REQUIRE(required == actual);
251             }
252         }
253     }
254 }
255 
256 SCENARIO("scan: seed, accumulator throws", "[scan][operators][!throws]"){
257     GIVEN("a test hot observable of ints"){
258         auto sc = rxsc::make_test();
259         auto w = sc.create_worker();
260         const rxsc::test::messages<int> on;
261 
262         int seed = 1;
263 
264         std::runtime_error ex("scan on_error from source");
265 
266         auto xs = sc.make_hot_observable({
267             on.next(150, 1),
268             on.next(210, 2),
269             on.next(220, 3),
270             on.next(230, 4),
271             on.next(240, 5),
272             on.completed(250)
273         });
274 
275         WHEN("mapped to ints that are one larger"){
276 
277             auto res = w.start(
__anonde9ebd360e02() 278                 [&]() {
279                     return xs
280                         .scan(seed, [&](int sum, int x) {
281                             if (x == 4) {
282                                 rxu::throw_exception(ex);
283                             }
284                             return sum + x;
285                         })
286                         // forget type to workaround lambda deduction bug on msvc 2013
287                         .as_dynamic();
288                 }
289             );
290 
291             THEN("the output stops on error"){
292                 auto required = rxu::to_vector({
293                     on.next(210, seed + 2),
294                     on.next(220, seed + 2 + 3),
295                     on.error(230, ex)
296                 });
297                 auto actual = res.get_observer().messages();
298                 REQUIRE(required == actual);
299             }
300 
301             THEN("there was one subscription and one unsubscription"){
302                 auto required = rxu::to_vector({
303                     on.subscribe(200, 230)
304                 });
305                 auto actual = xs.subscriptions();
306                 REQUIRE(required == actual);
307             }
308         }
309     }
310 }
311