• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include "rxcpp/operators/rx-amb.hpp"
3 
4 SCENARIO("variadic amb never 3", "[amb][join][operators]"){
5     GIVEN("3 hot observables of ints."){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto ys1 = sc.make_hot_observable({
11             on.next(100, 1)
12         });
13 
14         auto ys2 = sc.make_hot_observable({
15             on.next(110, 2)
16         });
17 
18         auto ys3 = sc.make_hot_observable({
19             on.next(120, 3)
20         });
21 
22         WHEN("the first observable is selected to produce ints"){
23 
24             auto res = w.start(
__anon6fbafb480102() 25                 [&]() {
26                     return ys1
27                         | rxo::amb(ys2, ys3)
28                         // forget type to workaround lambda deduction bug on msvc 2013
29                         | rxo::as_dynamic();
30                 }
31             );
32 
33             THEN("the output is empty"){
34                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
35                 auto actual = res.get_observer().messages();
36                 REQUIRE(required == actual);
37             }
38 
39             THEN("there was one subscription and one unsubscription to the ys1"){
40                 auto required = rxu::to_vector({
41                     on.subscribe(200, 1000)
42                 });
43                 auto actual = ys1.subscriptions();
44                 REQUIRE(required == actual);
45             }
46 
47             THEN("there was one subscription and one unsubscription to the ys2"){
48                 auto required = rxu::to_vector({
49                     on.subscribe(200, 1000)
50                 });
51                 auto actual = ys2.subscriptions();
52                 REQUIRE(required == actual);
53             }
54 
55             THEN("there was one subscription and one unsubscription to the ys3"){
56                 auto required = rxu::to_vector({
57                     on.subscribe(200, 1000)
58                 });
59                 auto actual = ys3.subscriptions();
60                 REQUIRE(required == actual);
61             }
62         }
63     }
64 }
65 
66 SCENARIO("variadic amb never empty", "[amb][join][operators]"){
67     GIVEN("2 hot observables of ints."){
68         auto sc = rxsc::make_test();
69         auto w = sc.create_worker();
70         const rxsc::test::messages<int> on;
71 
72         auto ys1 = sc.make_hot_observable({
73             on.next(100, 1)
74         });
75 
76         auto ys2 = sc.make_hot_observable({
77             on.next(110, 2),
78             on.completed(400)
79         });
80 
81         WHEN("the first observable is selected to produce ints"){
82 
83             auto res = w.start(
__anon6fbafb480202() 84                 [&]() {
85                     return ys1
86                         .amb(ys2)
87                         // forget type to workaround lambda deduction bug on msvc 2013
88                         .as_dynamic();
89                 }
90             );
91 
92             THEN("the output contains only complete message"){
93                 auto required = rxu::to_vector({
94                     on.completed(400)
95                 });
96                 auto actual = res.get_observer().messages();
97                 REQUIRE(required == actual);
98             }
99 
100             THEN("there was one subscription and one unsubscription to the ys1"){
101                 auto required = rxu::to_vector({
102                     on.subscribe(200, 400)
103                 });
104                 auto actual = ys1.subscriptions();
105                 REQUIRE(required == actual);
106             }
107 
108             THEN("there was one subscription and one unsubscription to the ys2"){
109                 auto required = rxu::to_vector({
110                     on.subscribe(200, 400)
111                 });
112                 auto actual = ys2.subscriptions();
113                 REQUIRE(required == actual);
114             }
115         }
116     }
117 }
118 
119 SCENARIO("variadic amb empty never", "[amb][join][operators]"){
120     GIVEN("2 hot observables of ints."){
121         auto sc = rxsc::make_test();
122         auto w = sc.create_worker();
123         const rxsc::test::messages<int> on;
124 
125         auto ys1 = sc.make_hot_observable({
126             on.next(100, 1),
127             on.completed(400)
128         });
129 
130         auto ys2 = sc.make_hot_observable({
131             on.next(110, 2)
132         });
133 
134         WHEN("the first observable is selected to produce ints"){
135 
136             auto res = w.start(
__anon6fbafb480302() 137                 [&]() {
138                     return ys1
139                         .amb(ys2)
140                         // forget type to workaround lambda deduction bug on msvc 2013
141                         .as_dynamic();
142                 }
143             );
144 
145             THEN("the output contains only complete message"){
146                 auto required = rxu::to_vector({
147                     on.completed(400)
148                 });
149                 auto actual = res.get_observer().messages();
150                 REQUIRE(required == actual);
151             }
152 
153             THEN("there was one subscription and one unsubscription to the ys1"){
154                 auto required = rxu::to_vector({
155                     on.subscribe(200, 400)
156                 });
157                 auto actual = ys1.subscriptions();
158                 REQUIRE(required == actual);
159             }
160 
161             THEN("there was one subscription and one unsubscription to the ys2"){
162                 auto required = rxu::to_vector({
163                     on.subscribe(200, 400)
164                 });
165                 auto actual = ys2.subscriptions();
166                 REQUIRE(required == actual);
167             }
168         }
169     }
170 }
171 
172 SCENARIO("variadic amb completes", "[amb][join][operators]"){
173     GIVEN("3 cold observables of ints."){
174         auto sc = rxsc::make_test();
175         auto w = sc.create_worker();
176         const rxsc::test::messages<int> on;
177 
178         auto ys1 = sc.make_cold_observable({
179             on.next(10, 101),
180             on.next(110, 102),
181             on.next(210, 103),
182             on.completed(310)
183         });
184 
185         auto ys2 = sc.make_cold_observable({
186             on.next(20, 201),
187             on.next(120, 202),
188             on.next(220, 203),
189             on.completed(320)
190         });
191 
192         auto ys3 = sc.make_cold_observable({
193             on.next(30, 301),
194             on.next(130, 302),
195             on.next(230, 303),
196             on.completed(330)
197         });
198 
199         WHEN("the first observable is selected to produce ints"){
200 
201             auto res = w.start(
__anon6fbafb480402() 202                 [&]() {
203                     return ys1
204                         .amb(ys2, ys3)
205                         // forget type to workaround lambda deduction bug on msvc 2013
206                         .as_dynamic();
207                 }
208             );
209 
210             THEN("the output contains ints from the first observable"){
211                 auto required = rxu::to_vector({
212                     on.next(210, 101),
213                     on.next(310, 102),
214                     on.next(410, 103),
215                     on.completed(510)
216                 });
217                 auto actual = res.get_observer().messages();
218                 REQUIRE(required == actual);
219             }
220 
221             THEN("there was one subscription and one unsubscription to the ys1"){
222                 auto required = rxu::to_vector({
223                     on.subscribe(200, 510)
224                 });
225                 auto actual = ys1.subscriptions();
226                 REQUIRE(required == actual);
227             }
228 
229             THEN("there was one subscription and one unsubscription to the ys2"){
230                 auto required = rxu::to_vector({
231                     on.subscribe(200, 210)
232                 });
233                 auto actual = ys2.subscriptions();
234                 REQUIRE(required == actual);
235             }
236 
237             THEN("there was one subscription and one unsubscription to the ys3"){
238                 auto required = rxu::to_vector({
239                     on.subscribe(200, 210)
240                 });
241                 auto actual = ys3.subscriptions();
242                 REQUIRE(required == actual);
243             }
244         }
245     }
246 }
247 
248 SCENARIO("variadic amb winner&owner throws", "[amb][join][operators]"){
249     GIVEN("3 cold observables of ints."){
250         auto sc = rxsc::make_test();
251         auto w = sc.create_worker();
252         const rxsc::test::messages<int> on;
253 
254         std::runtime_error ex("amb on_error from source");
255 
256         auto ys1 = sc.make_cold_observable({
257             on.next(10, 101),
258             on.next(110, 102),
259             on.next(210, 103),
260             on.error(310, ex)
261         });
262 
263         auto ys2 = sc.make_cold_observable({
264             on.next(20, 201),
265             on.next(120, 202),
266             on.next(220, 203),
267             on.completed(320)
268         });
269 
270         auto ys3 = sc.make_cold_observable({
271             on.next(30, 301),
272             on.next(130, 302),
273             on.next(230, 303),
274             on.completed(330)
275         });
276 
277         WHEN("the first observable is selected to produce ints"){
278 
279             auto res = w.start(
__anon6fbafb480502() 280                 [&]() {
281                     return ys1
282                         .amb(ys2, ys3)
283                         // forget type to workaround lambda deduction bug on msvc 2013
284                         .as_dynamic();
285                 }
286             );
287 
288             THEN("the output contains ints from the first observable"){
289                 auto required = rxu::to_vector({
290                     on.next(210, 101),
291                     on.next(310, 102),
292                     on.next(410, 103),
293                     on.error(510, ex)
294                 });
295                 auto actual = res.get_observer().messages();
296                 REQUIRE(required == actual);
297             }
298 
299             THEN("there was one subscription and one unsubscription to the ys1"){
300                 auto required = rxu::to_vector({
301                     on.subscribe(200, 510)
302                 });
303                 auto actual = ys1.subscriptions();
304                 REQUIRE(required == actual);
305             }
306 
307             THEN("there was one subscription and one unsubscription to the ys2"){
308                 auto required = rxu::to_vector({
309                     on.subscribe(200, 210)
310                 });
311                 auto actual = ys2.subscriptions();
312                 REQUIRE(required == actual);
313             }
314 
315             THEN("there was one subscription and one unsubscription to the ys3"){
316                 auto required = rxu::to_vector({
317                     on.subscribe(200, 210)
318                 });
319                 auto actual = ys3.subscriptions();
320                 REQUIRE(required == actual);
321             }
322         }
323     }
324 }
325 
326 SCENARIO("variadic amb winner&non-owner throws", "[amb][join][operators]"){
327     GIVEN("3 cold observables of ints."){
328         auto sc = rxsc::make_test();
329         auto w = sc.create_worker();
330         const rxsc::test::messages<int> on;
331 
332         std::runtime_error ex("amb on_error from source");
333 
334         auto ys1 = sc.make_cold_observable({
335             on.next(10, 101),
336             on.next(110, 102),
337             on.next(210, 103),
338             on.error(310, ex)
339         });
340 
341         auto ys2 = sc.make_cold_observable({
342             on.next(20, 201),
343             on.next(120, 202),
344             on.next(220, 203),
345             on.completed(320)
346         });
347 
348         auto ys3 = sc.make_cold_observable({
349             on.next(30, 301),
350             on.next(130, 302),
351             on.next(230, 303),
352             on.completed(330)
353         });
354 
355         WHEN("the first observable is selected to produce ints"){
356 
357             auto res = w.start(
__anon6fbafb480602() 358                 [&]() {
359                     return ys2
360                         .amb(ys1, ys3)
361                         // forget type to workaround lambda deduction bug on msvc 2013
362                         .as_dynamic();
363                 }
364             );
365 
366             THEN("the output contains ints from the first observable"){
367                 auto required = rxu::to_vector({
368                     on.next(210, 101),
369                     on.next(310, 102),
370                     on.next(410, 103),
371                     on.error(510, ex)
372                 });
373                 auto actual = res.get_observer().messages();
374                 REQUIRE(required == actual);
375             }
376 
377             THEN("there was one subscription and one unsubscription to the ys1"){
378                 auto required = rxu::to_vector({
379                     on.subscribe(200, 510)
380                 });
381                 auto actual = ys1.subscriptions();
382                 REQUIRE(required == actual);
383             }
384 
385             THEN("there was one subscription and one unsubscription to the ys2"){
386                 auto required = rxu::to_vector({
387                     on.subscribe(200, 210)
388                 });
389                 auto actual = ys2.subscriptions();
390                 REQUIRE(required == actual);
391             }
392 
393             THEN("there was one subscription and one unsubscription to the ys3"){
394                 auto required = rxu::to_vector({
395                     on.subscribe(200, 210)
396                 });
397                 auto actual = ys3.subscriptions();
398                 REQUIRE(required == actual);
399             }
400         }
401     }
402 }
403 
404 SCENARIO("variadic amb loser&non-owner throws", "[amb][join][operators]"){
405     GIVEN("3 cold observables of ints."){
406         auto sc = rxsc::make_test();
407         auto w = sc.create_worker();
408         const rxsc::test::messages<int> on;
409 
410         std::runtime_error ex("amb on_error from source");
411 
412         auto ys1 = sc.make_cold_observable({
413             on.next(10, 101),
414             on.next(110, 102),
415             on.next(210, 103),
416             on.completed(310)
417         });
418 
419         auto ys2 = sc.make_cold_observable({
420             on.error(20, ex)
421         });
422 
423         auto ys3 = sc.make_cold_observable({
424             on.next(30, 301),
425             on.next(130, 302),
426             on.next(230, 303),
427             on.completed(330)
428         });
429 
430         WHEN("the first observable is selected to produce ints"){
431 
432             auto res = w.start(
__anon6fbafb480702() 433                 [&]() {
434                     return ys2
435                         .amb(ys1, ys3)
436                         // forget type to workaround lambda deduction bug on msvc 2013
437                         .as_dynamic();
438                 }
439             );
440 
441             THEN("the output contains ints from the first observable"){
442                 auto required = rxu::to_vector({
443                     on.next(210, 101),
444                     on.next(310, 102),
445                     on.next(410, 103),
446                     on.completed(510)
447                 });
448                 auto actual = res.get_observer().messages();
449                 REQUIRE(required == actual);
450             }
451 
452             THEN("there was one subscription and one unsubscription to the ys1"){
453                 auto required = rxu::to_vector({
454                     on.subscribe(200, 510)
455                 });
456                 auto actual = ys1.subscriptions();
457                 REQUIRE(required == actual);
458             }
459 
460             THEN("there was one subscription and one unsubscription to the ys2"){
461                 auto required = rxu::to_vector({
462                     on.subscribe(200, 210)
463                 });
464                 auto actual = ys2.subscriptions();
465                 REQUIRE(required == actual);
466             }
467 
468             THEN("there was one subscription and one unsubscription to the ys3"){
469                 auto required = rxu::to_vector({
470                     on.subscribe(200, 210)
471                 });
472                 auto actual = ys3.subscriptions();
473                 REQUIRE(required == actual);
474             }
475         }
476     }
477 }
478 
479 SCENARIO("variadic amb loser&owner throws", "[amb][join][operators]"){
480     GIVEN("3 cold observables of ints."){
481         auto sc = rxsc::make_test();
482         auto w = sc.create_worker();
483         const rxsc::test::messages<int> on;
484 
485         std::runtime_error ex("amb on_error from source");
486 
487         auto ys1 = sc.make_cold_observable({
488             on.next(10, 101),
489             on.next(110, 102),
490             on.next(210, 103),
491             on.completed(310)
492         });
493 
494         auto ys2 = sc.make_cold_observable({
495             on.error(20, ex)
496         });
497 
498         auto ys3 = sc.make_cold_observable({
499             on.next(30, 301),
500             on.next(130, 302),
501             on.next(230, 303),
502             on.completed(330)
503         });
504 
505         WHEN("the first observable is selected to produce ints"){
506 
507             auto res = w.start(
__anon6fbafb480802() 508                 [&]() {
509                     return ys1
510                         .amb(ys2, ys3)
511                         // forget type to workaround lambda deduction bug on msvc 2013
512                         .as_dynamic();
513                 }
514             );
515 
516             THEN("the output contains ints from the first observable"){
517                 auto required = rxu::to_vector({
518                     on.next(210, 101),
519                     on.next(310, 102),
520                     on.next(410, 103),
521                     on.completed(510)
522                 });
523                 auto actual = res.get_observer().messages();
524                 REQUIRE(required == actual);
525             }
526 
527             THEN("there was one subscription and one unsubscription to the ys1"){
528                 auto required = rxu::to_vector({
529                     on.subscribe(200, 510)
530                 });
531                 auto actual = ys1.subscriptions();
532                 REQUIRE(required == actual);
533             }
534 
535             THEN("there was one subscription and one unsubscription to the ys2"){
536                 auto required = rxu::to_vector({
537                     on.subscribe(200, 210)
538                 });
539                 auto actual = ys2.subscriptions();
540                 REQUIRE(required == actual);
541             }
542 
543             THEN("there was one subscription and one unsubscription to the ys3"){
544                 auto required = rxu::to_vector({
545                     on.subscribe(200, 210)
546                 });
547                 auto actual = ys3.subscriptions();
548                 REQUIRE(required == actual);
549             }
550         }
551     }
552 }
553 
554 SCENARIO("variadic amb never empty, custom coordination", "[amb][join][operators]"){
555     GIVEN("2 hot observables of ints."){
556         auto sc = rxsc::make_test();
557         auto so = rx::synchronize_in_one_worker(sc);
558         auto w = sc.create_worker();
559         const rxsc::test::messages<int> on;
560 
561         auto ys1 = sc.make_hot_observable({
562             on.next(100, 1)
563         });
564 
565         auto ys2 = sc.make_hot_observable({
566             on.next(110, 2),
567             on.completed(400)
568         });
569 
570         WHEN("the first observable is selected to produce ints"){
571 
572             auto res = w.start(
__anon6fbafb480902() 573                 [&]() {
574                     return ys1
575                         .amb(so, ys2)
576                         // forget type to workaround lambda deduction bug on msvc 2013
577                         .as_dynamic();
578                 }
579             );
580 
581             THEN("the output contains only complete message"){
582                 auto required = rxu::to_vector({
583                     on.completed(401)
584                 });
585                 auto actual = res.get_observer().messages();
586                 REQUIRE(required == actual);
587             }
588         }
589     }
590 }
591