• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-merge_delay_error.hpp>
4 #include <rxcpp/operators/rx-observe_on.hpp>
5 
// merge_delay_error must behave exactly like `merge()`, except for how errors are handled.
7 
8 SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
9     GIVEN("1 hot observable with 3 cold observables of ints."){
10         auto sc = rxsc::make_test();
11         auto w = sc.create_worker();
12         const rxsc::test::messages<int> on;
13         const rxsc::test::messages<rx::observable<int>> o_on;
14 
15         auto ys1 = sc.make_cold_observable({
16             on.next(10, 101),
17             on.next(20, 102),
18             on.next(110, 103),
19             on.next(120, 104),
20             on.next(210, 105),
21             on.next(220, 106),
22             on.completed(230)
23         });
24 
25         auto ys2 = sc.make_cold_observable({
26             on.next(10, 201),
27             on.next(20, 202),
28             on.next(30, 203),
29             on.next(40, 204),
30             on.completed(50)
31         });
32 
33         auto ys3 = sc.make_cold_observable({
34             on.next(10, 301),
35             on.next(20, 302),
36             on.next(30, 303),
37             on.next(40, 304),
38             on.next(120, 305),
39             on.completed(150)
40         });
41 
42         auto xs = sc.make_hot_observable({
43             o_on.next(300, ys1),
44             o_on.next(400, ys2),
45             o_on.next(500, ys3),
46             o_on.completed(600)
47         });
48 
49         WHEN("each int is merged"){
50 
51             auto res = w.start(
__anon3d92c6da0102() 52                 [&]() {
53                     return xs
54                         | rxo::merge_delay_error()
55                         // forget type to workaround lambda deduction bug on msvc 2013
56                         | rxo::as_dynamic();
57                 }
58             );
59 
60             THEN("the output contains merged ints"){
61                 auto required = rxu::to_vector({
62                     on.next(310, 101),
63                     on.next(320, 102),
64                     on.next(410, 103),
65                     on.next(410, 201),
66                     on.next(420, 104),
67                     on.next(420, 202),
68                     on.next(430, 203),
69                     on.next(440, 204),
70                     on.next(510, 105),
71                     on.next(510, 301),
72                     on.next(520, 106),
73                     on.next(520, 302),
74                     on.next(530, 303),
75                     on.next(540, 304),
76                     on.next(620, 305),
77                     on.completed(650)
78                 });
79                 auto actual = res.get_observer().messages();
80                 REQUIRE(required == actual);
81             }
82 
83             THEN("there was one subscription and one unsubscription to the xs"){
84                 auto required = rxu::to_vector({
85                     on.subscribe(200, 600)
86                 });
87                 auto actual = xs.subscriptions();
88                 REQUIRE(required == actual);
89             }
90 
91             THEN("there was one subscription and one unsubscription to the ys1"){
92                 auto required = rxu::to_vector({
93                     on.subscribe(300, 530)
94                 });
95                 auto actual = ys1.subscriptions();
96                 REQUIRE(required == actual);
97             }
98 
99             THEN("there was one subscription and one unsubscription to the ys2"){
100                 auto required = rxu::to_vector({
101                     on.subscribe(400, 450)
102                 });
103                 auto actual = ys2.subscriptions();
104                 REQUIRE(required == actual);
105             }
106 
107             THEN("there was one subscription and one unsubscription to the ys3"){
108                 auto required = rxu::to_vector({
109                     on.subscribe(500, 650)
110                 });
111                 auto actual = ys3.subscriptions();
112                 REQUIRE(required == actual);
113             }
114         }
115     }
116 }
117 
118 SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
119     GIVEN("1 hot observable with 3 cold observables of ints."){
120         auto sc = rxsc::make_test();
121         auto w = sc.create_worker();
122         const rxsc::test::messages<int> on;
123         const rxsc::test::messages<rx::observable<int>> o_on;
124 
125         auto ys1 = sc.make_cold_observable({
126             on.next(10, 101),
127             on.next(20, 102),
128             on.next(110, 103),
129             on.next(120, 104),
130             on.next(210, 105),
131             on.next(230, 107),
132             on.completed(240)
133         });
134 
135         auto ys2 = sc.make_cold_observable({
136             on.next(10, 201),
137             on.next(20, 202),
138             on.next(30, 203),
139             on.error(40, std::runtime_error("merge_delay_error on_error from ys2")),
140             on.next(50, 205),
141             on.completed(60)
142         });
143 
144         auto ys3 = sc.make_cold_observable({
145             on.next(10, 301),
146             on.next(20, 302),
147             on.next(30, 303),
148             on.next(40, 304),
149             on.next(120, 305),
150             on.completed(150)
151         });
152 
153         WHEN("each int is merged"){
154 
155             auto res = w.start(
__anon3d92c6da0202() 156                 [&]() {
157                     return ys1
158                         .merge_delay_error(ys2, ys3);
159                 }
160             );
161 
162             rx::composite_exception ex;
163             THEN("the output contains merged ints"){
164                 auto required = rxu::to_vector({
165                     on.next(210, 101),
166                     on.next(210, 201),
167                     on.next(210, 301),
168                     on.next(220, 102),
169                     on.next(220, 202),
170                     on.next(220, 302),
171                     on.next(230, 203),
172                     on.next(230, 303),
173                     on.next(240, 304),
174                     on.next(310, 103),
175                     on.next(320, 104),
176                     on.next(320, 305),
177                     on.next(410, 105),
178                     on.next(430, 107),
179                     on.error(440, ex)
180                 });
181                 auto actual = res.get_observer().messages();
182                 REQUIRE(required == actual);
183             }
184 
185             THEN("there was one subscription and one unsubscription to the ys1"){
186                 auto required = rxu::to_vector({
187                     on.subscribe(200, 440)
188                 });
189                 auto actual = ys1.subscriptions();
190                 REQUIRE(required == actual);
191             }
192 
193             THEN("there was one subscription and one unsubscription to the ys2"){
194                 auto required = rxu::to_vector({
195                     on.subscribe(200, 240)
196                 });
197                 auto actual = ys2.subscriptions();
198                 REQUIRE(required == actual);
199             }
200 
201             THEN("there was one subscription and one unsubscription to the ys3"){
202                 auto required = rxu::to_vector({
203                     on.subscribe(200, 350)
204                 });
205                 auto actual = ys3.subscriptions();
206                 REQUIRE(required == actual);
207             }
208         }
209     }
210 }
211 
212 SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
213     GIVEN("1 hot observable with 3 cold observables of ints."){
214         auto sc = rxsc::make_test();
215         auto w = sc.create_worker();
216         const rxsc::test::messages<int> on;
217         const rxsc::test::messages<rx::observable<int>> o_on;
218 
219         auto ys1 = sc.make_cold_observable({
220             on.next(10, 101),
221             on.next(20, 102),
222             on.next(110, 103),
223             on.next(120, 104),
224             on.next(210, 105),
225             on.error(220, std::runtime_error("merge_delay_error on_error from ys1")),
226             on.next(230, 107),
227             on.completed(240)
228         });
229 
230         auto ys2 = sc.make_cold_observable({
231             on.next(10, 201),
232             on.next(20, 202),
233             on.next(30, 203),
234             on.error(40, std::runtime_error("merge_delay_error on_error from ys2")),
235             on.next(50, 205),
236             on.completed(60)
237         });
238 
239         auto ys3 = sc.make_cold_observable({
240             on.next(10, 301),
241             on.next(20, 302),
242             on.next(30, 303),
243             on.next(40, 304),
244             on.next(120, 305),
245             on.completed(150)
246         });
247 
248         WHEN("each int is merged"){
249 
250             auto res = w.start(
__anon3d92c6da0302() 251                 [&]() {
252                     return ys1
253                         .merge_delay_error(ys2, ys3);
254                 }
255             );
256 
257             rx::composite_exception ex;
258             THEN("the output contains merged ints"){
259                 auto required = rxu::to_vector({
260                     on.next(210, 101),
261                     on.next(210, 201),
262                     on.next(210, 301),
263                     on.next(220, 102),
264                     on.next(220, 202),
265                     on.next(220, 302),
266                     on.next(230, 203),
267                     on.next(230, 303),
268                     on.next(240, 304),
269                     on.next(310, 103),
270                     on.next(320, 104),
271                     on.next(320, 305),
272                     on.next(410, 105),
273                     on.error(420, ex)
274                 });
275                 auto actual = res.get_observer().messages();
276                 REQUIRE(required == actual);
277             }
278 
279             THEN("there was one subscription and one unsubscription to the ys1"){
280                 auto required = rxu::to_vector({
281                     on.subscribe(200, 420)
282                 });
283                 auto actual = ys1.subscriptions();
284                 REQUIRE(required == actual);
285             }
286 
287             THEN("there was one subscription and one unsubscription to the ys2"){
288                 auto required = rxu::to_vector({
289                     on.subscribe(200, 240)
290                 });
291                 auto actual = ys2.subscriptions();
292                 REQUIRE(required == actual);
293             }
294 
295             THEN("there was one subscription and one unsubscription to the ys3"){
296                 auto required = rxu::to_vector({
297                     on.subscribe(200, 350)
298                 });
299                 auto actual = ys3.subscriptions();
300                 REQUIRE(required == actual);
301             }
302         }
303     }
304 }
305