• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include "rxcpp/operators/rx-reduce.hpp"
3 
4 SCENARIO("reduce some data with seed", "[reduce][operators]"){
5     GIVEN("a test hot observable of ints"){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         int seed = 42;
11 
12         auto xs = sc.make_hot_observable({
13             on.next(150, 1),
14             on.next(210, 0),
15             on.next(220, 1),
16             on.next(230, 2),
17             on.next(240, 3),
18             on.next(250, 4),
19             on.completed(260)
20         });
21 
22         WHEN("mapped to ints that are one larger"){
23 
24             auto res = w.start(
__anonb3a3a90d0102() 25                 [&]() {
26                     return xs
27                         .reduce(seed,
28                             [](int sum, int x) {
29                                 return sum + x;
30                             },
31                             [](int sum) {
32                                 return sum * 5;
33                             })
34                         // forget type to workaround lambda deduction bug on msvc 2013
35                         .as_dynamic();
36                 }
37             );
38 
39             THEN("the output stops on completion"){
40                 auto required = rxu::to_vector({
41                     on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
42                     on.completed(260)
43                 });
44                 auto actual = res.get_observer().messages();
45                 REQUIRE(required == actual);
46             }
47 
48             THEN("there was one subscription and one unsubscription"){
49                 auto required = rxu::to_vector({
50                     on.subscribe(200, 260)
51                 });
52                 auto actual = xs.subscriptions();
53                 REQUIRE(required == actual);
54             }
55         }
56     }
57 }
58 
59 SCENARIO("accumulate some data with seed", "[accumulate][reduce][operators]"){
60     GIVEN("a test hot observable of ints"){
61         auto sc = rxsc::make_test();
62         auto w = sc.create_worker();
63         const rxsc::test::messages<int> on;
64 
65         int seed = 42;
66 
67         auto xs = sc.make_hot_observable({
68             on.next(150, 1),
69             on.next(210, 0),
70             on.next(220, 1),
71             on.next(230, 2),
72             on.next(240, 3),
73             on.next(250, 4),
74             on.completed(260)
75         });
76 
77         WHEN("mapped to ints that are one larger"){
78 
79             auto res = w.start(
__anonb3a3a90d0402() 80                 [&]() {
81                     return xs
82                         .accumulate(seed,
83                             [](int sum, int x) {
84                                 return sum + x;
85                             },
86                             [](int sum) {
87                                 return sum * 5;
88                             })
89                         // forget type to workaround lambda deduction bug on msvc 2013
90                         .as_dynamic();
91                 }
92             );
93 
94             THEN("the output stops on completion"){
95                 auto required = rxu::to_vector({
96                     on.next(260, (seed + 0 + 1 + 2 + 3 + 4) * 5),
97                     on.completed(260)
98                 });
99                 auto actual = res.get_observer().messages();
100                 REQUIRE(required == actual);
101             }
102 
103             THEN("there was one subscription and one unsubscription"){
104                 auto required = rxu::to_vector({
105                     on.subscribe(200, 260)
106                 });
107                 auto actual = xs.subscriptions();
108                 REQUIRE(required == actual);
109             }
110         }
111     }
112 }
113 
114 SCENARIO("average some data", "[reduce][average][operators]"){
115     GIVEN("a test hot observable of ints"){
116         auto sc = rxsc::make_test();
117         auto w = sc.create_worker();
118         const rxsc::test::messages<int> on;
119         const rxsc::test::messages<double> d_on;
120 
121         auto xs = sc.make_hot_observable({
122             on.next(150, 1),
123             on.next(210, 3),
124             on.next(220, 4),
125             on.next(230, 2),
126             on.completed(250)
127         });
128 
129         WHEN("mapped to ints that are one larger"){
130 
131             auto res = w.start(
__anonb3a3a90d0702() 132                 [&]() {
133                     return xs.average();
134                 }
135             );
136 
137             THEN("the output stops on completion"){
138                 auto required = rxu::to_vector({
139                     d_on.next(250, 3.0),
140                     d_on.completed(250)
141                 });
142                 auto actual = res.get_observer().messages();
143                 REQUIRE(required == actual);
144             }
145 
146             THEN("there was one subscription and one unsubscription"){
147                 auto required = rxu::to_vector({
148                     on.subscribe(200, 250)
149                 });
150                 auto actual = xs.subscriptions();
151                 REQUIRE(required == actual);
152             }
153         }
154     }
155 }
156 
157 SCENARIO("sum some data", "[reduce][sum][operators]"){
158     GIVEN("a test hot observable of ints"){
159         auto sc = rxsc::make_test();
160         auto w = sc.create_worker();
161         const rxsc::test::messages<int> on;
162         const rxsc::test::messages<int> d_on;
163 
164         auto xs = sc.make_hot_observable({
165              on.next(150, 1),
166              on.next(210, 3),
167              on.next(220, 4),
168              on.next(230, 2),
169              on.completed(250)
170          });
171 
172         WHEN("sum is calculated"){
173 
174             auto res = w.start(
__anonb3a3a90d0802() 175                 [&]() {
176                     return xs.sum();
177                 }
178             );
179 
180             THEN("the output contains the sum of source values"){
181                 auto required = rxu::to_vector({
182                     d_on.next(250, 9),
183                     d_on.completed(250)
184                 });
185                 auto actual = res.get_observer().messages();
186                 REQUIRE(required == actual);
187             }
188 
189             THEN("there was one subscription and one unsubscription"){
190                 auto required = rxu::to_vector({
191                     on.subscribe(200, 250)
192                 });
193                 auto actual = xs.subscriptions();
194                 REQUIRE(required == actual);
195             }
196         }
197     }
198 }
199 
200 SCENARIO("max", "[reduce][max][operators]"){
201     GIVEN("a test hot observable of ints"){
202         auto sc = rxsc::make_test();
203         auto w = sc.create_worker();
204         const rxsc::test::messages<int> on;
205         const rxsc::test::messages<int> d_on;
206 
207         auto xs = sc.make_hot_observable({
208             on.next(150, 1),
209             on.next(210, 3),
210             on.next(220, 4),
211             on.next(230, 2),
212             on.completed(250)
213         });
214 
215         WHEN("max is calculated"){
216 
217             auto res = w.start(
__anonb3a3a90d0902() 218                 [&]() {
219                     return xs.max();
220                 }
221             );
222 
223             THEN("the output contains the max of source values"){
224                 auto required = rxu::to_vector({
225                     d_on.next(250, 4),
226                     d_on.completed(250)
227                 });
228                 auto actual = res.get_observer().messages();
229                 REQUIRE(required == actual);
230             }
231 
232             THEN("there was one subscription and one unsubscription"){
233                 auto required = rxu::to_vector({
234                     on.subscribe(200, 250)
235                 });
236                 auto actual = xs.subscriptions();
237                 REQUIRE(required == actual);
238             }
239         }
240     }
241 }
242 
243 // Does not work because calling max() on an empty stream throws an exception
244 // which will crash when exceptions are disabled.
245 //
246 // TODO: the max internal implementation should be rewritten not to throw exceptions.
247 SCENARIO("max, empty", "[reduce][max][operators][!throws]"){
248     GIVEN("a test hot observable of ints"){
249         auto sc = rxsc::make_test();
250         auto w = sc.create_worker();
251         const rxsc::test::messages<int> on;
252         const rxsc::test::messages<int> d_on;
253 
254         std::runtime_error ex("max on_error");
255 
256         auto xs = sc.make_hot_observable({
257             on.next(150, 1),
258             on.completed(250)
259         });
260 
261         WHEN("max is calculated"){
262 
263             auto res = w.start(
__anonb3a3a90d0a02() 264                 [&]() {
265                   return xs.max();
266                 }
267             );
268 
269             THEN("the output contains only error message"){
270                 auto required = rxu::to_vector({
271                     d_on.error(250, ex)
272                 });
273                 auto actual = res.get_observer().messages();
274                 REQUIRE(required == actual);
275             }
276 
277             THEN("there was one subscription and one unsubscription"){
278                 auto required = rxu::to_vector({
279                     on.subscribe(200, 250)
280                 });
281                 auto actual = xs.subscriptions();
282                 REQUIRE(required == actual);
283             }
284         }
285     }
286 }
287 
288 SCENARIO("max, error", "[reduce][max][operators]"){
289     GIVEN("a test hot observable of ints"){
290         auto sc = rxsc::make_test();
291         auto w = sc.create_worker();
292         const rxsc::test::messages<int> on;
293         const rxsc::test::messages<int> d_on;
294 
295         std::runtime_error ex("max on_error from source");
296 
297         auto xs = sc.make_hot_observable({
298             on.next(150, 1),
299             on.error(250, ex)
300         });
301 
302         WHEN("max is calculated"){
303 
304             auto res = w.start(
__anonb3a3a90d0b02() 305                 [&]() {
306                   return xs.max();
307                 }
308             );
309 
310             THEN("the output contains only error message"){
311                 auto required = rxu::to_vector({
312                     d_on.error(250, ex)
313                 });
314                 auto actual = res.get_observer().messages();
315                 REQUIRE(required == actual);
316             }
317 
318             THEN("there was one subscription and one unsubscription"){
319                 auto required = rxu::to_vector({
320                     on.subscribe(200, 250)
321                 });
322                 auto actual = xs.subscriptions();
323                 REQUIRE(required == actual);
324             }
325         }
326     }
327 }
328 
329 SCENARIO("min", "[reduce][min][operators]"){
330     GIVEN("a test hot observable of ints"){
331         auto sc = rxsc::make_test();
332         auto w = sc.create_worker();
333         const rxsc::test::messages<int> on;
334         const rxsc::test::messages<int> d_on;
335 
336         auto xs = sc.make_hot_observable({
337             on.next(150, 1),
338             on.next(210, 3),
339             on.next(220, 4),
340             on.next(230, 2),
341             on.completed(250)
342         });
343 
344         WHEN("min is calculated"){
345 
346             auto res = w.start(
__anonb3a3a90d0c02() 347                 [&]() {
348                   return xs.min();
349                 }
350             );
351 
352             THEN("the output contains the min of source values"){
353                 auto required = rxu::to_vector({
354                     d_on.next(250, 2),
355                     d_on.completed(250)
356                 });
357                 auto actual = res.get_observer().messages();
358                 REQUIRE(required == actual);
359             }
360 
361             THEN("there was one subscription and one unsubscription"){
362                 auto required = rxu::to_vector({
363                     on.subscribe(200, 250)
364                 });
365                 auto actual = xs.subscriptions();
366                 REQUIRE(required == actual);
367             }
368         }
369     }
370 }
371 
372 // Does not work with exceptions disabled, min will throw when stream is empty
373 // and this crashes immediately.
374 // TODO: min implementation should be rewritten not to throw exceptions.
375 SCENARIO("min, empty", "[reduce][min][operators][!throws]"){
376     GIVEN("a test hot observable of ints"){
377         auto sc = rxsc::make_test();
378         auto w = sc.create_worker();
379         const rxsc::test::messages<int> on;
380         const rxsc::test::messages<int> d_on;
381 
382         std::runtime_error ex("min on_error");
383 
384         auto xs = sc.make_hot_observable({
385             on.next(150, 1),
386             on.completed(250)
387         });
388 
389         WHEN("min is calculated"){
390 
391             auto res = w.start(
__anonb3a3a90d0d02() 392                 [&]() {
393                   return xs.min();
394                 }
395             );
396 
397             THEN("the output contains only error message"){
398                 auto required = rxu::to_vector({
399                     d_on.error(250, ex)
400                 });
401                 auto actual = res.get_observer().messages();
402                 REQUIRE(required == actual);
403             }
404 
405             THEN("there was one subscription and one unsubscription"){
406                 auto required = rxu::to_vector({
407                     on.subscribe(200, 250)
408                 });
409                 auto actual = xs.subscriptions();
410                 REQUIRE(required == actual);
411             }
412         }
413     }
414 }
415 
416 SCENARIO("min, error", "[reduce][min][operators]"){
417     GIVEN("a test hot observable of ints"){
418         auto sc = rxsc::make_test();
419         auto w = sc.create_worker();
420         const rxsc::test::messages<int> on;
421         const rxsc::test::messages<int> d_on;
422 
423         std::runtime_error ex("min on_error from source");
424 
425         auto xs = sc.make_hot_observable({
426             on.next(150, 1),
427             on.error(250, ex)
428         });
429 
430         WHEN("min is calculated"){
431 
432             auto res = w.start(
__anonb3a3a90d0e02() 433                 [&]() {
434                   return xs.min();
435                 }
436             );
437 
438             THEN("the output contains only error message"){
439                 auto required = rxu::to_vector({
440                     d_on.error(250, ex)
441                 });
442                 auto actual = res.get_observer().messages();
443                 REQUIRE(required == actual);
444             }
445 
446             THEN("there was one subscription and one unsubscription"){
447                 auto required = rxu::to_vector({
448                     on.subscribe(200, 250)
449                 });
450                 auto actual = xs.subscriptions();
451                 REQUIRE(required == actual);
452             }
453         }
454     }
455 }
456